Source code for pynenc.task

from __future__ import annotations

import importlib
import json
from functools import cached_property
from typing import TYPE_CHECKING, Any, Generic, Iterable

from pynenc import context
from pynenc.arguments import Arguments
from pynenc.call import Call
from pynenc.conf.config_task import ConfigTask
from pynenc.exceptions import InvalidTaskOptionsError, RetryError
from pynenc.invocation.base_invocation import BaseInvocation, BaseInvocationGroup
from pynenc.invocation.conc_invocation import (
    ConcurrentInvocation,
    ConcurrentInvocationGroup,
)
from pynenc.invocation.dist_invocation import DistributedInvocationGroup
from pynenc.types import Func, Params, Result
from pynenc.util.log import TaskLoggerAdapter

if TYPE_CHECKING:
    from pynenc.app import Pynenc


[docs] class Task(Generic[Params, Result]): """ **A task in the Pynenc library that represents a function that can be distributed.** :param Pynenc app: A reference to the Pynenc application. :param Callable func: The function to be run distributed. :param dict[str, Any] **options: The options to apply. The `BaseTask` can be called normally and will return an instance of `BaseResult`. The result will be an `AsyncResult` when running normally but can be `SyncResult` when running eagerly in development with the `pynenc` app's `dev_mode_force_sync_tasks` option set to `True` (or the 'PYNENC_DEV_MODE_FORCE_SYNC_TASK' environment variable set). The option `dev_mode_force_sync_tasks` should only be used in development. ```{hint} Although it is possible to create a `BaseTask` instance directly, it is recommended to use the decorator provided in the `pynenc` application, i.e., `@app.task(options...)`. This is the expected way of instantiating a class and registering it in the app. ``` ### Limitations ```{attention} This implementation does not support the creation of tasks from functions defined in modules intended to run as standalone scripts. ``` This applies to any module executed directly, where its `__name__` attribute becomes `"__main__"`. This is not exclusive to modules with `if __name__ == "__main__"` sections but includes any module run as the main program. In such situations, `func.__module__` being `"__main__"` poses a challenge for task instantiation and serialization. When a task is executed in the initiator script, it is identified as `__main__.task_name`. However, in a Pynenc worker's distributed environment, `__main__` refers to the worker itself. As a result, the task identified as `__main__.task_name` cannot be found, since the worker's `__main__` differs from that of the initiator script. **To ensure simplicity and robustness in task management, tasks defined in modules run as the main program are not supported.** ### Examples ```{code-block} python @app.task(options) def func(): pass result = func() ``` ### Raises - **RuntimeError:** If an attempt is made to create a task from a function in the `__main__` module. """ def __init__(self, app: Pynenc, func: Func, options: dict[str, Any]) -> None: if "__main__" in func.__module__: raise RuntimeError( "Cannot create a task from a function in the __main__ module" ) self.task_id = f"{func.__module__}.{func.__name__}" self.app = app self.logger = TaskLoggerAdapter(self.app.logger, self.task_id) self.func = func self.options = options self.validate_options()
[docs] def validate_options(self) -> None: """ validate that all the option fields exists in the config_fields it will raise an exception with all the invalid options """ invalid_options = [] for option in self.options: if option not in ConfigTask.config_fields(): invalid_options.append(option) if invalid_options: raise InvalidTaskOptionsError( self.task_id, f"Invalid options: {invalid_options}" )
@cached_property def conf(self) -> ConfigTask: return ConfigTask( task_id=self.task_id, config_values=self.app.config_values, config_filepath=self.app.config_filepath, task_options=self.options, ) @property def invocation(self) -> BaseInvocation: """The invocation of the task""" if dist_inv := context.get_dist_invocation_context(self.app.app_id): return dist_inv if sync_inv := context.sync_inv_context.get(self.app.app_id): return sync_inv raise RuntimeError("Task has not been invoked yet")
[docs] def to_json(self) -> str: """:return: The serialized task""" return json.dumps( {"task_id": self.task_id, "options": self.conf.options_to_json()} )
[docs] def __getstate__(self) -> dict: # Return state as a dictionary and a secondary value as a tuple return {"app": self.app, "task_json": self.to_json()}
[docs] def __setstate__(self, state: dict) -> None: # Restore instance attributes self.app = state["app"] serialized = state["task_json"] task_id, func, options = Task._from_json(serialized) # Restore the cached property self.task_id = task_id self.app = self.app self.func = func self.options = options self.logger = TaskLoggerAdapter(self.app.logger, self.task_id)
[docs] @staticmethod def _from_json(serialized: str) -> tuple[str, Func, dict[str, Any]]: """:return: a function and options from a serialized task""" task_dict = json.loads(serialized) task_id = task_dict["task_id"] module_name, function_name = task_id.rsplit(".", 1) module = importlib.import_module(module_name) function = getattr(module, function_name) options = ConfigTask.options_from_json(task_dict["options"]) return task_id, function.func, options
[docs] @classmethod def from_json(cls, app: Pynenc, serialized: str) -> Task: """:return: a new task from a serialized task""" _, func, options = cls._from_json(serialized) return cls(app, func, options)
@cached_property def retriable_exceptions(self) -> tuple[type[Exception], ...]: """ Retrieve a tuple of exception types that should trigger a retry of the task. This method provides a list of exception types, indicating which exceptions will cause the task to be retried. The `RetryError` exception type, specific to the Pynenc system, is always included to ensure that internal retry mechanisms are accounted for. :return: A tuple of retriable exceptions. """ if not self.conf.retry_for: return (RetryError,) if RetryError in self.conf.retry_for: return self.conf.retry_for return self.conf.retry_for + (RetryError,)
[docs] def __str__(self) -> str: return f"Task(func={self.func.__name__})"
[docs] def __repr__(self) -> str: return self.__str__()
[docs] def args(self, *args: Params.args, **kwargs: Params.kwargs) -> Arguments: """:return: an Arguments instance from the given args and kwargs""" return Arguments.from_call(self.func, *args, **kwargs)
[docs] def __call__( self, *args: Params.args, **kwargs: Params.kwargs ) -> BaseInvocation[Params, Result]: """Handles a call to the task""" arguments = Arguments.from_call(self.func, *args, **kwargs) return self._call(arguments)
[docs] def _call(self, arguments: Arguments) -> BaseInvocation[Params, Result]: """ Route the call to the orchestrator if not in dev mode, otherwise run synchronously :return: the invocation """ if self.app.conf.dev_mode_force_sync_tasks: return ConcurrentInvocation(call=Call(self, arguments)) return self.app.orchestrator.route_call(Call(self, arguments))
[docs] def parallelize( self, param_iter: Iterable[tuple | dict | Arguments] ) -> BaseInvocationGroup: """ Parallelize the execution of a task with different sets of parameters. This method allows for concurrent execution of the same task with varying parameters. It accepts an iterable where each element represents a set of parameters for a separate task invocation. ```{note} These parameters can be specified in different formats: - As a tuple: Interpreted as positional arguments for the task. - As a dictionary: Interpreted as keyword arguments for the task. - As an `Arguments` instance: Created using `task.args(*args, **kwargs)`. ``` :param Iterable[tuple | dict | Arguments] param_iter: An iterable of parameters for each call. Each element in the iterable is used to invoke the task separately. :return: A group of task invocations, allowing the task to be run in parallel with different parameters. The type of group (synchronous or distributed) depends on the application's configuration. ```{important} Depending on the configuration, this method creates a group of either synchronous or distributed invocations. In development mode, where `dev_mode_force_sync_tasks` is enabled, it creates synchronous invocations. Otherwise, it creates distributed invocations for parallel processing. ``` ### Examples ```{code-block} python app = Pynenc() @app.task def add(x: int, y: int) -> int: return x + y # Example usage of parallelize invocation_group = add.parallelize([(1, 1), add.args(1, 2), {"x": 2, "y": 3}]) print(list(invocation_group.results)) # prints [2, 3, 5] ``` """ self.logger.info(f"parallelizing {self.task_id}") group_cls: type[BaseInvocationGroup] if self.app.conf.dev_mode_force_sync_tasks: group_cls = ConcurrentInvocationGroup else: group_cls = DistributedInvocationGroup def get_args(params: tuple | dict | Arguments) -> Arguments: if isinstance(params, tuple): return self.args(*params) if isinstance(params, dict): return self.args(**params) return params invocations = [] for params in param_iter: if invocation := self._call(get_args(params)): invocations.append(invocation) self.logger.info( f"parallelized {self.task_id} with {len(invocations)} invocations" ) return group_cls(self, invocations)