pynenc.task
Module Contents
Classes
Task | A task in the Pynenc library that represents a function that can be distributed.
API
- class pynenc.task.Task(app: pynenc.app.Pynenc, func: pynenc.types.Func, options: dict[str, Any])
Bases: typing.Generic[pynenc.types.Params, pynenc.types.Result]
A task in the Pynenc library that represents a function that can be distributed.
- Parameters:
The BaseTask can be called normally and will return an instance of BaseResult. The result will be an AsyncResult when running normally, but can be a SyncResult when running eagerly in development with the pynenc app's dev_mode_force_sync_tasks option set to True (or the 'PYNENC_DEV_MODE_FORCE_SYNC_TASK' environment variable set). The option dev_mode_force_sync_tasks should only be used in development.
Hint
Although it is possible to create a BaseTask instance directly, it is recommended to use the decorator provided in the pynenc application, i.e., @app.task(options...). This is the expected way of instantiating a task and registering it in the app.
Limitations
Attention
This implementation does not support the creation of tasks from functions defined in modules intended to run as standalone scripts.
This applies to any module executed directly, where its __name__ attribute becomes "__main__". This is not exclusive to modules with if __name__ == "__main__" sections but includes any module run as the main program. In such situations, func.__module__ being "__main__" poses a challenge for task instantiation and serialization. When a task is executed in the initiator script, it is identified as __main__.task_name.
However, in a Pynenc worker's distributed environment, __main__ refers to the worker itself. As a result, the task identified as __main__.task_name cannot be found, since the worker's __main__ differs from that of the initiator script. To ensure simplicity and robustness in task management, tasks defined in modules run as the main program are not supported.
Examples
    @app.task(options)
    def func():
        pass

    result = func()
Raises
RuntimeError: If an attempt is made to create a task from a function in the __main__ module.
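The limitation above can be illustrated with a small standalone sketch. It does not use Pynenc itself: task_id_for is a hypothetical helper, and my_package.tasks is a made-up module name. It only shows why a "module.name" identifier breaks down when the module is "__main__", and how such functions could be rejected with a RuntimeError.

```python
from typing import Callable


def task_id_for(func: Callable) -> str:
    """Hypothetical helper: build a 'module.name' identifier for a function."""
    if func.__module__ == "__main__":
        # A worker's own __main__ is a different module, so this identifier
        # could never be resolved there. Reject it up front.
        raise RuntimeError(
            f"Cannot create a task from {func.__qualname__}: defined in __main__"
        )
    return f"{func.__module__}.{func.__name__}"


def sample() -> None:
    pass


# Simulate a function that lives in a regular, importable module
# (the module name is an assumption made for this example).
sample.__module__ = "my_package.tasks"
importable_id = task_id_for(sample)

# Simulate the same function defined in a script run as the main program.
sample.__module__ = "__main__"
try:
    task_id_for(sample)
    rejected = False
except RuntimeError:
    rejected = True
```

In practice this means task functions are best kept in an importable module, with the script that triggers them importing from it.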
Initialization
- validate_options() -> None
Validate that every option field exists in config_fields. Raises an exception listing all invalid options.
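A minimal sketch of this kind of validation, assuming a plain set of allowed field names. The function signature, the field names, and the use of ValueError are all assumptions for illustration; the source only states that one exception reports every invalid option.

```python
def validate_options(options: dict[str, object], config_fields: set[str]) -> None:
    """Raise one error naming every option that is not a known config field."""
    invalid = sorted(name for name in options if name not in config_fields)
    if invalid:
        # Collect all offenders first so the caller sees them in one message.
        raise ValueError(f"Invalid options: {', '.join(invalid)}")


# Hypothetical field names, used only for illustration.
known_fields = {"max_retries", "retry_for"}

validate_options({"max_retries": 3}, known_fields)  # passes silently

try:
    validate_options({"max_retries": 3, "timeout": 5, "colour": "red"}, known_fields)
    error_message = ""
except ValueError as exc:
    error_message = str(exc)
```

Reporting all invalid names at once avoids a fix-one-rerun cycle when several options are misspelled.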
- property invocation: pynenc.invocation.base_invocation.BaseInvocation
The invocation of the task
- static _from_json(serialized: str) -> tuple[str, pynenc.types.Func, dict[str, Any]]
- Returns:
a function and options from a serialized task
- classmethod from_json(app: pynenc.app.Pynenc, serialized: str) -> pynenc.task.Task
- Returns:
a new task from a serialized task
- retriable_exceptions() -> tuple[type[Exception], ...]
Retrieve a tuple of exception types that should trigger a retry of the task.
This method provides a list of exception types, indicating which exceptions will cause the task to be retried. The RetryError exception type, specific to the Pynenc system, is always included to ensure that internal retry mechanisms are accounted for.
- Returns:
A tuple of retriable exceptions.
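To show how such a tuple is typically consumed, here is a standalone sketch. RetryError is a local stand-in class, and retriable_exceptions and run_with_retries are hypothetical helpers written for this example; they are not Pynenc's implementation.

```python
from typing import Callable


class RetryError(Exception):
    """Stand-in for Pynenc's RetryError so the sketch runs on its own."""


def retriable_exceptions(
    configured: tuple[type[Exception], ...],
) -> tuple[type[Exception], ...]:
    """Return the configured exception types, always including RetryError."""
    if RetryError in configured:
        return configured
    return (RetryError, *configured)


def run_with_retries(
    func: Callable[[], str],
    retriable: tuple[type[Exception], ...],
    max_retries: int = 3,
) -> tuple[str, int]:
    """Call func, retrying on retriable exceptions; return (result, attempts)."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return func(), attempts
        except retriable:  # a tuple of types is accepted by `except`
            if attempts > max_retries:
                raise


calls = {"count": 0}


def flaky() -> str:
    """Fail twice with a retriable error, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


retriable = retriable_exceptions((ConnectionError,))
outcome = run_with_retries(flaky, retriable)
```

Any exception type outside the tuple propagates immediately instead of triggering a retry.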
- args(*args: pynenc.types.Params.args, **kwargs: pynenc.types.Params.kwargs) -> pynenc.arguments.Arguments
- Returns:
an Arguments instance from the given args and kwargs
- __call__(*args: pynenc.types.Params.args, **kwargs: pynenc.types.Params.kwargs) -> pynenc.invocation.base_invocation.BaseInvocation[pynenc.types.Params, pynenc.types.Result]
Handles a call to the task
- _call(arguments: pynenc.arguments.Arguments) -> pynenc.invocation.base_invocation.BaseInvocation[pynenc.types.Params, pynenc.types.Result]
Route the call to the orchestrator unless dev_mode_force_sync_tasks is enabled, in which case the task runs synchronously.
- Returns:
the invocation
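The routing decision can be sketched without Pynenc as follows. MiniApp, SyncInvocation, QueuedInvocation, and call_task are hypothetical stand-ins invented for this example, and the in-memory list only imitates handing work to an orchestrator.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class SyncInvocation:
    """Stand-in for an invocation that already ran in-process."""
    result: Any


@dataclass
class QueuedInvocation:
    """Stand-in for an invocation handed over for distributed execution."""
    func_name: str
    kwargs: dict[str, Any]


@dataclass
class MiniApp:
    """Stand-in app holding the flag and a fake orchestrator queue."""
    dev_mode_force_sync_tasks: bool = False
    queue: list[QueuedInvocation] = field(default_factory=list)


def call_task(app: MiniApp, func: Callable[..., Any], kwargs: dict[str, Any]):
    """Run eagerly in forced-sync dev mode, otherwise enqueue the call."""
    if app.dev_mode_force_sync_tasks:
        return SyncInvocation(result=func(**kwargs))
    invocation = QueuedInvocation(func_name=func.__name__, kwargs=kwargs)
    app.queue.append(invocation)
    return invocation


def add(x: int, y: int) -> int:
    return x + y


sync_inv = call_task(MiniApp(dev_mode_force_sync_tasks=True), add, {"x": 1, "y": 2})
distributed_app = MiniApp()
queued_inv = call_task(distributed_app, add, {"x": 1, "y": 2})
```

Both branches return an invocation-like object, so calling code does not need to know which mode is active.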
- parallelize(param_iter: Iterable[tuple | dict | pynenc.arguments.Arguments]) -> pynenc.invocation.base_invocation.BaseInvocationGroup
Parallelize the execution of a task with different sets of parameters.
This method allows for concurrent execution of the same task with varying parameters. It accepts an iterable where each element represents a set of parameters for a separate task invocation.
Note
These parameters can be specified in different formats:
As a tuple: Interpreted as positional arguments for the task.
As a dictionary: Interpreted as keyword arguments for the task.
As an Arguments instance: Created using task.args(*args, **kwargs).
- Parameters:
param_iter (Iterable[tuple | dict | Arguments]) – An iterable of parameters for each call. Each element in the iterable is used to invoke the task separately.
- Returns:
A group of task invocations, allowing the task to be run in parallel with different parameters. The type of group (synchronous or distributed) depends on the application’s configuration.
Important
Depending on the configuration, this method creates a group of either synchronous or distributed invocations. In development mode, where dev_mode_force_sync_tasks is enabled, it creates synchronous invocations. Otherwise, it creates distributed invocations for parallel processing.
Examples
    from pynenc import Pynenc

    app = Pynenc()

    @app.task
    def add(x: int, y: int) -> int:
        return x + y

    # Example usage of parallelize
    invocation_group = add.parallelize([(1, 1), add.args(1, 2), {"x": 2, "y": 3}])
    print(list(invocation_group.results))  # prints [2, 3, 5]
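One way to picture how the three accepted parameter formats can be brought to a common shape is the standalone sketch below. It is not Pynenc's implementation: the Arguments class here is a local stand-in, make_args and normalize are hypothetical helpers, and a thread pool merely substitutes for distributed workers.

```python
import inspect
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable


class Arguments:
    """Stand-in for an arguments container: parameters stored by name."""

    def __init__(self, kwargs: dict[str, Any]) -> None:
        self.kwargs = kwargs


def make_args(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Arguments:
    """Bind positional and keyword arguments to the function's parameter names."""
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()
    return Arguments(dict(bound.arguments))


def normalize(func: Callable[..., Any], params: Any) -> Arguments:
    """Accept a tuple, a dict, or an Arguments instance."""
    if isinstance(params, Arguments):
        return params
    if isinstance(params, dict):
        return make_args(func, **params)
    if isinstance(params, tuple):
        return make_args(func, *params)
    raise TypeError(f"Unsupported parameter format: {type(params).__name__}")


def parallelize(func: Callable[..., Any], param_iter: Iterable[Any]) -> list[Any]:
    """Run one call per parameter set; results keep the input order."""
    calls = [normalize(func, params) for params in param_iter]
    # Works for functions with plain named parameters such as `add` below.
    with ThreadPoolExecutor() as pool:
        return list(pool.map(lambda a: func(**a.kwargs), calls))


def add(x: int, y: int) -> int:
    return x + y


results = parallelize(add, [(1, 1), make_args(add, 1, 2), {"x": 2, "y": 3}])
```

Normalizing every element to named arguments first means the execution step only has to handle a single format.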