pynenc.task

Module Contents

Classes

Task

A task in the Pynenc library that represents a function that can be distributed.

API

class pynenc.task.Task(app: pynenc.app.Pynenc, func: pynenc.types.Func, options: dict[str, Any])

Bases: typing.Generic[pynenc.types.Params, pynenc.types.Result]

A task in the Pynenc library that represents a function that can be distributed.

Parameters:
  • app (Pynenc) – A reference to the Pynenc application.

  • func (Callable) – The function to be run distributed.

  • **options (dict[str, Any]) – The options to apply.

A task can be called like a regular function and returns an instance of BaseResult. The result is an AsyncResult in normal operation, but can be a SyncResult when tasks run eagerly in development, that is, when the pynenc app’s dev_mode_force_sync_tasks option is set to True (or the ‘PYNENC_DEV_MODE_FORCE_SYNC_TASK’ environment variable is set). Use dev_mode_force_sync_tasks only in development.

Hint

Although a task instance can be created directly, the recommended approach is to use the decorator provided by the pynenc application, i.e., @app.task(options...). This is the expected way to instantiate a task and register it in the app.

Limitations

Attention

This implementation does not support the creation of tasks from functions defined in modules intended to run as standalone scripts.

This applies to any module executed directly, where its __name__ attribute becomes "__main__". This is not exclusive to modules with if __name__ == "__main__" sections but includes any module run as the main program. In such situations, func.__module__ being "__main__" poses a challenge for task instantiation and serialization. When a task is executed in the initiator script, it is identified as __main__.task_name.

However, in a Pynenc worker’s distributed environment, __main__ refers to the worker itself. As a result, the task identified as __main__.task_name cannot be found, since the worker’s __main__ differs from that of the initiator script. To ensure simplicity and robustness in task management, tasks defined in modules run as the main program are not supported.
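The root of the problem can be reproduced without Pynenc. The sketch below uses a hypothetical helper, qualified_name, which builds a module.function identifier in the way the paragraphs above describe. It is an assumption made for illustration, not Pynenc's actual implementation.

```python
import json


def qualified_name(func) -> str:
    """Hypothetical helper: identify a function as '<module>.<name>'."""
    return f"{func.__module__}.{func.__name__}"


# json.dumps lives in an importable module, so its identifier is the same
# no matter which script is running.
print(qualified_name(json.dumps))  # json.dumps


def local_task():
    pass


# When this file is executed directly, this prints "__main__.local_task".
# When the file is imported instead, the prefix is the real module name.
print(qualified_name(local_task))
```

A worker process that receives "__main__.local_task" would look inside its own __main__, which is the worker entry point rather than the script that defined the function. Moving the function into an importable module gives it a name that resolves identically in both processes.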

Examples

from pynenc import Pynenc

app = Pynenc()

@app.task  # or @app.task(options...) to set task options
def func():
    pass

result = func()

Raises

  • RuntimeError: If an attempt is made to create a task from a function in the __main__ module.

Initialization

validate_options() -> None

Validate that every option field exists in config_fields. If any do not, raise an exception that lists all of the invalid options.
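The "collect everything, then raise once" behaviour described here can be sketched as follows. The function signature, the ValueError type, and the field names are assumptions chosen for the illustration; the actual exception type and config fields used by Pynenc are not stated in this page.

```python
from typing import Any


def validate_options(options: dict[str, Any], config_fields: set[str]) -> None:
    """Raise one error naming every option that is not a known config field."""
    invalid = sorted(name for name in options if name not in config_fields)
    if invalid:
        raise ValueError(f"Invalid options: {invalid}")


# Hypothetical field names, used only for this illustration.
known_fields = {"max_retries", "retry_for"}

validate_options({"max_retries": 3}, known_fields)  # passes silently

try:
    validate_options({"max_retries": 3, "retires": 1, "colour": "red"}, known_fields)
except ValueError as exc:
    print(exc)  # Invalid options: ['colour', 'retires']
```

Reporting all invalid names in a single exception lets a caller fix every misspelled option in one pass instead of discovering them one at a time.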

conf() -> pynenc.conf.config_task.ConfigTask
property invocation: pynenc.invocation.base_invocation.BaseInvocation

The invocation of the task

to_json() -> str
Returns:

The serialized task

__getstate__() -> dict
__setstate__(state: dict) -> None
static _from_json(serialized: str) -> tuple[str, pynenc.types.Func, dict[str, Any]]
Returns:

A tuple containing a string, the function, and the options recovered from a serialized task

classmethod from_json(app: pynenc.app.Pynenc, serialized: str) -> pynenc.task.Task
Returns:

A new task built from a serialized task

retriable_exceptions() -> tuple[type[Exception], ...]

Retrieve a tuple of exception types that should trigger a retry of the task.

This method returns a tuple of exception types; raising any of them causes the task to be retried. The RetryError exception type, specific to the Pynenc system, is always included so that internal retry mechanisms are accounted for.

Returns:

A tuple of retriable exceptions.
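A tuple of exception types is convenient because Python's except clause accepts one directly. The sketch below shows that pattern under stated assumptions: RetryError is a local stand-in for the Pynenc class, and run_with_retries, its max_retries parameter, and the standalone retriable_exceptions function are hypothetical names that do not describe Pynenc's runner.

```python
class RetryError(Exception):
    """Stand-in for Pynenc's RetryError so the sketch is self-contained."""


def retriable_exceptions(
    configured: tuple[type[Exception], ...],
) -> tuple[type[Exception], ...]:
    """Return the configured exception types with RetryError always included."""
    if RetryError in configured:
        return configured
    return (RetryError, *configured)


def run_with_retries(func, retry_on: tuple[type[Exception], ...], max_retries: int):
    """Call func, retrying only when it raises one of the retry_on types."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except retry_on:
            # Give up and re-raise once the retry budget is spent.
            if attempt == max_retries:
                raise


calls = {"count": 0}


def flaky() -> str:
    """Fail twice with a retriable error, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


retry_on = retriable_exceptions((ConnectionError,))
print(run_with_retries(flaky, retry_on, max_retries=5))  # ok
print(calls["count"])  # 3
```

Exceptions that are not in the tuple propagate immediately, so only failures explicitly marked as retriable consume the retry budget.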

__str__() -> str
__repr__() -> str
args(*args: pynenc.types.Params.args, **kwargs: pynenc.types.Params.kwargs) -> pynenc.arguments.Arguments
Returns:

An Arguments instance built from the given args and kwargs

__call__(*args: pynenc.types.Params.args, **kwargs: pynenc.types.Params.kwargs) -> pynenc.invocation.base_invocation.BaseInvocation[pynenc.types.Params, pynenc.types.Result]

Handles a call to the task

_call(arguments: pynenc.arguments.Arguments) -> pynenc.invocation.base_invocation.BaseInvocation[pynenc.types.Params, pynenc.types.Result]

Route the call to the orchestrator if not in dev mode, otherwise run synchronously

Returns:

the invocation

parallelize(param_iter: Iterable[tuple | dict | pynenc.arguments.Arguments]) -> pynenc.invocation.base_invocation.BaseInvocationGroup

Parallelize the execution of a task with different sets of parameters.

This method allows for concurrent execution of the same task with varying parameters. It accepts an iterable where each element represents a set of parameters for a separate task invocation.

Note

These parameters can be specified in different formats:

  • As a tuple: Interpreted as positional arguments for the task.

  • As a dictionary: Interpreted as keyword arguments for the task.

  • As an Arguments instance: Created using task.args(*args, **kwargs).

Parameters:

param_iter (Iterable[tuple | dict | Arguments]) – An iterable of parameters for each call. Each element in the iterable is used to invoke the task separately.

Returns:

A group of task invocations, allowing the task to be run in parallel with different parameters. The type of group (synchronous or distributed) depends on the application’s configuration.

Important

Depending on the configuration, this method creates a group of either synchronous or distributed invocations. In development mode, where dev_mode_force_sync_tasks is enabled, it creates synchronous invocations. Otherwise, it creates distributed invocations for parallel processing.
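The way tuple and dictionary parameter sets map onto a call can be sketched in plain Python. The helpers normalize and call_each are hypothetical and exist only for this illustration; they run the calls sequentially and leave out the Arguments format, so they show the parameter handling rather than Pynenc's distributed execution.

```python
from typing import Any, Iterable, Union


def normalize(params: Union[tuple, dict]) -> tuple[tuple, dict[str, Any]]:
    """Turn one parameter set into an (args, kwargs) pair."""
    if isinstance(params, tuple):
        return params, {}  # tuple: positional arguments
    if isinstance(params, dict):
        return (), params  # dict: keyword arguments
    raise TypeError(f"Unsupported parameter set: {params!r}")


def call_each(func, param_iter: Iterable[Union[tuple, dict]]) -> list:
    """Call func once per parameter set, in order, and collect the results."""
    results = []
    for params in param_iter:
        args, kwargs = normalize(params)
        results.append(func(*args, **kwargs))
    return results


def add(x: int, y: int) -> int:
    return x + y


print(call_each(add, [(1, 1), {"x": 2, "y": 3}]))  # [2, 5]
```

Each element of the iterable produces exactly one call, which matches the description of param_iter above: one invocation per parameter set.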

Examples

from pynenc import Pynenc

app = Pynenc()

@app.task
def add(x: int, y: int) -> int:
    return x + y

# Example usage of parallelize
invocation_group = add.parallelize([(1, 1), add.args(1, 2), {"x": 2, "y": 3}])
print(list(invocation_group.results))
# prints [2, 3, 5]