pynenc.workflow.workflow_deterministic
Deterministic execution system for Pynenc workflows.
This module handles deterministic operations in workflows. It ensures that inherently non-deterministic operations, such as random number generation, timestamps, and UUID generation, produce the same results across workflow executions and replays.
Module Contents
Classes
DeterministicExecutor | Handles deterministic operations for workflow execution.
Data
API
- pynenc.workflow.workflow_deterministic.T
‘TypeVar(…)’
- class pynenc.workflow.workflow_deterministic.DeterministicExecutor(workflow_identity: pynenc.workflow.workflow_identity.WorkflowIdentity, app: pynenc.app.Pynenc)
Handles deterministic operations for workflow execution.
This class ensures that operations like random number generation, time functions, and task executions behave deterministically across workflow replays by using deterministic seeds and storing results in the state backend.
Initialization
Initialize the deterministic execution engine.
- Parameters:
workflow_identity – The workflow identity
app – The Pynenc application instance
- _get_next_sequence(operation: str) → int
Get the next sequence number for an operation.
Tracks the current position for this executor instance, starting from 0 and incrementing as operations are performed or replayed.
- Parameters:
operation – The operation name
- Returns:
The next sequence number
- _deterministic_operation(operation: str, generator: collections.abc.Callable[[], pynenc.workflow.workflow_deterministic.T]) → pynenc.workflow.workflow_deterministic.T
Execute an operation with deterministic results.
Checks first whether a value is already recorded for this operation and sequence number. If not, generates a new value with the provided generator and stores it.
- Parameters:
operation – Operation type (e.g., “random”, “time”)
generator – Function to generate value if not already recorded
- Returns:
Deterministic operation result
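The record-or-replay behavior described for `_get_next_sequence` and `_deterministic_operation` can be illustrated with a minimal sketch. It is not Pynenc's implementation: `ReplayLog`, the plain `dict` standing in for the state backend, and the `operation:sequence` key format are all assumptions made for illustration, as is the choice to return 0 as the first sequence number.

```python
from collections import defaultdict
from typing import Callable, TypeVar

T = TypeVar("T")


class ReplayLog:
    """Hypothetical stand-in for the executor plus its state backend."""

    def __init__(self, store: dict) -> None:
        self.store = store  # recorded values, shared across replays (assumed dict)
        self.counters: dict = defaultdict(int)  # per-instance position per operation

    def next_sequence(self, operation: str) -> int:
        # Return the current position, then advance it for the next call.
        seq = self.counters[operation]
        self.counters[operation] += 1
        return seq

    def deterministic_operation(self, operation: str, generator: Callable[[], T]) -> T:
        key = f"{operation}:{self.next_sequence(operation)}"  # assumed key format
        if key not in self.store:
            # First execution: generate the value and record it.
            self.store[key] = generator()
        # Replay: the recorded value wins over whatever the generator would return.
        return self.store[key]


store: dict = {}
first_run = ReplayLog(store)
a = first_run.deterministic_operation("random", lambda: 0.25)
b = first_run.deterministic_operation("random", lambda: 0.75)

# A new instance starts counting from 0 again and reads the recorded values.
replay = ReplayLog(store)
a_replayed = replay.deterministic_operation("random", lambda: 0.99)
```

In this sketch the per-instance counter plays the role that `get_operation_count` reports: `replay.counters["random"]` is 1 after one replayed call, even though two values are stored.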
- get_base_time() → datetime.datetime
Get or establish workflow base time for deterministic timestamps.
- Returns:
Base time for deterministic timestamps
- get_operation_count(operation: str) → int
Get current count for an operation type from this executor instance.
This represents how many operations of this type have been processed by this executor instance, not the total stored count.
- Parameters:
operation – Operation type name
- Returns:
Current operation count for this executor instance
- random() → float
Generate a deterministic random number using workflow-specific seed.
- Returns:
A random float between 0.0 and 1.0
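One way to obtain repeatable random values from a workflow-specific seed is sketched below. The seed format, the `seeded_float` helper, and the `"wf-1"` identifier are hypothetical; the page does not say how Pynenc derives its seed.

```python
import random


def seeded_float(workflow_id: str, sequence: int) -> float:
    # Assumed seed format: workflow id plus the operation's sequence number.
    # Seeding random.Random with a str is deterministic across processes.
    rng = random.Random(f"{workflow_id}:random:{sequence}")
    return rng.random()  # float in the half-open range [0.0, 1.0)


first = [seeded_float("wf-1", i) for i in range(3)]
again = [seeded_float("wf-1", i) for i in range(3)]
other = [seeded_float("wf-2", i) for i in range(3)]
```

The same workflow id and sequence always give the same value, while a different workflow id gives a different stream.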
- utc_now() → datetime.datetime
Get current time deterministically by advancing from base time.
- Returns:
Deterministic datetime with UTC timezone
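The idea of "advancing from base time" can be sketched as a clock that adds a fixed step to a stored base time on every call. `DeterministicClock` and the one-second step are assumptions for illustration; the actual increment Pynenc uses is not stated on this page.

```python
from datetime import datetime, timedelta, timezone


class DeterministicClock:
    """Hypothetical clock: each call advances a fixed step from the base time."""

    def __init__(self, base_time: datetime, step: timedelta = timedelta(seconds=1)) -> None:
        self.base_time = base_time  # would be recorded once per workflow
        self.step = step            # assumed increment, not taken from Pynenc
        self.calls = 0

    def utc_now(self) -> datetime:
        self.calls += 1
        return self.base_time + self.step * self.calls


base = datetime(2024, 1, 1, tzinfo=timezone.utc)
clock = DeterministicClock(base)
t1, t2 = clock.utc_now(), clock.utc_now()

# A replay that starts from the same base time sees the same timestamps.
replayed = DeterministicClock(base)
t1_replayed = replayed.utc_now()
```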
- uuid() → str
Generate a deterministic UUID using workflow-specific seed.
- Returns:
UUID string
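A repeatable UUID can be built by drawing 128 bits from a seeded generator, as in the sketch below. The `seeded_uuid` helper and its seed format are hypothetical and only illustrate the technique; they are not Pynenc's API.

```python
import random
import uuid


def seeded_uuid(workflow_id: str, sequence: int) -> str:
    # Assumed seed format: workflow id plus the operation's sequence number.
    rng = random.Random(f"{workflow_id}:uuid:{sequence}")
    # version=4 makes the constructor set the version and variant bits.
    return str(uuid.UUID(int=rng.getrandbits(128), version=4))


u0 = seeded_uuid("wf-1", 0)
u0_again = seeded_uuid("wf-1", 0)
u1 = seeded_uuid("wf-1", 1)
```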
- execute_task(task: pynenc.task.Task, *args: Any, **kwargs: Any) → pynenc.invocation.dist_invocation.DistributedInvocation
Execute a task with deterministic replay capabilities.
Returns a DistributedInvocation that allows flexible handling of results. During replay, if the same task with identical arguments was already executed, it returns the existing invocation from the state backend.
- Parameters:
task – The task to execute
args – Positional arguments
kwargs – Keyword arguments
- Returns:
DistributedInvocation for flexible result handling
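The replay rule for tasks (same task and identical arguments yield the existing invocation) can be sketched with a lookup keyed on the task name and its arguments. Everything here is a simplification under stated assumptions: `ReplayingRunner` is hypothetical, a `dict` stands in for the state backend, and a string id stands in for the `DistributedInvocation` that the real method returns.

```python
import json
from typing import Any, Callable


class ReplayingRunner:
    """Hypothetical runner that reuses recorded invocations on replay."""

    def __init__(self, store: dict) -> None:
        self.store = store  # stand-in for the state backend
        self.launched = 0   # number of invocations actually started by this instance

    def execute_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> str:
        # Assumed key: task name plus JSON-serialized arguments.
        key = json.dumps([func.__name__, args, kwargs], sort_keys=True, default=repr)
        if key not in self.store:
            # Not seen before: start a new invocation and record its id.
            self.launched += 1
            self.store[key] = f"invocation-{len(self.store)}"
        return self.store[key]


def add(x: int, y: int) -> int:
    return x + y


store: dict = {}
first_run = ReplayingRunner(store)
inv = first_run.execute_task(add, 1, 2)

# On replay, the identical call returns the recorded invocation without relaunching.
replay = ReplayingRunner(store)
inv_replayed = replay.execute_task(add, 1, 2)
inv_new = replay.execute_task(add, 3, 4)
```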