"""
Deterministic execution system for Pynenc workflows.
This module handles deterministic operations in workflows, ensuring that
non-deterministic functions like random numbers, timestamps, and UUIDs
behave deterministically across workflow executions and replays.
"""
from __future__ import annotations
import datetime
import hashlib
import random
import uuid
from typing import TYPE_CHECKING, Any, Callable, TypeVar
from pynenc.arguments import Arguments
from pynenc.call import Call
if TYPE_CHECKING:
from pynenc.app import Pynenc
from pynenc.invocation.dist_invocation import DistributedInvocation
from pynenc.task import Task
from pynenc.workflow.identity import WorkflowIdentity
T = TypeVar("T")
[docs]
class DeterministicExecutor:
"""
Handles deterministic operations for workflow execution.
This class ensures that operations like random number generation,
time functions, and task executions behave deterministically
across workflow replays by using deterministic seeds and storing
results in the state backend.
"""
def __init__(self, workflow_identity: WorkflowIdentity, app: Pynenc) -> None:
"""
Initialize the deterministic execution engine.
:param workflow_identity: The workflow identity
:param app: The Pynenc application instance
"""
self.workflow_identity = workflow_identity
self.app = app
self._operation_counters: dict[str, int] = {}
[docs]
def _get_next_sequence(self, operation: str) -> int:
"""
Get the next sequence number for an operation.
Tracks the current position for this executor instance, starting from 0
and incrementing as operations are performed or replayed.
:param operation: The operation name
:return: The next sequence number
"""
if operation not in self._operation_counters:
self._operation_counters[operation] = 0
# Increment counter for this executor instance
self._operation_counters[operation] += 1
return self._operation_counters[operation]
[docs]
def _deterministic_operation(self, operation: str, generator: Callable[[], T]) -> T:
"""
Execute an operation with deterministic results.
First checks if a value exists for this operation sequence, otherwise
generates a new value using the provided generator and stores it.
:param operation: Operation type (e.g., "random", "time")
:param generator: Function to generate value if not already recorded
:return: Deterministic operation result
"""
# Get next sequence number for this executor instance
sequence = self._get_next_sequence(operation)
operation_key = f"{operation}:{sequence}"
# Check if we have a recorded value first
value = self.app.state_backend.get_workflow_deterministic_value(
self.workflow_identity, operation_key
)
if value is not None:
# Return recorded value (replay mode)
return value
# Generate new value and store it
value = generator()
self.app.state_backend.set_workflow_deterministic_value(
self.workflow_identity, operation_key, value
)
# Update the total count in state backend for this operation type
total_count_key = f"counter:{operation}"
current_total = (
self.app.state_backend.get_workflow_deterministic_value(
self.workflow_identity, total_count_key
)
or 0
)
self.app.state_backend.set_workflow_deterministic_value(
self.workflow_identity, total_count_key, max(current_total, sequence)
)
return value
[docs]
def get_base_time(self) -> datetime.datetime:
"""
Get or establish workflow base time for deterministic timestamps.
:return: Base time for deterministic timestamps
"""
base_time_key = "workflow:base_time"
base_time = self.app.state_backend.get_workflow_deterministic_value(
self.workflow_identity, base_time_key
)
if base_time is None:
base_time = datetime.datetime.now(datetime.timezone.utc)
self.app.state_backend.set_workflow_deterministic_value(
self.workflow_identity, base_time_key, base_time
)
return base_time
[docs]
def get_operation_count(self, operation: str) -> int:
"""
Get current count for an operation type from this executor instance.
This represents how many operations of this type have been processed
by this executor instance, not the total stored count.
:param operation: Operation type name
:return: Current operation count for this executor instance
"""
return self._operation_counters.get(operation, 0)
[docs]
def random(self) -> float:
"""
Generate a deterministic random number using workflow-specific seed.
:return: A random float between 0.0 and 1.0
"""
def random_generator() -> float:
# Create deterministic seed from workflow and current sequence
sequence = self._operation_counters.get("random", 0) + 1
seed_string = f"{self.workflow_identity.workflow_id}:random:{sequence}"
seed = int(hashlib.md5(seed_string.encode()).hexdigest()[:8], 16)
# Use deterministic random generator
temp_random = random.Random(seed)
return temp_random.random()
return self._deterministic_operation("random", random_generator)
[docs]
def utc_now(self) -> datetime.datetime:
"""
Get current time deterministically by advancing from base time.
:return: Deterministic datetime with UTC timezone
"""
def time_generator() -> datetime.datetime:
base_time = self.get_base_time()
# Use current sequence number to advance time deterministically
sequence = self._operation_counters.get("time", 0) + 1
return base_time + datetime.timedelta(seconds=sequence)
return self._deterministic_operation("time", time_generator)
[docs]
def uuid(self) -> str:
"""
Generate a deterministic UUID using workflow-specific seed.
:return: UUID string
"""
def uuid_generator() -> str:
# Create deterministic UUID from workflow and current sequence
sequence = self._operation_counters.get("uuid", 0) + 1
seed_string = f"{self.workflow_identity.workflow_id}:uuid:{sequence}"
hash_bytes = hashlib.md5(seed_string.encode()).digest()
return str(uuid.UUID(bytes=hash_bytes))
return self._deterministic_operation("uuid", uuid_generator)
[docs]
def execute_task(
self, task: Task, *args: Any, **kwargs: Any
) -> DistributedInvocation:
"""
Execute a task with deterministic replay capabilities.
Returns a DistributedInvocation that allows flexible handling of results.
During replay, if the same task with identical arguments was already executed,
it returns the existing invocation from the state backend.
:param task: The task to execute
:param args: Positional arguments
:param kwargs: Keyword arguments
:return: DistributedInvocation for flexible result handling
"""
# Create arguments and call to get the proper call_id
arguments = Arguments.from_call(task.func, *args, **kwargs)
call: Call = Call(task=task, _arguments=arguments)
# Use call_id as the unique key - no need for additional hashing
task_invocation_key = f"task_invocation:{call.call_id}"
# Check if we already have a recorded invocation ID
cached_invocation_id = self.app.state_backend.get_workflow_deterministic_value(
self.workflow_identity, task_invocation_key
)
if cached_invocation_id is not None:
# Return existing invocation from state backend
return self.app.state_backend.get_invocation(cached_invocation_id)
# Execute the task and record the invocation ID
invocation = task(*args, **kwargs)
# Store only the invocation ID for future replays
self.app.state_backend.set_workflow_deterministic_value(
self.workflow_identity, task_invocation_key, invocation.invocation_id
)
return invocation # type: ignore[return-value]