Source code for pynenc.workflow.workflow_context
from __future__ import annotations
import datetime
from typing import TYPE_CHECKING, Any, TypeVar
from pynenc.workflow.workflow_deterministic import DeterministicExecutor
if TYPE_CHECKING:
from pynenc.app import Pynenc
from pynenc.task import Task
from pynenc.workflow.workflow_identity import WorkflowIdentity
S = TypeVar("S")
[docs]
class WorkflowContext:
"""
Provides workflow capabilities for tasks within Pynenc.
This class exposes methods for workflow state management, determinism, and
durability. It's accessible via the `wf` property on any Pynenc task.
:param task: The task this helper is attached to
"""
def __init__(self, task: Task):
"""Initialize the workflow helper with its associated task."""
self.task = task
self._deterministic: DeterministicExecutor | None = None
@property
def deterministic(self) -> DeterministicExecutor:
"""Get the deterministic executor for this workflow context."""
if self._deterministic is None:
self._deterministic = DeterministicExecutor(
self.task.invocation.workflow, self.task.app
)
return self._deterministic
@property
def app(self) -> Pynenc:
"""Get the Pynenc app instance associated with this workflow."""
return self.task.app
@property
def identity(self) -> WorkflowIdentity:
"""Get the workflow identity for the current execution."""
return self.task.invocation.workflow
[docs]
def get_data(self, key: str, default: Any = None) -> Any:
"""
Get a value from the workflow's data store.
Any task within the workflow can read data.
:param key: The data key to retrieve
:param default: Default value if the key doesn't exist
:return: The stored value or default
"""
return self.app.state_backend.get_workflow_data(self.identity, key, default)
[docs]
def set_data(self, key: str, value: Any) -> None:
"""
Set a value in the workflow's data store.
Only the main workflow task can write workflow data.
:param key: The data key to set
:param value: The value to store
:raises WorkflowAccessError: If a non-owner task attempts to modify data
"""
self.app.state_backend.set_workflow_data(self.identity, key, value)
# DETERMINISTIC OPERATION METHODS
[docs]
def random(self) -> float:
"""
Generate a deterministic random number in the workflow context.
Returns the same sequence of "random" numbers on workflow replay.
:return: A random float between 0.0 and 1.0
"""
return self.deterministic.random()
[docs]
def utc_now(self) -> datetime.datetime:
"""
Get the current time deterministically in the workflow context.
Returns deterministic timestamps that advance consistently on workflow replay.
:return: Current datetime with UTC timezone
"""
return self.deterministic.utc_now()
[docs]
def uuid(self) -> str:
"""
Generate a deterministic UUID in the workflow context.
Returns the same sequence of UUIDs on workflow replay.
:return: UUID string
"""
return self.deterministic.uuid()
# CHILD WORKFLOW/TASK EXECUTION
[docs]
def execute_task(self, task: Task, *args: Any, **kwargs: Any) -> Any:
"""
Execute a task as part of this workflow with deterministic replay.
On workflow replay, returns the recorded result without re-execution.
:param task: The task to execute
:param args: Positional arguments for the task
:param kwargs: Keyword arguments for the task
:return: Task result
"""
return self.deterministic.execute_task(task, *args, **kwargs)