Usage Guide

This Usage Guide is designed to provide you with detailed instructions and practical examples to harness the full potential of Pynenc in various scenarios. Whether you’re a beginner or an advanced user, this guide aims to help you navigate through the features and functionalities of Pynenc with ease.

Getting Started with Pynenc

Before diving into specific use cases, ensure that you have Pynenc installed and configured correctly in your environment. Refer to the Getting Started section for installation instructions and initial setup.

Best Practices for Task Definition

Important

When defining tasks in Pynenc, it is crucial to avoid creating tasks in modules that are designed to run
as standalone scripts. In Python, if a module is run directly (either as a script or via `python -m module`),
its `__name__` attribute is set to `"__main__"`. This can cause issues in a distributed environment like Pynenc,
where the `__main__` module refers to the worker process, leading to difficulties in task identification and execution.
For more information on this limitation and how to structure your tasks correctly, refer to the [FAQ section](faq.rst).

Use Case Scenarios

Each section below gives a self-contained summary with a key code snippet. Use the cards to jump to any full step-by-step guide, or follow the quick link directly beneath each section heading.

1 · Basic Local Threaded

Run your first task with ThreadRunner — no infrastructure required.

Basic Local Threaded
2 · Distributed with Redis

Scale out with pynenc-redis and ProcessRunner in a Docker environment.

Basic Redis
3 · Concurrency Control

Prevent duplicate work with TASK-level and DISABLED concurrency modes.

Concurrency Control
4 · Auto Orchestration

Fibonacci via recursive tasks — Pynenc resolves dependencies automatically.

Automatic Orchestration
5 · Sync Unit Testing

One flag forces synchronous execution for simple assertEqual tests.

Synchronous Unit Testing
6 · Mem Mode Testing

Full in-memory stack with ThreadRunner — CI-friendly, zero infrastructure.

In-Memory Unit Testing
7 · JSON Serialization

Add to_json / from_json to any class for JsonSerializer compatibility.

Custom JSON Serialization
9 · Client Data Store

Automatically cache large arguments to reduce serialization overhead at scale.

Argument Caching
10 · Trigger System

Schedule tasks with cron or fire them on status changes, results, and events.

Trigger System
11 · Workflow System

Deterministic, replayable, stateful workflows with automatic failure recovery.

Workflow System
Invocation Status

State machine lifecycle, ownership tracking, and recovery mechanics.

Invocation Status System

Use Case 1: Basic Local Threaded Demonstration

📖 Full step-by-step guide

Learn the basics of setting up and executing tasks using Pynenc in a local, non-distributed environment. This use case is ideal for understanding the fundamental workings of Pynenc, especially for development and testing purposes.

from pynenc import Pynenc

app = Pynenc()

@app.task
def add(x: int, y: int) -> int:
    add.logger.info(f"{add.task_id=} Adding {x} + {y}")
    return x + y

For a detailed guide and example, see Basic Local Threaded.

Use Case 2: Distributed System with Redis and Process Runner

📖 Full step-by-step guide

Explore setting up a distributed task processing system using pynenc with Redis. This use case demonstrates how to configure and run tasks in a distributed environment, leveraging the Redis plugin for task queuing and the ProcessRunner for executing tasks across multiple processes.

import time
from pynenc import Pynenc

app = Pynenc()

@app.task
def add(x: int, y: int) -> int:
    add.logger.info(f"{add.task_id=} Adding {x} + {y}")
    return x + y

@app.task
def sleep(x: int) -> int:
    add.logger.info(f"{sleep.task_id=} Sleeping for {x} seconds")
    time.sleep(x)
    add.logger.info(f"{sleep.task_id=} Done sleeping for {x} seconds")
    return x

Prerequisites: This use case requires the Redis plugin:

pip install pynenc-redis

Configuration is key to integrating Redis with pynenc, as shown in the pyproject.toml setup. This setup enables tasks to be queued and processed in a truly distributed manner.

