pynenc.app

Module Contents

Classes

Pynenc

The main class of the Pynenc library that creates an application object.

Functions

_new_pynenc

Module-level pickle reconstructor that forwards config to Pynenc.new.

API

pynenc.app._new_pynenc(cls: type[Pynenc], config_values: dict[str, Any] | None, config_filepath: str | None) → Pynenc

Module-level pickle reconstructor that forwards config to Pynenc.new.

Pickle calls __new__ with no arguments by default, which breaks the multiton lookup (config_values=None → app_id defaults to ‘pynenc’). This helper is returned by __reduce__ so pickle calls it instead, ensuring __new__ receives the real config_values.

The instance is pre-registered in _instances and minimally initialised so that module-level Pynenc() calls triggered during state deserialisation (Task.__setstate__ → importlib.import_module) reuse this instance instead of creating a second one with a freshly generated temp DB path.
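The reconstructor pattern can be illustrated with a stdlib-only sketch. `App` and `_new_app` are hypothetical stand-ins for `Pynenc` and `_new_pynenc`; the sketch assumes only that `app_id` is read from `config_values` with a default of `'pynenc'`, and it leaves out `config_filepath` and the minimal-initialisation step.

```python
import pickle
from typing import Any, ClassVar, Optional


def _new_app(cls: type, config_values: Optional[dict[str, Any]]) -> Any:
    """Hypothetical module-level reconstructor, analogous to _new_pynenc."""
    return cls(config_values=config_values)


class App:
    """Hypothetical multiton keyed by app_id (not the real Pynenc class)."""

    _instances: ClassVar[dict[str, "App"]] = {}

    def __new__(cls, config_values: Optional[dict[str, Any]] = None) -> "App":
        app_id = (config_values or {}).get("app_id", "pynenc")
        existing = cls._instances.get(app_id)
        if existing is not None:
            return existing  # multiton hit: reuse the registered instance
        instance = super().__new__(cls)
        instance.config_values = config_values
        instance.app_id = app_id
        cls._instances[app_id] = instance  # register before anything else runs
        return instance

    def __reduce__(self) -> tuple:
        # Without this, pickle would call App.__new__(App) with no arguments
        # and the lookup would fall back to the default app_id.
        return (_new_app, (type(self), self.config_values))


billing = App({"app_id": "billing"})
restored = pickle.loads(pickle.dumps(billing))
```

In the same process the round trip returns the already-registered instance; in a fresh child process `_instances` starts empty, so `_new_app` creates and registers a new instance with the correct `app_id` instead of the default one.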

class pynenc.app.Pynenc(config_values: dict[str, Any] | None = None, config_filepath: str | None = None)

The main class of the Pynenc library that creates an application object.

Parameters:
  • config_values (dict[str, Any] | None) – A dictionary of configuration values. Use {"app_id": "my_app"} to set the application identifier.

  • config_filepath (str | None) – A path to a configuration file.

Note

The component base classes (orchestrator, broker, state backend, etc.) are abstract and cannot be used directly. If no implementation is specified, they default to MemTaskBroker, MemStateBackend, etc. These in-memory defaults do not actually distribute work; they are intended for tests or for running an application on localhost. They allow some degree of parallelism but are not suitable for a production system.

Initialization

_instances: ClassVar[dict[str, pynenc.app.Pynenc]]

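Registry of Pynenc instances keyed by app_id (the multiton used by __new__, __setstate__ and _new_pynenc).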

__new__(config_values: dict[str, Any] | None = None, config_filepath: str | None = None) → pynenc.app.Pynenc

Return the existing instance for this app_id if one was registered by __setstate__.

classmethod from_info(app_info: pynenc.app_info.AppInfo) → pynenc.app.Pynenc

Create a Pynenc app instance from AppInfo.

Parameters:

app_info – The AppInfo object containing app metadata

Returns:

A new Pynenc app instance

register_core_tasks() → None

Register all core tasks defined in the core_tasks_registry.

register_core_task(task_def: pynenc.core_tasks.CoreTaskDefinition) → None

_store_deferred_trigger(task: pynenc.task.Task, triggers: Union[pynenc.trigger.TriggerBuilder, list[pynenc.trigger.TriggerBuilder], None]) → None

Store triggers to be registered later by the runner.

Parameters:
  • task (Task) – The Task instance that declared triggers

  • triggers – A single TriggerBuilder, a list of TriggerBuilder objects, or None.

register_deferred_triggers() → None

Register all deferred task triggers that were collected during task decoration.

This is intended to be called by a runner during startup after relevant modules have been imported so trigger backends are available.
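The collect-now, register-later flow can be sketched as a small registry. `TriggerSpec` and `DeferredTriggerRegistry` are hypothetical stand-ins for `TriggerBuilder` and the app's internal bookkeeping; the real storage and method names are not documented here.

```python
from dataclasses import dataclass, field
from typing import Union


@dataclass
class TriggerSpec:
    """Hypothetical stand-in for a TriggerBuilder."""

    description: str


@dataclass
class DeferredTriggerRegistry:
    """Hypothetical store: collect triggers at decoration time, register later."""

    pending: list[tuple[str, list[TriggerSpec]]] = field(default_factory=list)
    registered: dict[str, list[TriggerSpec]] = field(default_factory=dict)

    def store(self, task_id: str, triggers: Union[TriggerSpec, list[TriggerSpec], None]) -> None:
        if triggers is None:
            return  # the task declared no triggers
        if isinstance(triggers, TriggerSpec):
            triggers = [triggers]  # normalise a single builder to a list
        self.pending.append((task_id, list(triggers)))

    def register_all(self) -> None:
        # Called once by the runner at startup, after task modules are imported
        for task_id, triggers in self.pending:
            self.registered.setdefault(task_id, []).extend(triggers)
        self.pending.clear()


registry = DeferredTriggerRegistry()
registry.store("daily_report", TriggerSpec("cron: 0 0 * * *"))
registry.store("no_triggers", None)
registry.store("process_payment", [TriggerSpec("event"), TriggerSpec("status")])
pending_before = len(registry.pending)
registry.register_all()
```

Deferring registration means decorating a function never needs a trigger backend; the backend is touched only once, when the runner decides it is ready.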

property config_values: dict[str, Any] | None
property config_filepath: str | None
property app_id: str
property tasks: dict[pynenc.identifiers.task_id.TaskId, pynenc.task.Task]

Get the dictionary of registered tasks.

Returns:

A dictionary mapping task_id to Task instances.

get_task(task_id: pynenc.identifiers.task_id.TaskId) → pynenc.task.Task

Get a task by its ID.

Parameters:

task_id – The ID of the task to retrieve.

Returns:

The Task instance if found, None otherwise.

Warning: it may overwrite the options.

__reduce__() → tuple

Control pickle reconstruction so that __new__ receives config_values.

Without this, pickle calls Pynenc.__new__(Pynenc) with no args, causing the multiton to fall back to the default app_id (‘pynenc’) and return a stale instance for same-process roundtrips.

__getstate__() → dict

Return the serializable state of the app for pickling or multiprocessing.

__setstate__(state: dict) → None

Restore the app state and register in the multiton.

When a child process (spawned via multiprocessing) unpickles the parent’s app, this registers the instance so that subsequent Pynenc(config_values={"app_id": ...}) calls (e.g. during module re-import) return the same already-configured instance.

_reset_cached_components() → None

Reset all lazily-initialised components so they are re-created on next access.
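A sketch of how lazily created components, `__getstate__`/`__setstate__` and a reset helper can fit together, assuming components such as the broker are rebuilt from configuration rather than pickled. `LazyApp` and `Broker` are hypothetical; only the method names mirror the ones documented above.

```python
import pickle
from typing import Any, ClassVar, Optional


class Broker:
    """Hypothetical component that is rebuilt rather than pickled."""

    def __init__(self, app_id: str) -> None:
        self.app_id = app_id


class LazyApp:
    """Hypothetical app with one lazily created component."""

    _instances: ClassVar[dict[str, "LazyApp"]] = {}

    def __init__(self, app_id: str) -> None:
        self.app_id = app_id
        self._broker: Optional[Broker] = None

    @property
    def broker(self) -> Broker:
        if self._broker is None:  # created on first access only
            self._broker = Broker(self.app_id)
        return self._broker

    def _reset_cached_components(self) -> None:
        self._broker = None  # the next access rebuilds it

    def __getstate__(self) -> dict[str, Any]:
        return {"app_id": self.app_id}  # configuration only, no live components

    def __setstate__(self, state: dict[str, Any]) -> None:
        self.app_id = state["app_id"]
        self._reset_cached_components()
        type(self)._instances[self.app_id] = self  # register in the multiton


app = LazyApp("billing")
_ = app.broker  # force the component into existence
clone = pickle.loads(pickle.dumps(app))
```

Keeping live objects (connections, locks, thread pools) out of the pickled state is what lets the app cross a process boundary; the clone recreates them on first use.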

property conf: pynenc.conf.config_pynenc.ConfigPynenc
property logger: logging.Logger
property orchestrator: pynenc.orchestrator.base_orchestrator.BaseOrchestrator
property trigger: pynenc.trigger.base_trigger.BaseTrigger
property broker: pynenc.broker.base_broker.BaseBroker
property state_backend: pynenc.state_backend.base_state_backend.BaseStateBackend
property serializer: pynenc.serializer.base_serializer.BaseSerializer
property client_data_store: pynenc.client_data_store.base_client_data_store.BaseClientDataStore
property runner: pynenc.runner.base_runner.BaseRunner

Get the runner for this app, prioritizing thread/process-specific context.

First, it checks the thread-local context for a runner (via get_current_runner). This is crucial in the MultiThreadRunner, where each process runs a ThreadRunner and needs to use its own runner instance rather than the app’s default.

If no context runner exists, it falls back to the instance-level runner. This mechanism ensures correct runner isolation across threads and processes.

Returns:

The runner instance for the current context or the app instance.
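The lookup order can be sketched with `threading.local`. `Runner`, `AppSketch` and `set_current_runner` are hypothetical, and this `get_current_runner` is a stand-in with the same name, not the library's function. The sketch covers threads only; it does not model process isolation.

```python
import threading
from typing import Optional


class Runner:
    """Hypothetical runner with just a name."""

    def __init__(self, name: str) -> None:
        self.name = name


_context = threading.local()  # each thread sees its own attributes


def set_current_runner(runner: Optional[Runner]) -> None:
    _context.runner = runner


def get_current_runner() -> Optional[Runner]:
    return getattr(_context, "runner", None)


class AppSketch:
    def __init__(self) -> None:
        self._runner = Runner("default")

    @property
    def runner(self) -> Runner:
        # Prefer the runner bound to the current thread, else the app-level one
        context_runner = get_current_runner()
        return context_runner if context_runner is not None else self._runner


app = AppSketch()
seen: dict[int, str] = {}


def worker(n: int) -> None:
    set_current_runner(Runner(f"thread-{n}"))
    seen[n] = app.runner.name


threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Each worker thread sees its own runner, while the main thread, which never bound one, still gets the app-level default.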

purge() → None

Purge all data from the broker and state backend.

task(func: Optional[pynenc.types.Func] = None, *, parallel_batch_size: int | None = None, retry_for: tuple[type[Exception], ...] | None = None, max_retries: int | None = None, running_concurrency: pynenc.conf.config_task.ConcurrencyControlType | None = None, registration_concurrency: pynenc.conf.config_task.ConcurrencyControlType | None = None, key_arguments: tuple[str, ...] | None = None, on_diff_non_key_args_raise: bool | None = None, call_result_cache: bool | None = None, disable_cache_args: tuple[str, ...] | None = None, triggers: Union[pynenc.trigger.TriggerBuilder, list[pynenc.trigger.TriggerBuilder]] | None = None, force_new_workflow: bool | None = None, reroute_on_concurrency_control: bool | None = None) → Task | Callable[[Func], Task]

The task decorator converts the function into a Task instance. It accepts any of the options listed below; these options are validated against the options class assigned to the task class.

Parameters:
  • func (Optional[Callable]) – The function to be converted into a Task instance.

  • parallel_batch_size (int | None) – If set to 0, auto parallelization is disabled. If greater than 0, tasks with iterable arguments are automatically split into chunks.

  • retry_for (tuple[type[Exception], ...] | None) – Exceptions for which the task should be retried.

  • max_retries (int | None) – The maximum number of retries for a task.

  • running_concurrency (ConcurrencyControlType | None) – Controls the concurrency behavior of the task.

  • registration_concurrency (ConcurrencyControlType | None) – Manages task registration concurrency.

  • key_arguments (tuple[str, ...] | None) – Key arguments for concurrency control.

  • on_diff_non_key_args_raise (bool | None) – If True, raises an exception for task invocations with matching key arguments but different non-key arguments.

  • call_result_cache (bool | None) – If True, returns the latest result of a Task with the same arguments if available; otherwise a new invocation is triggered as usual.

  • disable_cache_args (tuple[str, ...] | None) – Arguments to exclude from caching. Accepts “*” to disable caching for all arguments.

  • triggers (Union[TriggerBuilder, list[TriggerBuilder]] | None) – Trigger definitions that determine when this task should execute automatically. Can be a single TriggerBuilder or a list of builders for multiple trigger conditions.

  • force_new_workflow (bool | None) – If True, this task will always create a new workflow when invoked. Even when called from within another workflow, it creates a subworkflow that maintains a reference to its parent workflow.
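One plausible reading of `key_arguments` and `on_diff_non_key_args_raise` is sketched here: calls are deduplicated on the key arguments, and a mismatch in the remaining arguments either raises or reuses the earlier registration. `KeyedRegistry`, `DuplicateKeyArgsError` and `charge` are hypothetical, key argument values are assumed hashable, and this is not pynenc's implementation.

```python
import inspect
from typing import Any, Callable


class DuplicateKeyArgsError(Exception):
    """Hypothetical error: same key arguments, different non-key arguments."""


class KeyedRegistry:
    """Hypothetical registry that deduplicates calls on their key arguments."""

    def __init__(
        self,
        func: Callable[..., Any],
        key_arguments: tuple[str, ...],
        on_diff_non_key_args_raise: bool,
    ) -> None:
        self.signature = inspect.signature(func)
        self.key_arguments = key_arguments
        self.raise_on_diff = on_diff_non_key_args_raise
        self.registered: dict[tuple[Any, ...], dict[str, Any]] = {}

    def register(self, *args: Any, **kwargs: Any) -> dict[str, Any]:
        # Resolve positional, keyword and default values into one mapping
        bound = self.signature.bind(*args, **kwargs)
        bound.apply_defaults()
        call_args = dict(bound.arguments)
        key = tuple(call_args[name] for name in self.key_arguments)
        existing = self.registered.get(key)
        if existing is None:
            self.registered[key] = call_args
            return call_args
        if existing != call_args and self.raise_on_diff:
            raise DuplicateKeyArgsError(f"key {key} already registered with {existing}")
        return existing  # same key: reuse the earlier registration


def charge(customer_id: str, amount: int, note: str = "") -> None:
    """Hypothetical task body used only to provide a signature."""


strict = KeyedRegistry(charge, ("customer_id",), on_diff_non_key_args_raise=True)
first = strict.register("c1", 10)
```

Binding through `inspect.signature` makes `charge("c1", 10)` and `charge(customer_id="c1", amount=10)` count as the same call.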

Returns:

A Task instance or a callable that when called returns a Task instance.
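The dual return type follows the usual optional-argument decorator pattern, so the decorator works both as `@app.task` and as `@app.task(max_retries=3)`. A minimal sketch; `task` and `TaskSketch` here are hypothetical and only `max_retries` is modelled.

```python
import functools
from typing import Any, Callable, Optional, Union


class TaskSketch:
    """Hypothetical stand-in for a Task: the wrapped function plus its options."""

    def __init__(self, func: Callable[..., Any], options: dict[str, Any]) -> None:
        functools.update_wrapper(self, func)  # copy __name__, __doc__, ...
        self.func = func
        self.options = options


def task(
    func: Optional[Callable[..., Any]] = None,
    *,
    max_retries: Optional[int] = None,
) -> Union[TaskSketch, Callable[[Callable[..., Any]], TaskSketch]]:
    # Record only the options that were explicitly set
    options = {} if max_retries is None else {"max_retries": max_retries}

    def decorator(f: Callable[..., Any]) -> TaskSketch:
        return TaskSketch(f, options)

    if func is None:
        return decorator  # used as @task(...): return the decorator
    return decorator(func)  # used as bare @task: wrap immediately


@task
def add(x: int, y: int) -> int:
    return x + y


@task(max_retries=3)
def mul(x: int, y: int) -> int:
    return x * y
```

Making every option keyword-only (the `*` in the signature) is what lets the decorator tell a bare function apart from a call with options.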

Example:

from pynenc.app import Pynenc

app = Pynenc()

# Basic task with no triggers
@app.task(max_retries=3)
def simple_task(x: int, y: int) -> int:
    return x + y

# Task with a single trigger using a cron schedule
from pynenc.trigger import on_cron
@app.task(triggers=on_cron("0 0 * * *"))  # Run daily at midnight
def daily_report() -> None:
    # Generate daily report
    pass

# Task with multiple triggers using different conditions
from pynenc.trigger import on_event, on_status
@app.task(
    triggers=[
        on_event("payment.completed", filters={"amount": {"$gt": 1000}}),
        on_status("validate_data", statuses=["SUCCESS"])
    ]
)
def process_important_payment(payment_id: str) -> None:
    # Process high-value payment after validation
    pass

# Task with complex trigger condition using a builder
from pynenc.trigger import TriggerBuilder
from pynenc.trigger.conditions import CompositeLogic

trigger = (
    TriggerBuilder()
    .on_event("payment.received")
    .on_status("validate_payment")
    .with_logic(CompositeLogic.AND)  # Both conditions must be met
    .with_arguments(lambda ctx: {"payment_id": ctx["event"].payload["id"]})
)

@app.task(triggers=trigger)
def process_payment(payment_id: str) -> None:
    # Process payment that has been received and validated
    pass

direct_task(func: Optional[Func[Params, Result]] = None, *, parallel_func: ParallelFunc | None = None, aggregate_func: AggregateFunc | None = None, parallel_batch_size: int | None = None, retry_for: tuple[type[Exception], ...] | None = None, max_retries: int | None = None, running_concurrency: pynenc.conf.config_task.ConcurrencyControlType | None = None, registration_concurrency: pynenc.conf.config_task.ConcurrencyControlType | None = None, key_arguments: tuple[str, ...] | None = None, on_diff_non_key_args_raise: bool | None = None, call_result_cache: bool | None = None, disable_cache_args: tuple[str, ...] | None = None, force_new_workflow: bool | None = None, reroute_on_concurrency_control: bool | None = None) → Func[Params, Result] | Callable[[Func[Params, Result]], Func[Params, Result]]

Create a task that directly returns its result rather than returning an invocation.

This decorator maintains the original function’s behavior:

  • For synchronous functions, it waits for the result and returns it directly

  • For async functions, it returns an awaitable that resolves to the result

It also supports parallel execution via the parallel_func parameter, which takes a function that generates arguments for parallel processing, and aggregate_func, which combines the results.
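The sync/async split can be sketched with `inspect.iscoroutinefunction`. The `direct` decorator is hypothetical and runs the function locally; where a comment mentions the distributed result, the real decorator would submit the task and wait for it instead.

```python
import asyncio
import functools
import inspect
from typing import Any, Callable


def direct(func: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical decorator keeping sync functions sync and async ones awaitable."""
    if inspect.iscoroutinefunction(func):

        @functools.wraps(func)
        async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
            # A distributed version would await the task's result here
            return await func(*args, **kwargs)

        return async_wrapper

    @functools.wraps(func)
    def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
        # A distributed version would block until the task's result is ready
        return func(*args, **kwargs)

    return sync_wrapper


@direct
def add(x: int, y: int) -> int:
    return x + y


@direct
async def add_async(x: int, y: int) -> int:
    return x + y


sync_result = add(1, 2)
async_result = asyncio.run(add_async(1, 2))
```

Returning an `async def` wrapper for coroutine functions keeps `inspect.iscoroutinefunction` true on the decorated function, so callers and frameworks that check it still treat it as async.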

Parameters:
  • func (Optional[Func]) – The function to be converted into a Task instance that returns results directly.

  • parallel_func (Optional[ParallelFunc]) –

    Function that takes a dict of key arguments and returns either:

    1. An iterable of parameters for parallel execution (can be tuples, dicts, or Arguments)

      # Example returning just parameters
      lambda args: [(i, i+1) for i in range(5)]  # Returns tuples
      lambda args: [{"x": i, "y": i+1} for i in range(5)]  # Returns dicts
      
    2. A dict with "common_args" and "param_iter" keys, for efficient handling of large shared data:

      • common_args: Dictionary of arguments shared by all parallel tasks

      • param_iter: Iterable of dictionaries with task-specific arguments

      # Example with common arguments
      lambda args: {
          "common_args": {"large_data": args["large_data"]},  # Shared data (serialized once)
          "param_iter": [{"index": i} for i in range(10)]  # Task-specific args
      }
      

      This second approach provides major performance benefits when dealing with large shared arguments (20MB+) as they’re serialized only once instead of for each parallel task.

  • aggregate_func (Optional[AggregateFunc]) – Function that takes a list of results and aggregates them into a single result.

  • parallel_batch_size (int | None) – If set to 0, auto parallelization is disabled. If greater than 0, tasks with iterable arguments are automatically split into chunks.

  • retry_for (tuple[type[Exception], ...] | None) – Exceptions for which the task should be retried.

  • max_retries (int | None) – The maximum number of retries for a task.

  • running_concurrency (ConcurrencyControlType | None) – Controls the concurrency behavior of the task.

  • registration_concurrency (ConcurrencyControlType | None) – Manages task registration concurrency.

  • key_arguments (tuple[str, ...] | None) – Key arguments for concurrency control.

  • on_diff_non_key_args_raise (bool | None) – If True, raises an exception for task invocations with matching key arguments but different non-key arguments.

  • call_result_cache (bool | None) – If True, it will return the latest result of a Task with the same arguments if available, otherwise it will trigger a new invocation as expected.

  • disable_cache_args (tuple[str, ...] | None) – Arguments to exclude from caching. Accepts “*” to disable caching for all arguments.

  • force_new_workflow (bool | None) – If True, this task will always create a new workflow when invoked. Even when called from within another workflow, it creates a subworkflow that maintains a reference to its parent workflow.
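A sketch of how the two `parallel_func` return forms can be normalised, and why sending `common_args` once pays off. `normalize_parallel_args` and `payload_sizes` are hypothetical helpers, `pickle` stands in for the app's configured serializer, and `Arguments` objects are not handled.

```python
import pickle
from typing import Any, Union

TaskParams = Union[tuple, dict]


def normalize_parallel_args(result: Any) -> tuple[dict[str, Any], list[TaskParams]]:
    """Split a parallel_func result into (common_args, per-task parameters)."""
    if isinstance(result, dict) and "param_iter" in result:
        # Form 2: {"common_args": {...}, "param_iter": [...]}
        return dict(result.get("common_args", {})), list(result["param_iter"])
    # Form 1: an iterable of tuples or dicts, with no shared arguments
    return {}, list(result)


def payload_sizes(common_args: dict[str, Any], params: list[dict[str, Any]]) -> tuple[int, int]:
    """Return (bytes if common_args are sent once, bytes if merged into every task)."""
    once = len(pickle.dumps(common_args)) + sum(len(pickle.dumps(p)) for p in params)
    per_task = sum(len(pickle.dumps({**common_args, **p})) for p in params)
    return once, per_task


spec = {
    "common_args": {"large_data": "x" * 100_000},
    "param_iter": [{"index": i} for i in range(10)],
}
common, params = normalize_parallel_args(spec)
once, per_task = payload_sizes(common, params)
```

With ten tasks the per-task variant is roughly ten times larger, because the 100 KB string is pickled once per task instead of once overall.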

Returns:

A function that behaves like the original but is backed by a distributed task system.

Note:

A direct task does not have triggers; it is always executed when called.

Example:

from pynenc.app import Pynenc

app = Pynenc()

@app.direct_task(max_retries=3)
def my_func(x, y):
    return x + y

# This will return the result directly
result = my_func(1, 2)  # Returns 3

# With parallel execution
@app.direct_task(
    parallel_func=lambda _: [(i, i+1) for i in range(5)],
    aggregate_func=sum
)
def add_parallel(x, y):
    return x + y

result = add_parallel(0, 0)  # Returns 25, the sum of 1 + 3 + 5 + 7 + 9

# With optimized pre-serialization of large shared data
@app.direct_task(
    parallel_func=lambda args: {
        "common_args": {"large_data": args["large_data"]},
        "param_iter": [{"index": i} for i in range(100)]
    },
    aggregate_func=lambda results: sum(r[0] for r in results)
)
def process_data(large_data: str, index: int = 0) -> tuple[int, int]:
    # Process large data with multiple parallel tasks
    return (len(large_data) + index, index)

# Calling with 20MB of data
huge_data = "x" * (20 * 1024 * 1024)
result = process_data(huge_data)  # Pre-serializes huge_data only once