Source code for pynenc.workflow.identity
import json
from dataclasses import dataclass
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from pynenc.invocation.base_invocation import InvocationIdentity
from pynenc.task import Task
[docs]
@dataclass(frozen=True)
class WorkflowIdentity:
"""
Immutable identity of a workflow execution.
Workflows provide a structured execution context for tasks, enabling complex orchestration
and dependency management. Each workflow has a unique identity captured by this class.
:param workflow_task_id: The identifier of the task that defines this workflow
:param workflow_invocation_id: The unique identifier for this specific workflow execution
:param parent_workflow: The parent workflow if this is a subworkflow
"""
workflow_task_id: str
workflow_invocation_id: str
parent_workflow: Optional["WorkflowIdentity"] = None
@property
def workflow_id(self) -> str:
"""Get the unique identifier for this workflow."""
return self.workflow_invocation_id
[docs]
@classmethod
def from_invocation(cls, invocation: "InvocationIdentity") -> "WorkflowIdentity":
"""
Create a new workflow identity based on the invocation.
:param invocation: The invocation that defines this workflow
:param parent_workflow: Optional parent workflow if this is a subworkflow
:return: A new workflow identity
"""
task: Task = invocation.call.task
parent_workflow = (
invocation.parent_invocation.workflow
if invocation.parent_invocation
else None
)
if not task.conf.force_new_workflow and parent_workflow is not None:
task.app.logger.debug(f"New {invocation=} on {parent_workflow}")
return parent_workflow
new_wf = cls(
workflow_task_id=task.task_id,
workflow_invocation_id=invocation.invocation_id,
parent_workflow=parent_workflow,
)
sub = "sub-" if parent_workflow else ""
task.app.logger.info(f"Creating a new {sub}workflow {new_wf}")
return new_wf
@property
def is_subworkflow(self) -> bool:
"""Check if this workflow is a subworkflow."""
return self.parent_workflow is not None
[docs]
def to_json(self) -> str:
"""Convert the workflow identity to a JSON-serializable dictionary."""
parent_workflow_json = (
self.parent_workflow.to_json() if self.parent_workflow else None
)
return json.dumps(
{
"workflow_task_id": self.workflow_task_id,
"workflow_invocation_id": self.workflow_invocation_id,
"parent_workflow": parent_workflow_json,
}
)
[docs]
@classmethod
def from_json(cls, json_str: str) -> "WorkflowIdentity":
"""Create a WorkflowIdentity from a JSON string."""
data = json.loads(json_str)
parent_workflow = (
cls.from_json(data["parent_workflow"]) if data["parent_workflow"] else None
)
return cls(
workflow_task_id=data["workflow_task_id"],
workflow_invocation_id=data["workflow_invocation_id"],
parent_workflow=parent_workflow,
)