pynenc.task
Module Contents
Classes
| Task | A task in the Pynenc library that represents a function that can be distributed. |
Functions
| can_batch_process | Determine if a task can be processed in batches. |
| prepare_arguments | Convert various parameter formats to a list of Arguments objects, merging with common_args when provided. |
| distribute_calls | Distribute calls individually without batch processing. |
| distribute_batch_calls | Process a list of parameters in batches using PreSerializedCall. Handles pre-serialization of common arguments for efficient distribution. |
API
- class pynenc.task.Task(app: pynenc.app.Pynenc, func: pynenc.types.Func, options: dict[str, Any])
Bases: typing.Generic[pynenc.types.Params, pynenc.types.Result]
A task in the Pynenc library that represents a function that can be distributed.
- Parameters:
A Task can be called like a normal function and returns an invocation (an instance of BaseInvocation, as the __call__ signature below shows). When running normally, the invocation is distributed, but it can be synchronous when running eagerly in development with the pynenc app’s dev_mode_force_sync_tasks option set to True (or the ‘PYNENC_DEV_MODE_FORCE_SYNC_TASK’ environment variable set). The dev_mode_force_sync_tasks option should only be used in development.
Hint
Although it is possible to create a Task instance directly, it is recommended to use the decorator provided by the pynenc application, i.e. @app.task(options...). This is the expected way of instantiating a task and registering it in the app.
Limitations
Attention
This implementation does not support the creation of tasks from functions defined in modules intended to run as standalone scripts.
This applies to any module executed directly, where its __name__ attribute becomes "__main__". This is not exclusive to modules with if __name__ == "__main__" sections; it includes any module run as the main program. In such situations, func.__module__ being "__main__" poses a challenge for task instantiation and serialization: when a task is executed in the initiator script, it is identified as __main__.task_name.
However, in a Pynenc worker’s distributed environment, __main__ refers to the worker itself. As a result, the task identified as __main__.task_name cannot be found, since the worker’s __main__ differs from that of the initiator script. To keep task management simple and robust, tasks defined in modules run as the main program are not supported.
Examples
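```python
from pynenc import Pynenc

app = Pynenc()


@app.task  # options can be passed as @app.task(option=value, ...)
def func() -> None:
    pass


invocation = func()  # calling the task returns an invocation
```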
Raises
RuntimeError: If an attempt is made to create a task from a function in the __main__ module.
- validate_options() None
Validate that all option fields exist in config_fields. If any do not, an exception is raised listing all the invalid options.
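The validation rule above can be sketched as a standalone function. This is a minimal sketch, not Pynenc's implementation: check_options, its config_fields argument and the use of ValueError are assumptions. The point it shows is that every unknown option is collected before failing, so a single exception reports all of them.

```python
from collections.abc import Iterable
from typing import Any


def check_options(options: dict[str, Any], config_fields: Iterable[str]) -> None:
    """Hypothetical validator (not Pynenc's API): reject unknown option names."""
    allowed = set(config_fields)
    # Collect every invalid name first so one error reports all of them
    invalid = sorted(name for name in options if name not in allowed)
    if invalid:
        # ValueError is an assumption; the exception Pynenc raises may differ
        raise ValueError(f"Invalid task options: {', '.join(invalid)}")


# Usage: these field names appear elsewhere on this page
fields = {"registration_concurrency", "parallel_batch_size"}
check_options({"parallel_batch_size": 10}, fields)  # passes silently
```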
- property invocation: pynenc.invocation.base_invocation.BaseInvocation
The invocation of the task
- static _get_from_task_id(task_id: str) pynenc.task.Task | Callable
- static _from_json(serialized: str) tuple[str, pynenc.types.Func, dict[str, Any]]
- Returns:
a function and options from a serialized task
- classmethod from_json(app: pynenc.app.Pynenc, serialized: str) pynenc.task.Task
- Returns:
a new task from a serialized task
- classmethod from_id(app: pynenc.app.Pynenc, task_id: str) pynenc.task.Task
- Returns:
a new task from a task ID
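To illustrate how a task can be rebuilt from a serialized form or an ID, and why functions in __main__ are rejected, here is a minimal sketch that identifies a task as <module>.<qualname> (the naming the Limitations section describes) and re-imports it with importlib. task_to_json, func_from_task_id, task_from_json and the JSON layout are hypothetical, not Pynenc's actual format; the leading string of the returned tuple is assumed here to be the task ID.

```python
import importlib
import json
from typing import Any, Callable


def task_to_json(func: Callable[..., Any], options: dict[str, Any]) -> str:
    """Hypothetical format: the task ID is '<module>.<qualname>'."""
    if func.__module__ == "__main__":
        # A worker would import its own __main__, not the initiator's, so refuse early
        raise RuntimeError(f"cannot create a task from {func.__qualname__!r} in __main__")
    task_id = f"{func.__module__}.{func.__qualname__}"
    return json.dumps({"task_id": task_id, "options": options})


def func_from_task_id(task_id: str) -> Callable[..., Any]:
    # Assumes a module-level function: everything before the last dot is the module
    module_name, _, func_name = task_id.rpartition(".")
    return getattr(importlib.import_module(module_name), func_name)


def task_from_json(serialized: str) -> tuple[str, Callable[..., Any], dict[str, Any]]:
    data = json.loads(serialized)
    return data["task_id"], func_from_task_id(data["task_id"]), data["options"]


# Usage with a standard-library function standing in for a task
task_id, func, options = task_from_json(task_to_json(json.dumps, {"parallel_batch_size": 10}))
print(task_id, func is json.dumps, options)  # json.dumps True {'parallel_batch_size': 10}
```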
- retriable_exceptions() tuple[type[Exception], ...]
Retrieve a tuple of exception types that should trigger a retry of the task.
This method returns the exception types that will cause the task to be retried. The RetryError exception type, specific to the Pynenc system, is always included to ensure that internal retry mechanisms are accounted for.
- Returns:
A tuple of retriable exceptions.
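Because Python's except clause accepts a tuple of exception types, a tuple like the one returned here can drive a retry loop directly. The sketch below is illustrative only: RetryError is defined locally as a stand-in, and build_retriable and run_with_retries are hypothetical helpers, not Pynenc's worker logic.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


class RetryError(Exception):
    """Local stand-in for Pynenc's RetryError (defined here for illustration)."""


def build_retriable(extra: tuple[type[Exception], ...]) -> tuple[type[Exception], ...]:
    # RetryError is always included, mirroring the behaviour described above
    return (RetryError,) + tuple(t for t in extra if t is not RetryError)


def run_with_retries(
    func: Callable[[], T],
    retriable: tuple[type[Exception], ...],
    max_attempts: int = 3,
) -> T:
    """Hypothetical retry loop: retry only on the listed exception types."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    attempt = 1
    while True:
        try:
            return func()
        except retriable:
            # `except` accepts a tuple, so any listed type (or subclass) matches;
            # other exceptions propagate immediately without a retry
            if attempt >= max_attempts:
                raise
            attempt += 1
```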
- args(*args: pynenc.types.Params.args, **kwargs: pynenc.types.Params.kwargs) pynenc.arguments.Arguments
- Returns:
an Arguments instance from the given args and kwargs
- __call__(*args: pynenc.types.Params.args, **kwargs: pynenc.types.Params.kwargs) pynenc.invocation.base_invocation.BaseInvocation[pynenc.types.Params, pynenc.types.Result]
Handles a call to the task
- _call(arguments: pynenc.arguments.Arguments) pynenc.invocation.base_invocation.BaseInvocation[pynenc.types.Params, pynenc.types.Result]
Route the call to the orchestrator when not in dev mode; otherwise, run it synchronously.
- Returns:
the invocation
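The routing rule can be sketched with two hypothetical invocation types and a plain list standing in for the orchestrator. EagerInvocation, PendingInvocation and CallRouter are not Pynenc classes; they only show the branch on dev_mode_force_sync_tasks.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Union


@dataclass
class EagerInvocation:
    """Hypothetical synchronous invocation: the result is already computed."""
    result: Any


@dataclass
class PendingInvocation:
    """Hypothetical distributed invocation: only the call is recorded."""
    task_id: str
    kwargs: dict[str, Any]


@dataclass
class CallRouter:
    """Hypothetical router; `orchestrator` is a plain list for illustration."""
    dev_mode_force_sync_tasks: bool
    orchestrator: list[PendingInvocation] = field(default_factory=list)

    def route(
        self, func: Callable[..., Any], kwargs: dict[str, Any]
    ) -> Union[EagerInvocation, PendingInvocation]:
        if self.dev_mode_force_sync_tasks:
            # Development only: run in-process and return the finished result
            return EagerInvocation(func(**kwargs))
        # Normal mode: register the call so a worker can pick it up later
        invocation = PendingInvocation(f"{func.__module__}.{func.__qualname__}", kwargs)
        self.orchestrator.append(invocation)
        return invocation
```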
- parallelize(param_iter: collections.abc.Iterable[tuple | dict | pynenc.arguments.Arguments], common_args: dict | None = None) pynenc.invocation.base_invocation.BaseInvocationGroup
Parallelize the execution of a task with different sets of parameters.
This method allows concurrent execution of the same task with varying parameters. It accepts an iterable where each element represents the parameters for a separate task invocation. When common_args is provided, param_iter must be an iterable of dictionaries, and the common arguments are pre-serialized for efficiency.
Note
Without common_args, each element of param_iter can be specified in one of these formats:
- As a tuple: interpreted as positional arguments for the task.
- As a dictionary: interpreted as keyword arguments for the task.
- As an Arguments instance: created using task.args(*args, **kwargs).
Important
common_args is intended to optimize the parallelization of calls that share large arguments, which are cached by the arg_cache. If the arguments are small or the arg_cache is disabled, it will not provide any major improvement. For large arguments, however, it can provide substantial time and memory savings.
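A minimal sketch of the pre-serialization idea, assuming JSON as the serialization format: the shared arguments are serialized once and the same strings are reused for every call, instead of being re-serialized per call. Both functions are hypothetical, and the sketch leaves out the arg_cache, which is where the larger savings described above come from.

```python
import json
from typing import Any


def serialize_each_call(
    common_args: dict[str, Any], param_list: list[dict[str, Any]]
) -> list[dict[str, str]]:
    """Naive approach: shared arguments are re-serialized for every call."""
    return [
        {name: json.dumps(value) for name, value in {**common_args, **params}.items()}
        for params in param_list
    ]


def pre_serialize_common(
    common_args: dict[str, Any], param_list: list[dict[str, Any]]
) -> list[dict[str, str]]:
    """Serialize shared arguments once and reuse the same strings in every call."""
    common_serialized = {name: json.dumps(value) for name, value in common_args.items()}
    calls = []
    for params in param_list:
        call = dict(common_serialized)  # shallow copy: values are the shared strings
        # Only the per-call arguments are serialized inside the loop
        call.update({name: json.dumps(value) for name, value in params.items()})
        calls.append(call)
    return calls
```

Both functions produce equal payloads; the difference is that the second serializes each shared argument once, no matter how many calls there are.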
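- Parameters:
  - param_iter: an iterable where each element holds the parameters of one invocation (a tuple, a dict, or an Arguments instance; only dicts when common_args is provided).
  - common_args: optional dict of arguments shared by every invocation.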
- Returns:
A group of task invocations, allowing the task to be run in parallel with different parameters. The type of group (synchronous or distributed) depends on the application’s configuration.
Important
Depending on the configuration, this method creates a group of either synchronous or distributed invocations. In development mode, where dev_mode_force_sync_tasks is enabled, it creates synchronous invocations. Otherwise, it creates distributed invocations for parallel processing.
Examples
Parallelization with tuples, dicts and arguments:
```python
from pynenc import Pynenc

app = Pynenc()


@app.task
def add(x: int, y: int) -> int:
    return x + y


# Example usage of parallelize
invocation_group = add.parallelize([(1, 1), add.args(1, 2), {"x": 2, "y": 3}])
print(list(invocation_group.results))  # prints [2, 3, 5]
```
Parallelization with common_args:
```python
from pynenc.conf.config_task import ConcurrencyControlType


# Registration concurrency must be disabled for batch processing (see can_batch_process)
@app.task(registration_concurrency=ConcurrencyControlType.DISABLED)
def process(large_data: str, index: int) -> int:
    return len(large_data) + index


# With common_args
common = {"large_data": "huge_string"}
params = [{"index": i} for i in range(3)]
invocation_group = process.parallelize(params, common)
print(list(invocation_group.results))  # prints [11, 12, 13], i.e. len("huge_string") + i
```
- pynenc.task.can_batch_process(task: pynenc.task.Task, num_calls: int) bool
Determine if a task can be processed in batches.
A task can be batch processed when all of the following hold:
- The application is not in development mode
- Registration concurrency is disabled
- There are multiple calls to process
- The task has a valid parallel_batch_size
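The four conditions translate directly into a boolean predicate. In this sketch TaskSettings and should_batch are hypothetical, the settings are flattened into plain fields rather than read from the app and task configuration, and a "valid" parallel_batch_size is assumed to mean a positive integer.

```python
from dataclasses import dataclass


@dataclass
class TaskSettings:
    """Hypothetical flattened view of the settings the check needs."""
    dev_mode_force_sync_tasks: bool
    registration_concurrency_disabled: bool
    parallel_batch_size: int


def should_batch(settings: TaskSettings, num_calls: int) -> bool:
    """Hypothetical predicate: all four conditions must hold."""
    return (
        not settings.dev_mode_force_sync_tasks  # not in development mode
        and settings.registration_concurrency_disabled  # registration concurrency off
        and num_calls > 1  # more than one call to process
        and settings.parallel_batch_size > 0  # "valid" assumed to mean positive
    )
```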
- pynenc.task.prepare_arguments(task: pynenc.task.Task, param_iter: collections.abc.Iterable[tuple | dict | pynenc.arguments.Arguments], common_args: dict | None = None) list[pynenc.arguments.Arguments]
Convert various parameter formats to a list of Arguments objects, merging with common_args when provided.
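A minimal sketch of the normalization step, using inspect.signature to bind each element to the function's parameter names. normalize_params is hypothetical: it returns plain dicts instead of Arguments objects, does not handle Arguments instances, and assumes per-call values take precedence over common_args when a name appears in both.

```python
from __future__ import annotations

import inspect
from collections.abc import Callable, Iterable
from typing import Any


def normalize_params(
    func: Callable[..., Any],
    param_iter: Iterable[tuple | dict],
    common_args: dict[str, Any] | None = None,
) -> list[dict[str, Any]]:
    """Hypothetical normalizer: turn tuples/dicts into name -> value mappings."""
    sig = inspect.signature(func)
    result = []
    for params in param_iter:
        if common_args is not None:
            # With common_args, only keyword dicts are accepted (as documented above)
            if not isinstance(params, dict):
                raise TypeError("with common_args, each element must be a dict")
            bound = sig.bind(**{**common_args, **params})
        elif isinstance(params, tuple):
            bound = sig.bind(*params)  # positional arguments
        elif isinstance(params, dict):
            bound = sig.bind(**params)  # keyword arguments
        else:
            raise TypeError(f"unsupported parameter format: {type(params).__name__}")
        bound.apply_defaults()  # fill in parameters left at their defaults
        result.append(dict(bound.arguments))
    return result
```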
- pynenc.task.distribute_calls(task: pynenc.task.Task, param_list: list[tuple | dict | pynenc.arguments.Arguments], common_args: dict | None = None) pynenc.invocation.base_invocation.BaseInvocationGroup
Distribute calls individually without batch processing.
- pynenc.task.distribute_batch_calls(task: pynenc.task.Task[pynenc.types.Params, pynenc.types.Result], param_list: list[tuple | dict | pynenc.arguments.Arguments], common_args: dict | None = None) pynenc.invocation.dist_invocation.DistributedInvocationGroup
Process a list of parameters in batches using PreSerializedCall. Handles pre-serialization of common arguments for efficient distribution.
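Splitting the calls into batches can be sketched with itertools.islice. This assumes parallel_batch_size is the maximum number of calls per batch; batched_calls is a hypothetical helper, and on Python 3.12+ itertools.batched offers similar behaviour (yielding tuples rather than lists).

```python
from collections.abc import Iterable, Iterator
from itertools import islice
from typing import TypeVar

T = TypeVar("T")


def batched_calls(params: Iterable[T], batch_size: int) -> Iterator[list[T]]:
    """Yield consecutive batches of at most batch_size calls (hypothetical helper)."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    it = iter(params)
    # islice consumes up to batch_size items; an empty list means the input is exhausted
    while batch := list(islice(it, batch_size)):
        yield batch
```

Each batch could then be sent as a single unit together with the common arguments serialized once, which is the role the description above gives to PreSerializedCall.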