[tool.pynenc]
app_id = "app_basic_redis_example"
orchestrator_cls = "RedisOrchestrator"
broker_cls = "RedisBroker"
state_backend_cls = "RedisStateBackend"
serializer_cls = "JsonSerializer"
runner_cls = "ProcessRunner"

[tool.pynenc.redis]
redis_host = "redis"

Tasks are executed through a simple Python script that triggers them. Running the pynenc worker and executing tasks can be done in a local development environment or within Docker for a more isolated setup.

Execute tasks directly or in parallel to understand the power of distributed task processing with pynenc and Redis. Also, explore running the system in development mode for debugging and testing without the need for Redis.

For a detailed guide and example, see Basic Redis.

Use Case 3: Concurrency Control

📖 Full step-by-step guide

Dive into the mechanics of concurrency control within Pynenc. This use case demonstrates various settings for concurrency control, such as disabling concurrent execution or enforcing task-level concurrency, to ensure tasks are executed according to specific requirements.

from pynenc import Pynenc, ConcurrencyControlType

app = Pynenc()

@app.task(registration_concurrency=ConcurrencyControlType.DISABLED)
def get_own_invocation_id() -> str:
    return get_own_invocation_id.invocation.invocation_id

@app.task(registration_concurrency=ConcurrencyControlType.TASK)
def get_own_invocation_id_registration_concurrency() -> str:
    return get_own_invocation_id_registration_concurrency.invocation.invocation_id

@app.task(running_concurrency=ConcurrencyControlType.DISABLED)
def sleep_without_running_concurrency(seconds: float) -> SleepResult:
    start = time.time()
    time.sleep(seconds)
    return SleepResult(start=start, end=time.time())

@app.task(running_concurrency=ConcurrencyControlType.TASK)
def sleep_with_running_concurrency(seconds: float) -> SleepResult:
    start = time.time()
    time.sleep(seconds)
    return SleepResult(start=start, end=time.time())

Through practical examples, see how tasks behave differently under various concurrency controls.

For a detailed guide and examples, see Concurrency Control.

Use Case 4: Automatic Orchestration

📖 Full step-by-step guide

Delve into the advanced features of Pynenc with the Automatic Orchestration use case, showcasing the library’s ability to manage task dependencies. This scenario uses the well-known Fibonacci sequence to illustrate how Pynenc automatically orchestrates the execution of dependent tasks, ensuring that tasks are executed in the correct sequence without manual intervention.

from pynenc import Pynenc

app = Pynenc()

@app.task
def fibonacci(n: int) -> int:
    fibonacci.logger.info(f"Calculating fibonacci({n})")
    if n <= 1:
        return n
    else:
        return fibonacci(n - 1).result + fibonacci(n - 2).result

This use case highlights how tasks that depend on the results of previous tasks can be executed seamlessly, showcasing Pynenc’s capability to pause and resume tasks as needed based on their dependencies. It’s an ideal demonstration for understanding Pynenc’s orchestration mechanisms in scenarios with complex task dependencies.

For a detailed guide and examples, see Automatic Orchestration.

Use Case 5: Unit Testing with Synchronous Mode

📖 Full step-by-step guide

Discover the simplicity of unit testing Pynenc tasks using the synchronous execution mode. This approach facilitates testing by executing tasks sequentially within the test process, allowing for straightforward assertion of task outcomes without the need for an asynchronous execution environment.

from pynenc import Pynenc

app = Pynenc()

@app.task
def add(x: int, y: int) -> int:
    add.logger.info(f"{add.task_id=} Adding {x} + {y}")
    return x + y

The synchronous execution mode is particularly useful for testing tasks in isolation, ensuring that their logic functions as expected without external dependencies or the complexity of an asynchronous runtime.

To enable synchronous mode during testing, you can directly configure the Pynenc application within your test setup or use environment variables to adjust the runtime behavior.

