pynenc.task¶
Module Contents¶
Classes¶
Represents a distributable function in the Pynenc system. |
Functions¶
Extract a Task from a wrapper by scanning its instance attributes. |
|
Determine if a task can be processed in batches. |
|
Convert various parameter formats to a list of Arguments objects, merging with common_args when provided. |
|
Distribute calls individually without batch processing. |
|
Process a list of parameters in batches using PreSerializedCall. Handles pre-serialization of common arguments for efficient distribution. |
API¶
- class pynenc.task.Task(app: pynenc.app.Pynenc, func: pynenc.types.Func, options: dict[str, Any])[source]¶
Bases:
typing.Generic[pynenc.types.Params,pynenc.types.Result]Represents a distributable function in the Pynenc system.
- Parameters:
Calling the task returns a
BaseInvocationwhose concrete type depends on the execution environment (e.g.DistributedInvocationorConcurrentInvocation).Tasks cannot be created from functions defined in
__main__because workers cannot resolve__main__.task_nameback to the originating module. Use the@app.taskdecorator to create and register tasks.- Raises:
RuntimeError – If the function is defined in the
__main__module.
Initialization
- validate_options() None[source]¶
validate that all the option fields exists in the config_fields it will raise an exception with all the invalid options
- property logger: logging.Logger¶
The logger for the task
- property invocation: pynenc.invocation.base_invocation.BaseInvocation¶
The invocation of the task
- wf() pynenc.workflow.workflow_context.WorkflowContext¶
Access workflow functionality for this task.
Provides methods for workflow state management, deterministic operations, and durability features like pause/resume and continue-as-new.
- Returns:
A helper object with workflow functionality
Example:
@app.task def main_wf_task(data: dict) -> str: # Save workflow state state = main_wf_task.wf.get_state({"step": 0}) # Use deterministic random if main_wf_task.wf.random() > 0.5: state["path"] = "A" else: state["path"] = "B" # Save updated state main_wf_task.wf.save_state(state) # Conditionally pause workflow if needs_human_approval(data): main_wf_task.wf.pause("Waiting for approval") return f"Completed via path {state['path']}"
- static _get_from_task_id(task_id: pynenc.identifiers.task_id.TaskId) pynenc.task.Task | collections.abc.Callable[source]¶
- classmethod from_id(app: pynenc.app.Pynenc, task_id: pynenc.identifiers.task_id.TaskId) pynenc.task.Task[source]¶
Resolve a Task instance by its TaskId, bound to the given app.
Checks the app’s registered tasks first, then falls back to importing the module. When a module-level Task is found, a new Task instance is created and bound to
appso the caller never shares the module-level app reference. This prevents cross-contamination when multiple apps with different runners coexist in the same process.- Parameters:
- Returns:
The resolved Task instance bound to
app- Raises:
ValueError – If the task_id cannot be resolved to a Task
- retriable_exceptions() tuple[type[Exception], ...]¶
Retrieve a tuple of exception types that should trigger a retry of the task.
This method provides a list of exception types, indicating which exceptions will cause the task to be retried. The
RetryErrorexception type, specific to the Pynenc system, is always included to ensure that internal retry mechanisms are accounted for.- Returns:
A tuple of retriable exceptions.
- args(*args: pynenc.types.Params.args, **kwargs: pynenc.types.Params.kwargs) pynenc.arguments.Arguments[source]¶
- Returns:
an Arguments instance from the given args and kwargs
- __call__(*args: pynenc.types.Params.args, **kwargs: pynenc.types.Params.kwargs) pynenc.invocation.base_invocation.BaseInvocation[pynenc.types.Params, pynenc.types.Result][source]¶
Handles a call to the task
- _call(arguments: pynenc.arguments.Arguments) pynenc.invocation.base_invocation.BaseInvocation[pynenc.types.Params, pynenc.types.Result][source]¶
Route the call to the orchestrator if not in dev mode, otherwise run synchronously
- Returns:
the invocation
- parallelize(param_iter: collections.abc.Iterable[tuple | dict | pynenc.arguments.Arguments], common_args: dict | None = None) pynenc.invocation.base_invocation.BaseInvocationGroup[source]¶
Parallelize the execution of a task with different sets of parameters.
This method allows for concurrent execution of the same task with varying parameters. It accepts an iterable where each element represents a set of parameters for a separate task invocation. When
common_argsis provided,param_itermust be an iterable of dictionaries, and common arguments are pre-serialized for efficiency.Note
Without common_args param_iter can be specified in different formats:
As a tuple: Interpreted as positional arguments for the task.
As a dictionary: Interpreted as keyword arguments for the task.
As an
Argumentsinstance: Created usingtask.args(*args, **kwargs).
Important
common_args is intended for optimize parallelization of huge arguments that will be cached by the client data store. if the arguments are small or the client data store is disabled, it will not provide any major improvement. However, for big arguments, it will provide massive time and memory improvements.
- Parameters:
- Returns:
A group of task invocations, allowing the task to be run in parallel with different parameters. The type of group (synchronous or distributed) depends on the application’s configuration.
Important
Depending on the configuration, this method creates a group of either synchronous or distributed invocations. In development mode, where
dev_mode_force_sync_tasksis enabled, it creates synchronous invocations. Otherwise, it creates distributed invocations for parallel processing.Examples
Parallelization with tuples, dicts and arguments:
app = Pynenc() @app.task def add(x: int, y: int) -> int: return x + y # Example usage of parallelize invocation_group = add.parallelize([(1, 1), add.args(1, 2), {"x": 2, "y": 3}]) print(list(invocation_group.results)) # prints [2, 3, 5]
Parallelization with common_args:
@app.task(registration_concurrency=ConcurrencyControlType.DISABLED) def process(large_data: str, index: int) -> int: return len(large_data) + index # With common_args common = {"large_data": "huge_string"} params = [{"index": i} for i in range(3)] invocation_group = process.parallelize(params, common) print(list(invocation_group.results)) # [len("huge_string") + i for i in range(3)]
- pynenc.task._extract_task_from_wrapper(wrapper: object) pynenc.task.Task | None[source]¶
Extract a Task from a wrapper by scanning its instance attributes.
Handles any wrapper pattern (direct_task’s
__pynenc_task__, Celery migration shims withpynenc_task, etc.) without hardcoding attribute names.- Parameters:
wrapper (object) – The object to inspect
- Returns:
The first Task instance found, or None
- pynenc.task.can_batch_process(task: pynenc.task.Task, num_calls: int) bool[source]¶
Determine if a task can be processed in batches.
A task can be batch processed when:
The application is not in development mode
Registration concurrency is disabled
There are multiple calls to process
The task has a valid parallel_batch_size
- pynenc.task.prepare_arguments(task: pynenc.task.Task, param_iter: collections.abc.Iterable[tuple | dict | pynenc.arguments.Arguments], common_args: dict | None = None) list[pynenc.arguments.Arguments][source]¶
Convert various parameter formats to a list of Arguments objects, merging with common_args when provided.
- pynenc.task.distribute_calls(task: pynenc.task.Task, param_list: list[tuple | dict | pynenc.arguments.Arguments], common_args: dict | None = None) pynenc.invocation.base_invocation.BaseInvocationGroup[source]¶
Distribute calls individually without batch processing.
- pynenc.task.distribute_batch_calls(task: pynenc.task.Task[pynenc.types.Params, pynenc.types.Result], param_list: list[tuple | dict | pynenc.arguments.Arguments], common_args: dict | None = None) pynenc.invocation.dist_invocation.DistributedInvocationGroup[source]¶
Process a list of parameters in batches using PreSerializedCall. Handles pre-serialization of common arguments for efficient distribution.