Workflow System¶
Overview¶
The Pynenc Workflow System enables sophisticated task orchestration with deterministic execution, state management, and automatic replay capabilities. It provides a foundation for building complex, multi-step workflows that can recover from failures and maintain consistency across distributed environments.
Philosophy and Design¶
Core Principles¶
Deterministic Execution: All non-deterministic operations (random numbers, timestamps, UUIDs) are made deterministic through seeded generation and state storage, enabling perfect replay of workflow execution.
Workflow Identity: Each workflow has a unique identity that tracks its lineage and relationships, allowing for hierarchical workflow structures and context inheritance.
State Persistence: Workflow state is automatically persisted, enabling recovery from failures and resumption from exact points of interruption.
Task Integration: Workflows seamlessly integrate with Pynenc’s existing task system, allowing any task to participate in workflow execution.
Design Concepts¶
Workflow Context: Tasks executed within a workflow automatically inherit the workflow context, enabling access to workflow-specific operations and state.
Parent-Child Relationships: Workflows can spawn child workflows, creating hierarchical structures while maintaining clear boundaries and inheritance rules.
Deterministic Operations: Built-in support for deterministic random numbers, timestamps, UUIDs, and task execution that replay identically.
Automatic State Management: The system automatically manages workflow state without requiring explicit state handling in user code.
Scenario¶
Consider an e-commerce order processing system that needs to:
Process payment with retry logic
Generate unique tracking numbers
Schedule delivery notifications
Handle complex business logic with multiple decision points
Maintain audit trails and enable recovery from failures
The workflow system ensures that even if the system crashes mid-processing, the workflow can resume from exactly where it left off, with all random numbers, timestamps, and business decisions replaying identically.
Implementation¶
Basic Workflow Usage¶
from pynenc import Pynenc
from datetime import timedelta
from typing import Any
app = Pynenc()
@app.task(force_new_workflow=True)
def process_order_workflow(order_id: str) -> dict[str, Any]:
"""
Main order processing workflow with explicit workflow boundary.
Using force_new_workflow=True ensures this task always creates a new workflow
context, providing access to deterministic operations and state management.
"""
# All random operations are deterministic and will replay identically
tracking_number = generate_tracking_number(order_id)
# Process payment with deterministic retry logic
payment_result = process_payment_workflow.wf.execute_task(
process_payment, order_id
)
# Schedule notifications using deterministic timestamps
notification_time = process_order_workflow.wf.utc_now() + timedelta(hours=1)
# Store workflow data for later retrieval
process_order_workflow.wf.set_data("tracking_number", tracking_number)
process_order_workflow.wf.set_data("notification_time", notification_time.isoformat())
return {
"order_id": order_id,
"tracking_number": tracking_number,
"payment_status": payment_result.result,
"notification_scheduled": notification_time.isoformat()
}
def generate_tracking_number(order_id: str) -> str:
"""Generate a deterministic tracking number."""
# This random number will be the same on replay
random_suffix = int(process_order_workflow.wf.random() * 100000)
return f"TRK-{order_id}-{random_suffix:05d}"
@app.task
def process_payment(order_id: str) -> dict[str, Any]:
"""Process payment for an order."""
# Simulate payment processing
payment_id = process_payment.wf.uuid()
processing_time = process_payment.wf.utc_now()
return {
"payment_id": payment_id,
"processed_at": processing_time.isoformat(),
"status": "completed"
}
Creating New Workflow Boundaries¶
@app.task(force_new_workflow=True)
def independent_workflow(data: dict[str, Any]) -> dict[str, Any]:
"""
This task always creates a new workflow, regardless of calling context.
Use force_new_workflow=True when you want to create a clear workflow boundary
and ensure that this task starts its own workflow context.
"""
workflow_id = independent_workflow.workflow.workflow_id
# This workflow has its own deterministic context
unique_value = independent_workflow.wf.random()
timestamp = independent_workflow.wf.utc_now()
return {
"workflow_id": workflow_id,
"unique_value": unique_value,
"timestamp": timestamp.isoformat(),
"data": data
}
@app.task
def call_independent_workflow(input_data: dict[str, Any]) -> dict[str, Any]:
"""Demonstrate calling a task that creates its own workflow."""
# This task runs in its own workflow context
parent_workflow_id = call_independent_workflow.workflow.workflow_id
# Execute a task that forces a new workflow
child_result = call_independent_workflow.wf.execute_task(
independent_workflow, input_data
)
# The child workflow has a different workflow ID
child_workflow_id = child_result.workflow.workflow_id
return {
"parent_workflow_id": parent_workflow_id,
"child_workflow_id": child_workflow_id,
"child_result": child_result.result,
"workflows_are_different": parent_workflow_id != child_workflow_id
}
Deterministic Operations¶
The workflow system provides several deterministic operations that ensure reproducible execution:
@app.task
def deterministic_operations_example() -> dict[str, Any]:
"""Demonstrate deterministic operations in workflows."""
# Deterministic random number (0.0 to 1.0)
random_value = deterministic_operations_example.wf.random()
# Deterministic UUID generation
unique_id = deterministic_operations_example.wf.uuid()
# Deterministic timestamp progression
start_time = deterministic_operations_example.wf.utc_now()
# Each call advances time deterministically
later_time = deterministic_operations_example.wf.utc_now()
# Custom deterministic operation
def generate_sequence_number() -> int:
return random.randint(1000, 9999)
sequence = deterministic_operations_example.wf.deterministic._deterministic_operation(
"sequence", generate_sequence_number
)
return {
"random_value": random_value,
"unique_id": unique_id,
"start_time": start_time.isoformat(),
"later_time": later_time.isoformat(),
"sequence_number": sequence
}
Workflow State Management¶
@app.task
def stateful_workflow(user_id: str) -> dict[str, Any]:
"""Demonstrate workflow state management."""
# Store workflow data that persists across executions
stateful_workflow.wf.set_data("user_id", user_id)
stateful_workflow.wf.set_data("started_at", stateful_workflow.wf.utc_now().isoformat())
# Perform some operations
processing_steps = []
for step in range(3):
step_id = stateful_workflow.wf.uuid()
step_time = stateful_workflow.wf.utc_now()
processing_steps.append({
"step": step + 1,
"step_id": step_id,
"timestamp": step_time.isoformat()
})
# Store intermediate results
stateful_workflow.wf.set_data(f"step_{step + 1}_result", step_id)
# Retrieve stored data
user_id_retrieved = stateful_workflow.wf.get_data("user_id")
started_at = stateful_workflow.wf.get_data("started_at")
return {
"user_id": user_id_retrieved,
"started_at": started_at,
"processing_steps": processing_steps,
"completed_at": stateful_workflow.wf.utc_now().isoformat()
}
Task Execution Within Workflows¶
@app.task
def parent_workflow(data: dict[str, Any]) -> dict[str, Any]:
"""Parent workflow that executes child tasks."""
# Execute tasks deterministically within the workflow
validation_result = parent_workflow.wf.execute_task(
validate_data, data
)
if validation_result.result["valid"]:
# Execute processing task only if validation succeeds
processing_result = parent_workflow.wf.execute_task(
process_data, data, validation_result.result["normalized_data"]
)
# Execute notification task
notification_result = parent_workflow.wf.execute_task(
send_notification, processing_result.result
)
return {
"status": "completed",
"validation": validation_result.result,
"processing": processing_result.result,
"notification": notification_result.result
}
else:
return {
"status": "failed",
"validation": validation_result.result,
"error": "Data validation failed"
}
@app.task
def validate_data(data: dict[str, Any]) -> dict[str, Any]:
"""Validate input data."""
# Validation logic here
normalized_data = {k.lower(): v for k, v in data.items()}
return {
"valid": len(normalized_data) > 0,
"normalized_data": normalized_data,
"validated_at": validate_data.wf.utc_now().isoformat()
}
@app.task
def process_data(original_data: dict[str, Any], normalized_data: dict[str, Any]) -> dict[str, Any]:
"""Process the validated data."""
processing_id = process_data.wf.uuid()
return {
"processing_id": processing_id,
"processed_records": len(normalized_data),
"processed_at": process_data.wf.utc_now().isoformat()
}
@app.task
def send_notification(processing_result: dict[str, Any]) -> dict[str, Any]:
"""Send processing completion notification."""
notification_id = send_notification.wf.uuid()
return {
"notification_id": notification_id,
"sent_at": send_notification.wf.utc_now().isoformat(),
"processing_id": processing_result["processing_id"]
}
Complex Workflow with Error Handling¶
@app.task
def robust_processing_workflow(order_data: dict[str, Any]) -> dict[str, Any]:
"""Complex workflow with built-in error handling and retry logic."""
workflow_start = robust_processing_workflow.wf.utc_now()
order_id = order_data.get("order_id", robust_processing_workflow.wf.uuid())
# Store initial workflow state
robust_processing_workflow.wf.set_data("order_id", order_id)
robust_processing_workflow.wf.set_data("started_at", workflow_start.isoformat())
robust_processing_workflow.wf.set_data("status", "processing")
try:
# Step 1: Validate order
validation_result = robust_processing_workflow.wf.execute_task(
validate_order, order_data
)
if not validation_result.result["valid"]:
robust_processing_workflow.wf.set_data("status", "validation_failed")
return {
"order_id": order_id,
"status": "failed",
"step": "validation",
"error": validation_result.result["errors"]
}
# Step 2: Process payment
payment_result = robust_processing_workflow.wf.execute_task(
process_payment_with_retry, order_data
)
robust_processing_workflow.wf.set_data("payment_id", payment_result.result["payment_id"])
# Step 3: Reserve inventory
inventory_result = robust_processing_workflow.wf.execute_task(
reserve_inventory, order_data
)
robust_processing_workflow.wf.set_data("reservation_id", inventory_result.result["reservation_id"])
# Step 4: Generate shipping
shipping_result = robust_processing_workflow.wf.execute_task(
generate_shipping_label, order_id, inventory_result.result
)
# Mark workflow as completed
robust_processing_workflow.wf.set_data("status", "completed")
completion_time = robust_processing_workflow.wf.utc_now()
return {
"order_id": order_id,
"status": "completed",
"payment_id": payment_result.result["payment_id"],
"reservation_id": inventory_result.result["reservation_id"],
"tracking_number": shipping_result.result["tracking_number"],
"started_at": workflow_start.isoformat(),
"completed_at": completion_time.isoformat(),
"processing_time_seconds": (completion_time - workflow_start).total_seconds()
}
except Exception as e:
# Handle workflow errors
robust_processing_workflow.wf.set_data("status", "error")
robust_processing_workflow.wf.set_data("error", str(e))
# Execute cleanup if needed
cleanup_result = robust_processing_workflow.wf.execute_task(
cleanup_failed_order, order_id
)
return {
"order_id": order_id,
"status": "error",
"error": str(e),
"cleanup_result": cleanup_result.result
}
@app.task
def validate_order(order_data: dict[str, Any]) -> dict[str, Any]:
"""Validate order data with deterministic processing."""
validation_id = validate_order.wf.uuid()
validated_at = validate_order.wf.utc_now()
errors = []
if not order_data.get("customer_id"):
errors.append("Missing customer_id")
if not order_data.get("items"):
errors.append("No items in order")
return {
"validation_id": validation_id,
"valid": len(errors) == 0,
"errors": errors,
"validated_at": validated_at.isoformat()
}
@app.task
def process_payment_with_retry(order_data: dict[str, Any]) -> dict[str, Any]:
"""Process payment with deterministic retry logic."""
payment_id = process_payment_with_retry.wf.uuid()
attempt_time = process_payment_with_retry.wf.utc_now()
# Simulate processing logic with deterministic behavior
success_chance = process_payment_with_retry.wf.random()
if success_chance > 0.1: # 90% success rate
return {
"payment_id": payment_id,
"status": "completed",
"amount": order_data.get("total", 0),
"processed_at": attempt_time.isoformat()
}
else:
# In a real implementation, this would trigger retry logic
return {
"payment_id": payment_id,
"status": "failed",
"error": "Payment processing failed",
"attempted_at": attempt_time.isoformat()
}
@app.task
def reserve_inventory(order_data: dict[str, Any]) -> dict[str, Any]:
"""Reserve inventory for order items."""
reservation_id = reserve_inventory.wf.uuid()
reserved_at = reserve_inventory.wf.utc_now()
items = order_data.get("items", [])
reserved_items = []
for item in items:
item_reservation_id = reserve_inventory.wf.uuid()
reserved_items.append({
"item_id": item.get("id"),
"quantity": item.get("quantity"),
"reservation_id": item_reservation_id
})
return {
"reservation_id": reservation_id,
"reserved_items": reserved_items,
"reserved_at": reserved_at.isoformat()
}
@app.task
def generate_shipping_label(order_id: str, inventory_data: dict[str, Any]) -> dict[str, Any]:
"""Generate shipping label for reserved inventory."""
tracking_number = f"TRK-{order_id}-{int(generate_shipping_label.wf.random() * 100000):05d}"
label_id = generate_shipping_label.wf.uuid()
generated_at = generate_shipping_label.wf.utc_now()
return {
"label_id": label_id,
"tracking_number": tracking_number,
"order_id": order_id,
"reservation_id": inventory_data["reservation_id"],
"generated_at": generated_at.isoformat()
}
@app.task
def cleanup_failed_order(order_id: str) -> dict[str, Any]:
"""Clean up resources for a failed order."""
cleanup_id = cleanup_failed_order.wf.uuid()
cleaned_at = cleanup_failed_order.wf.utc_now()
return {
"cleanup_id": cleanup_id,
"order_id": order_id,
"actions": ["released_inventory", "voided_payment", "notified_customer"],
"cleaned_at": cleaned_at.isoformat()
}
Features¶
Deterministic Execution¶
Random Numbers:
task.wf.random()generates deterministic random numbersTimestamps:
task.wf.utc_now()provides deterministic time progressionUUIDs:
task.wf.uuid()creates deterministic unique identifiersTask Execution:
task.wf.execute_task()ensures deterministic task invocation
Workflow State Management¶
Data Storage:
task.wf.set_data(key, value)andtask.wf.get_data(key)for persistent key-value storageWorkflow Identity: Each workflow has a unique identity that tracks relationships and lineage
Context Inheritance: Child tasks automatically inherit parent workflow context
Integration with Task System¶
Seamless Integration: Any task can participate in workflows without modification
Automatic Context: Tasks executed within workflows automatically get workflow context
Flexible Boundaries: Control when new workflows are created with
force_new_workflow
Replay and Recovery¶
Perfect Replay: Workflows replay identically, including all random operations and timestamps
State Persistence: All workflow state is automatically persisted across executions
Failure Recovery: Workflows can resume from exact points of failure
Best Practices¶
Use Deterministic Operations: Always use workflow-provided operations (
wf.random(),wf.utc_now(),wf.uuid()) instead of standard library functions for reproducibility.Store Important State: Use
wf.set_data()to store critical workflow state that should persist across executions.Design for Idempotency: Ensure workflow steps can be safely re-executed without side effects.
Handle Errors Gracefully: Implement proper error handling and cleanup logic within workflows.
Use Workflow Boundaries: Use
force_new_workflow=Trueto create clear boundaries between independent workflow contexts.Keep Workflows Focused: Design workflows with clear, single purposes rather than overly complex multi-purpose workflows.
Test Replay Behavior: Test that your workflows behave correctly when replayed from stored state.
The Pynenc Workflow System provides a robust foundation for building complex, reliable task orchestration with built-in state management, deterministic execution, and automatic replay capabilities.