import unittest
from unittest.mock import patch

import tasks

class TestAddTask(unittest.TestCase):
    def setUp(self) -> None:
        # Enable synchronous task execution
        tasks.app.conf.dev_mode_force_sync_tasks = True

    def test_add_functionality(self) -> None:
        # Test the add task
        result = tasks.add(1, 2).result
        self.assertEqual(result, 3)

This use case is instrumental in demonstrating how Pynenc’s design accommodates unit testing, promoting testability and reliability of task-based applications. It underscores Pynenc’s adaptability to development workflows, ensuring tasks can be thoroughly tested in a simplified execution context.

For a detailed guide and examples, see Synchronous Unit Testing.

Use Case 6: Unit Testing with Mem Mode

📖 Full step-by-step guide

Explore the efficiency of unit testing Pynenc tasks using the Mem mode, where all operational components such as broker, orchestrator, and state backend utilize in-memory implementations. This approach enables fast, isolated testing without the reliance on external infrastructure, making it ideal for rapid development cycles and CI/CD pipelines.

from pynenc import Pynenc

app = Pynenc()

@app.task
def add(x: int, y: int) -> int:
    add.logger.info(f"{add.task_id=} Adding {x} + {y}")
    return x + y

Leveraging Mem mode for unit testing streamlines the testing process, ensuring tasks are executed in a controlled, predictable manner. This method is particularly beneficial for validating task logic and behavior under various conditions without the overhead of configuring external services or dealing with asynchronous execution complexities.

import unittest
import tasks

class TestAddTaskMemMode(unittest.TestCase):
    def setUp(self) -> None:
        # Set Pynenc to use in-memory components for testing
        tasks.app.conf.dev_mode_force_sync_tasks = False
        tasks.app.conf.orchestrator_cls = 'MemOrchestrator'
        tasks.app.conf.broker_cls = 'MemBroker'
        tasks.app.conf.state_backend_cls = 'MemStateBackend'
        tasks.app.conf.runner_cls = 'ThreadRunner'

        # Start the runner in a separate thread for asynchronous task execution
        self.thread = threading.Thread(target=tasks.app.runner.run, daemon=True)
        self.thread.start()

    def tearDown(self):
        # Ensure the runner is stopped after tests
        tasks.app.runner.stop_runner_loop()
        self.thread.join()

    def test_add_in_mem_mode(self) -> None:
        # Test the add task under Mem mode
        result = tasks.add(1, 2).result
        self.assertEqual(result, 3)

Explore unit testing of Pynenc tasks using Mem mode, which employs in-memory components for brokers, orchestrators, and state backends. This mode offers a swift and isolated testing approach, devoid of external dependencies, perfect for CI/CD pipelines and swift development cycles.

This use case also demonstrates configuring Pynenc through environment variables for in-memory testing, providing an alternative method to adjust runtime behavior for tests.

For a detailed guide and examples, see In-Memory Unit Testing.

Use Case 7: Custom JSON Serialization with JsonSerializable

📖 Full step-by-step guide

When using JsonSerializer, any task argument or return value that is not a JSON primitive will raise a TypeError — unless the class implements the JsonSerializable protocol. This protocol is a lightweight alternative to switching serializers: add two methods to your domain object and Pynenc handles the rest automatically.

from pynenc.serializer import JsonSerializable

class Money:
    def __init__(self, amount: float, currency: str) -> None:
        self.amount = amount
        self.currency = currency

    def to_json(self) -> dict:
        return {"amount": self.amount, "currency": self.currency}

    @classmethod
    def from_json(cls, data: dict) -> "Money":
        return cls(data["amount"], data["currency"])

With both methods in place, any Pynenc task can accept and return Money values without further configuration:

from pynenc import Pynenc

app = Pynenc()

@app.task
def calculate_total(unit_price: Money, quantity: int) -> Money:
    return Money(unit_price.amount * quantity, unit_price.currency)

