pynenc.task

Module Contents

Classes

Task

A task in the Pynenc library that represents a function that can be distributed.

Functions

_extract_task_from_wrapper

Extract a Task from a wrapper by scanning its instance attributes.

can_batch_process

Determine if a task can be processed in batches.

prepare_arguments

Convert various parameter formats to a list of Arguments objects, merging with common_args when provided.

distribute_calls

Distribute calls individually without batch processing.

distribute_batch_calls

Process a list of parameters in batches using PreSerializedCall. Handles pre-serialization of common arguments for efficient distribution.

API

class pynenc.task.Task(app: pynenc.app.Pynenc, func: pynenc.types.Func, options: dict[str, Any])[source]

Bases: typing.Generic[pynenc.types.Params, pynenc.types.Result]

A task in the Pynenc library that represents a function that can be distributed.

Parameters:
  • app (Pynenc) – A reference to the Pynenc application.

  • func (Callable) – The function to be run distributed.

  • **options (dict[str, Any]) – The options to apply.

A task can be called like a regular function and returns an instance of BaseResult. The result is an AsyncResult in normal operation, and a SyncResult when the task runs eagerly because the Pynenc app's dev_mode_force_sync_tasks option is set to True (or the 'PYNENC_DEV_MODE_FORCE_SYNC_TASK' environment variable is set). Use dev_mode_force_sync_tasks only in development.

Hint

Although a Task instance can be created directly, the recommended approach is the decorator provided by the Pynenc application, i.e., @app.task(options...). The decorator both instantiates the task and registers it in the app.

Limitations

Attention

This implementation does not support the creation of tasks from functions defined in modules intended to run as standalone scripts.

This applies to any module executed directly, where its __name__ attribute becomes "__main__". This is not exclusive to modules with if __name__ == "__main__" sections but includes any module run as the main program. In such situations, func.__module__ being "__main__" poses a challenge for task instantiation and serialization. When a task is executed in the initiator script, it is identified as __main__.task_name.

However, in a Pynenc worker’s distributed environment, __main__ refers to the worker itself. As a result, the task identified as __main__.task_name cannot be found, since the worker’s __main__ differs from that of the initiator script. To ensure simplicity and robustness in task management, tasks defined in modules run as the main program are not supported.
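
The naming problem can be reproduced with the standard library alone. In this minimal sketch, task_key and resolve are hypothetical helpers, not part of Pynenc; they only mimic identifying a function as module.name and importing it back from that name.

```python
import importlib
import textwrap


def task_key(func) -> str:
    """Build a 'module.name' identifier, similar in spirit to a task id."""
    return f"{func.__module__}.{func.__name__}"


def resolve(key: str):
    """Import the module named in the key and fetch the function from it."""
    module_name, _, func_name = key.rpartition(".")
    return getattr(importlib.import_module(module_name), func_name)


# A function from an importable module round-trips in any process.
importable_key = task_key(textwrap.dedent)


def script_func():
    pass


# Simulate a function defined in a module that was run as the main program.
script_func.__module__ = "__main__"
script_key = task_key(script_func)
```

In a worker process, resolving script_key would import the worker's own __main__ module, which does not define script_func, so the lookup would fail.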

Examples

@app.task(options)
def func():
    pass

result = func()

Raises

  • RuntimeError: If an attempt is made to create a task from a function in the __main__ module.

Initialization

validate_options() None[source]

Validate that every option field exists in config_fields. Raises an exception listing all invalid options.
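
The "collect every invalid option, then raise once" behavior can be sketched as follows. This is an illustration only: the standalone function, the field names, and the use of ValueError are assumptions, since the actual exception type is not stated here.

```python
def validate_options(options: dict, config_fields: set) -> None:
    """Raise a single error naming every option that is not a known field."""
    invalid = sorted(set(options) - config_fields)
    if invalid:
        raise ValueError(f"Invalid task options: {', '.join(invalid)}")


# Hypothetical field names, used only for this sketch.
known_fields = {"max_retries", "retry_for"}

# All keys are known, so this call passes silently.
validate_options({"max_retries": 3}, known_fields)

# Two unknown keys are reported together in one error message.
try:
    validate_options({"max_retries": 3, "retires": 1, "timeout": 5}, known_fields)
except ValueError as exc:
    message = str(exc)
```

Reporting all invalid keys at once saves the caller from fixing them one error at a time.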

conf() pynenc.conf.config_task.ConfigTask

property logger: logging.Logger

The logger for the task

property invocation: pynenc.invocation.base_invocation.BaseInvocation

The invocation of the task

wf() pynenc.workflow.workflow_context.WorkflowContext

Access workflow functionality for this task.

Provides methods for workflow state management, deterministic operations, and durability features like pause/resume and continue-as-new.

Returns:

A helper object with workflow functionality

Example:

@app.task
def main_wf_task(data: dict) -> str:
    # Save workflow state
    state = main_wf_task.wf.get_state({"step": 0})

    # Use deterministic random
    if main_wf_task.wf.random() > 0.5:
        state["path"] = "A"
    else:
        state["path"] = "B"

    # Save updated state
    main_wf_task.wf.save_state(state)

    # Conditionally pause workflow
    if needs_human_approval(data):
        main_wf_task.wf.pause("Waiting for approval")

    return f"Completed via path {state['path']}"

__getstate__() dict[source]

__setstate__(state: dict) None[source]

static _get_from_task_id(task_id: pynenc.identifiers.task_id.TaskId) pynenc.task.Task | collections.abc.Callable[source]
classmethod from_id(app: pynenc.app.Pynenc, task_id: pynenc.identifiers.task_id.TaskId) pynenc.task.Task[source]

Resolve a Task instance by its TaskId, bound to the given app.

Checks the app’s registered tasks first, then falls back to importing the module. When a module-level Task is found, a new Task instance is created and bound to app so the caller never shares the module-level app reference. This prevents cross-contamination when multiple apps with different runners coexist in the same process.

Parameters:
  • app (Pynenc) – The application instance with registered tasks

  • task_id (TaskId) – The task identifier to resolve

Returns:

The resolved Task instance bound to app

Raises:

ValueError – If the task_id cannot be resolved to a Task
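
The resolution order described above (registry first, then module import, then rebinding to the caller's app) can be sketched with stand-in classes. SketchApp, SketchTask, and the string task ids are hypothetical; the real TaskId type and lookup details are not shown on this page.

```python
import importlib
import sys
import types
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class SketchApp:
    name: str
    tasks: dict = field(default_factory=dict)  # task_id -> SketchTask


@dataclass
class SketchTask:
    app: SketchApp
    func: Callable
    task_id: str


def from_id(app: SketchApp, task_id: str) -> SketchTask:
    # 1. Prefer a task already registered in this app.
    if task_id in app.tasks:
        return app.tasks[task_id]
    # 2. Fall back to importing the module named in the id.
    module_name, _, attr = task_id.rpartition(".")
    try:
        found = getattr(importlib.import_module(module_name), attr)
    except (ImportError, AttributeError, ValueError) as exc:
        raise ValueError(f"Cannot resolve task_id {task_id!r}") from exc
    if not isinstance(found, SketchTask):
        raise ValueError(f"{task_id!r} does not refer to a task")
    # 3. Rebind: return a fresh instance tied to the caller's app.
    return SketchTask(app=app, func=found.func, task_id=task_id)


# Demo: a module-level task created by app_a, resolved on behalf of app_b.
app_a, app_b = SketchApp("a"), SketchApp("b")
demo_module = types.ModuleType("sketch_tasks")
demo_module.double = SketchTask(app_a, lambda x: 2 * x, "sketch_tasks.double")
sys.modules["sketch_tasks"] = demo_module

resolved = from_id(app_b, "sketch_tasks.double")
```

Because step 3 builds a new instance, the module-level task keeps its reference to app_a while the caller receives one bound to app_b.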

retriable_exceptions() tuple[type[Exception], ...]

Retrieve a tuple of exception types that should trigger a retry of the task.

This method returns the exception types that cause the task to be retried. RetryError, which is specific to Pynenc, is always included so that internal retry mechanisms are covered.

Returns:

A tuple of retriable exceptions.
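
A tuple of exception types can be passed directly to an except clause, which is what makes this return type convenient. The sketch below assumes a stand-in RetryError class and a hypothetical run_with_retries helper; neither reflects how Pynenc's runner actually retries tasks.

```python
class RetryError(Exception):
    """Stand-in for the library's internal retry signal."""


def retriable_exceptions(configured: tuple) -> tuple:
    """Return the configured types plus RetryError, without duplicates."""
    return tuple(dict.fromkeys((*configured, RetryError)))


def run_with_retries(func, retry_on: tuple, max_retries: int):
    """Call func, retrying only when it raises one of the retry_on types."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except retry_on:
            # Give up and re-raise once the retry budget is exhausted.
            if attempt == max_retries:
                raise


attempts = []


def flaky() -> str:
    """Fail twice with a retriable error, then succeed."""
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("temporary failure")
    return "ok"


retry_on = retriable_exceptions((ConnectionError,))
outcome = run_with_retries(flaky, retry_on, max_retries=5)
```

Exceptions outside the tuple are not caught by the except clause and propagate on the first attempt.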

__str__() str[source]
__repr__() str[source]
args(*args: pynenc.types.Params.args, **kwargs: pynenc.types.Params.kwargs) pynenc.arguments.Arguments[source]

Returns:

An Arguments instance built from the given args and kwargs

__call__(*args: pynenc.types.Params.args, **kwargs: pynenc.types.Params.kwargs) pynenc.invocation.base_invocation.BaseInvocation[pynenc.types.Params, pynenc.types.Result][source]

Handles a call to the task

_call(arguments: pynenc.arguments.Arguments) pynenc.invocation.base_invocation.BaseInvocation[pynenc.types.Params, pynenc.types.Result][source]

Route the call to the orchestrator, unless dev mode forces synchronous execution, in which case the task runs synchronously.

Returns:

The invocation

parallelize(param_iter: collections.abc.Iterable[tuple | dict | pynenc.arguments.Arguments], common_args: dict | None = None) pynenc.invocation.base_invocation.BaseInvocationGroup[source]

Parallelize the execution of a task with different sets of parameters.

This method allows for concurrent execution of the same task with varying parameters. It accepts an iterable where each element represents a set of parameters for a separate task invocation. When common_args is provided, param_iter must be an iterable of dictionaries, and common arguments are pre-serialized for efficiency.

Note

Without common_args, each element of param_iter can be given in any of these formats:

  • As a tuple: Interpreted as positional arguments for the task.

  • As a dictionary: Interpreted as keyword arguments for the task.

  • As an Arguments instance: Created using task.args(*args, **kwargs).

Important

common_args is intended to optimize the parallelization of very large arguments that are cached by the client data store. If the arguments are small or the client data store is disabled, it provides no major improvement. For large arguments, however, it yields substantial time and memory savings.

Parameters:
  • param_iter (Iterable[tuple | dict | Arguments]) – An iterable of parameters for each call. Each element in the iterable is used to invoke the task separately.

  • common_args – Optional dictionary of common arguments to pre-serialize and share across calls.

Returns:

A group of task invocations, allowing the task to be run in parallel with different parameters. The type of group (synchronous or distributed) depends on the application’s configuration.

Important

Depending on the configuration, this method creates a group of either synchronous or distributed invocations. In development mode, where dev_mode_force_sync_tasks is enabled, it creates synchronous invocations. Otherwise, it creates distributed invocations for parallel processing.

Examples

Parallelization with tuples, dicts and arguments:

from pynenc.app import Pynenc

app = Pynenc()

@app.task
def add(x: int, y: int) -> int:
    return x + y

# Example usage of parallelize
invocation_group = add.parallelize([(1, 1), add.args(1, 2), {"x": 2, "y": 3}])
print(list(invocation_group.results))
# prints [2, 3, 5]

Parallelization with common_args:

@app.task(registration_concurrency=ConcurrencyControlType.DISABLED)
def process(large_data: str, index: int) -> int:
    return len(large_data) + index

# With common_args
common = {"large_data": "huge_string"}
params = [{"index": i} for i in range(3)]
invocation_group = process.parallelize(params, common)
print(list(invocation_group.results))  # [len("huge_string") + i for i in range(3)]

pynenc.task._extract_task_from_wrapper(wrapper: object) pynenc.task.Task | None[source]

Extract a Task from a wrapper by scanning its instance attributes.

Handles any wrapper pattern (direct_task’s __pynenc_task__, Celery migration shims with pynenc_task, etc.) without hardcoding attribute names.

Parameters:

wrapper (object) – The object to inspect

Returns:

The first Task instance found, or None
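
Scanning instance attributes by type rather than by name might look like the sketch below. SketchTask, extract_task, and the two shim classes are hypothetical stand-ins; the real function may inspect wrappers differently.

```python
class SketchTask:
    """Stand-in for a task object."""

    def __init__(self, name: str) -> None:
        self.name = name


def extract_task(wrapper: object):
    """Return the first SketchTask found among the wrapper's instance attributes."""
    # Objects without a __dict__ (for example ints) simply yield nothing.
    for value in getattr(wrapper, "__dict__", {}).values():
        if isinstance(value, SketchTask):
            return value
    return None


class DunderShim:
    """Wrapper that stores the task under a dunder-style attribute."""

    def __init__(self, task: SketchTask) -> None:
        self.__pynenc_task__ = task


class PlainShim:
    """Wrapper that stores the task under a plain attribute name."""

    def __init__(self, task: SketchTask) -> None:
        self.label = "shim"
        self.pynenc_task = task


task = SketchTask("demo")
from_dunder = extract_task(DunderShim(task))
from_plain = extract_task(PlainShim(task))
from_nothing = extract_task(42)
```

Matching on the value's type means a new wrapper style needs no change to the lookup, whatever attribute name it uses.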

pynenc.task.can_batch_process(task: pynenc.task.Task, num_calls: int) bool[source]

Determine if a task can be processed in batches.

A task can be batch processed when:

  1. The application is not in development mode

  2. Registration concurrency is disabled

  3. There are multiple calls to process

  4. The task has a valid parallel_batch_size

Parameters:
  • task (Task) – The task to check

  • num_calls (int) – The number of calls to process

Returns:

True if the task can be batch processed, False otherwise
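
The four conditions combine into a single boolean check. The sketch below uses a hypothetical SketchTaskConf with flattened fields and assumes that a "valid" parallel_batch_size means a positive integer and that "multiple calls" means more than one; the real configuration objects differ.

```python
from dataclasses import dataclass


@dataclass
class SketchTaskConf:
    dev_mode_force_sync_tasks: bool
    registration_concurrency_disabled: bool
    parallel_batch_size: int


def can_batch_process(conf: SketchTaskConf, num_calls: int) -> bool:
    """Apply the four documented conditions in order."""
    return (
        not conf.dev_mode_force_sync_tasks          # 1. not in development mode
        and conf.registration_concurrency_disabled  # 2. no registration concurrency
        and num_calls > 1                           # 3. multiple calls (assumed: > 1)
        and conf.parallel_batch_size > 0            # 4. valid size (assumed: > 0)
    )


batchable = SketchTaskConf(False, True, 100)
dev_mode = SketchTaskConf(True, True, 100)
no_batch_size = SketchTaskConf(False, True, 0)
```

For example, can_batch_process(batchable, 10) holds, while a single call or any of the other two configurations falls back to individual distribution.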

pynenc.task.prepare_arguments(task: pynenc.task.Task, param_iter: collections.abc.Iterable[tuple | dict | pynenc.arguments.Arguments], common_args: dict | None = None) list[pynenc.arguments.Arguments][source]

Convert various parameter formats to a list of Arguments objects, merging with common_args when provided.

Parameters:
  • task (Task) – The task to prepare arguments for

  • param_iter (Iterable[tuple | dict | Arguments]) – Iterable of parameters

  • common_args (dict | None) – Optional common arguments to merge with each parameter

Returns:

A list of Arguments objects with common_args merged in
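
A rough illustration of the normalization step, using inspect.signature to map tuples and dicts onto parameter names. Plain dicts stand in for Arguments objects, Arguments inputs are omitted, and the choice that per-call values override common_args is an assumption of this sketch.

```python
import inspect


def prepare_arguments(func, param_iter, common_args=None):
    """Normalize tuples and dicts into keyword dicts, merging common_args."""
    signature = inspect.signature(func)
    common = common_args or {}
    prepared = []
    for params in param_iter:
        if isinstance(params, dict):
            # Per-call keys win over common keys (arbitrary choice for the sketch).
            bound = signature.bind(**{**common, **params})
        elif isinstance(params, tuple) and not common:
            bound = signature.bind(*params)
        else:
            raise TypeError("with common_args, every element must be a dict")
        bound.apply_defaults()
        prepared.append(dict(bound.arguments))
    return prepared


def add(x: int, y: int) -> int:
    return x + y


def process(large_data: str, index: int) -> int:
    return len(large_data) + index


mixed = prepare_arguments(add, [(1, 1), {"x": 2, "y": 3}])
merged = prepare_arguments(
    process, [{"index": i} for i in range(2)], {"large_data": "huge"}
)
```

After this step every call has the same shape, so the distribution code no longer needs to care which input format the caller used.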

pynenc.task.distribute_calls(task: pynenc.task.Task, param_list: list[tuple | dict | pynenc.arguments.Arguments], common_args: dict | None = None) pynenc.invocation.base_invocation.BaseInvocationGroup[source]

Distribute calls individually without batch processing.

Parameters:
  • task (Task) – The task to process

  • param_list (list[tuple | dict | Arguments]) – List of parameters

  • common_args (dict | None) – Optional common arguments to merge with each parameter

Returns:

A group containing the created invocations

pynenc.task.distribute_batch_calls(task: pynenc.task.Task[pynenc.types.Params, pynenc.types.Result], param_list: list[tuple | dict | pynenc.arguments.Arguments], common_args: dict | None = None) pynenc.invocation.dist_invocation.DistributedInvocationGroup[source]

Process a list of parameters in batches using PreSerializedCall. Handles pre-serialization of common arguments for efficient distribution.

Parameters:
  • task (Task) – The task to process

  • param_list (list[tuple | dict | Arguments]) – The arguments to process

  • common_args (dict | None) – Optional common arguments to be pre-serialized once

Returns:

An invocation group for the distributed calls
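
The idea of serializing the shared arguments once and splitting the calls into fixed-size batches can be sketched as follows. JSON is used here as a stand-in serializer, and batched and build_batches are hypothetical helpers; the structure of PreSerializedCall is not shown on this page.

```python
import json
from itertools import islice


def batched(items, size: int):
    """Yield consecutive lists of at most `size` items."""
    iterator = iter(items)
    while batch := list(islice(iterator, size)):
        yield batch


def build_batches(param_list, common_args, batch_size: int):
    """Serialize common_args once and attach the same blob to every batch."""
    common_blob = json.dumps(common_args or {}, sort_keys=True)
    return [
        {
            "common": common_blob,  # shared, already-serialized payload
            "calls": [json.dumps(params, sort_keys=True) for params in batch],
        }
        for batch in batched(param_list, batch_size)
    ]


params = [{"index": i} for i in range(5)]
batches = build_batches(params, {"large_data": "huge_string"}, batch_size=2)
batch_sizes = [len(batch["calls"]) for batch in batches]
```

Only the small per-call parameters are serialized repeatedly; the large shared payload is produced a single time regardless of how many calls are made.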