The serializer embeds the class’s module and qualified name alongside the data so that deserialization reconstructs the exact original type — not a plain dict.

Method

Direction

Purpose

to_json(self) -> Any

object → JSON

Return a JSON-native value (dict, list, str, …)

from_json(cls, data) -> Self

JSON → object

Reconstruct from the value to_json returned

Because JsonSerializable is @runtime_checkable, compliance can be verified at runtime with a standard isinstance check.

For a detailed guide and examples, see Custom JSON Serialization.

Use Case 8: Customizing Data Serialization

Pynenc provides built-in support for common serialization formats through its serializer classes:

  • JsonPickleSerializer (default): Preserves Python object types using the jsonpickle library. Best for internal persistence with trusted data.

  • JsonSerializer: Pure JSON serialization for interoperability.

  • PickleSerializer: Native Python pickle serialization for complex objects.

Warning

The jsonpickle serializer can reconstruct arbitrary Python objects on deserialization — use it only for trusted, internal persistence (local state backends).

Creating Custom Serializers

You can create a custom serializer to handle any specific requirements of your tasks. This could be necessary when dealing with complex data types that are not natively supported by JSON or Pickle, or if you need to integrate with external systems that use a different data format.

To create a custom serializer, you need to subclass the BaseSerializer and implement the required serialization and deserialization methods. Here’s a simplified example:

    from pynenc.serializer import BaseSerializer

    class CustomSerializer(BaseSerializer):
        def serialize(self, obj):
            # Implement custom serialization logic
            return serialized_obj

        def deserialize(self, serialized_obj):
            # Implement custom deserialization logic
            return obj

Configuring Your Custom Serializer

Once your custom serializer is implemented, you can configure Pynenc to use it:

Using the builder:

    from pynenc.builder import PynencBuilder

    app = PynencBuilder().custom_config(serializer_cls="path.to.CustomSerializer").build()

Or using environment variables:

    PYNENC__SERIALIZER_CLS="path.to.CustomSerializer"

For more details on configuration options, refer to the Configuration System.

Use Case 9: Client Data Store

📖 Full step-by-step guide

Pynenc’s client data store optimizes task execution by efficiently handling large serialized arguments.

from pynenc import Pynenc
import numpy as np

app = Pynenc()

@app.task
def process_array(data: np.ndarray) -> float:
    """Process a large numpy array with automatic argument caching."""
    return float(data.mean())

# Large arrays will be automatically cached based on size threshold
large_array = np.random.rand(1000000)
result = process_array(large_array)

The argument caching system offers several key features:

  • Automatic caching of large arguments based on configurable size thresholds

  • Multiple backend options (Redis for distributed, Memory for local development)

  • Process-safe shared caching through runner-level storage

  • Smart detection to prevent redundant serialization

  • Fine-grained control over caching behavior per task and argument

  • LRU cache management for optimal memory usage

Configure the caching behavior through simple configuration settings:

[tool.pynenc.client_data_store]
min_size_to_cache = 1024  # Cache arguments larger than 1KB
local_cache_size = 1000   # Keep 1000 most recent entries

This use case demonstrates how Pynenc’s argument caching can significantly improve performance in distributed systems by reducing network traffic and serialization overhead.

For a detailed guide and examples, see Argument Caching.

Use Case 10: Trigger System

📖 Full step-by-step guide

Explore Pynenc’s powerful trigger system, which enables declarative task scheduling and event-driven workflows. This feature allows you to automatically execute tasks in response to various conditions such as scheduled times, task status changes, results, exceptions, or custom events.

from pynenc import Pynenc
from pynenc.trigger.trigger_builder import TriggerBuilder

app = Pynenc()

@app.task
def source_task(x: int) -> str:
    return f"Processed {x}"

# Define a task that runs when source_task completes successfully
@app.task(triggers=TriggerBuilder().on_status(source_task, statuses=["SUCCESS"]))
def notification_task() -> str:
    return "Source task completed successfully"

The trigger system provides a comprehensive framework for automating workflows with features including:

  • Diverse trigger conditions (cron expressions, task statuses, results, exceptions, events)

  • Flexible argument handling with providers that can generate task arguments dynamically

  • Conditional execution with filters based on arguments, results, or payload content

  • Composite conditions using AND/OR logic for complex triggering rules

This use case demonstrates how to create self-managing workflows that respond to system events and task outcomes, reducing the need for manual orchestration.

For a detailed guide and examples, see Trigger System.

Use Case 11: Workflow System

📖 Full step-by-step guide

Discover Pynenc’s advanced workflow system for building sophisticated task orchestration with deterministic execution, state management, and automatic replay capabilities. The workflow system enables complex multi-step processes that can recover from failures and maintain consistency across distributed environments.

from pynenc import Pynenc

app = Pynenc()

@app.task
def process_order_workflow(order_id: str) -> dict[str, Any]:
    """
    Order processing workflow with deterministic operations.

    All random operations and timestamps are deterministic and will
    replay identically during failure recovery.
    """
    # Generate deterministic tracking number
    tracking_number = f"TRK-{order_id}-{int(process_order_workflow.wf.random() * 100000):05d}"

    # Execute payment processing within workflow context
    payment_result = process_order_workflow.wf.execute_task(
        process_payment, order_id
    )

    # Store workflow state for persistence
    process_order_workflow.wf.set_data("tracking_number", tracking_number)
    process_order_workflow.wf.set_data("payment_id", payment_result.result["payment_id"])

    return {
        "order_id": order_id,
        "tracking_number": tracking_number,
        "payment_status": payment_result.result["status"],
        "workflow_id": process_order_workflow.workflow.workflow_id
    }

@app.task(force_new_workflow=True)
def independent_audit_workflow(order_id: str) -> dict[str, Any]:
    """
    Independent audit workflow with force_new_workflow=True.

    This always creates a new workflow context regardless of calling context.
    """
    audit_id = independent_audit_workflow.wf.uuid()
    return {
        "audit_id": audit_id,
        "order_id": order_id,
        "workflow_id": independent_audit_workflow.workflow.workflow_id
    }

The workflow system provides essential features for enterprise-grade task orchestration:

  • Deterministic Execution: All non-deterministic operations (random, UUID, timestamps) are made deterministic for perfect replay

  • Workflow Identity: Unique workflow contexts with parent-child relationships and inheritance

  • State Persistence: Automatic state management with key-value storage for workflow data

  • Task Integration: Seamless integration with existing Pynenc tasks and execution infrastructure

  • Workflow Boundaries: Use force_new_workflow=True decorator to create independent workflow contexts

  • Failure Recovery: Workflows can resume from exact points of failure with identical replay behavior

This use case demonstrates how to build robust, stateful workflows that can handle complex business logic while providing reliability guarantees and failure recovery capabilities.

For a detailed guide and examples, see Workflow System.

Invocation Status System

📖 Full reference guide

Pynenc uses a declarative, type-safe state machine to manage the lifecycle of task invocations. This system provides:

  • Ownership Tracking: Each invocation is owned by a specific runner during execution

  • Valid State Transitions: The state machine enforces which transitions are allowed

  • Automatic Recovery: Stuck invocations are automatically recovered when runners become inactive

  • Concurrency Control Integration: Status transitions integrate with concurrency control rules

Key status categories include:

  • Available for Run: REGISTERED, REROUTED, RETRY

  • Owned by Runner: PENDING, RUNNING, PAUSED, RESUMED

  • Recovery: PENDING_RECOVERY, RUNNING_RECOVERY

  • Final: SUCCESS, FAILED, CONCURRENCY_CONTROLLED_FINAL

For a detailed guide on the status system, state diagram, and recovery mechanisms, see Invocation Status System.