pynenc.app¶
Module Contents¶
Classes¶
The main class of the Pynenc library that creates an application object. |
API¶
- class pynenc.app.Pynenc(app_id: str | None = None, config_values: Optional[dict[str, Any]] = None, config_filepath: Optional[str] = None)[source]¶
The main class of the Pynenc library that creates an application object.
- Parameters:
Note
All of these base classes are abstract and cannot be used directly. If none is specified, they will default to
MemTaskBroker,MemStateBackend, etc. These default classes do not actually distribute the code but are helpers for tests or for running an application on your localhost. They may help to parallelize to some degree but cannot be used in a production system.Initialization
- logger() logging.Logger¶
- orchestrator() pynenc.orchestrator.base_orchestrator.BaseOrchestrator¶
- state_backend() pynenc.state_backend.base_state_backend.BaseStateBackend¶
- serializer() pynenc.serializer.base_serializer.BaseSerializer¶
- arg_cache() pynenc.arg_cache.base_arg_cache.BaseArgCache¶
- property runner: pynenc.runner.base_runner.BaseRunner¶
Get the runner for this app, prioritizing thread/process-specific context.
First, it checks the thread-local context for a runner (via get_current_runner). This is crucial in the MultiThreadRunner, where each process runs a ThreadRunner and needs to use its own runner instance rather than the app’s default.
If no context runner exists, it falls back to the instance-level runner. This mechanism ensures correct runner isolation across threads and processes.
- Returns:
The runner instance for the current context or the app instance.
- task(func: Optional[pynenc.types.Func] = None, *, auto_parallel_batch_size: Optional[int] = None, retry_for: Optional[tuple[type[Exception], ...]] = None, max_retries: Optional[int] = None, running_concurrency: Optional[pynenc.conf.config_task.ConcurrencyControlType] = None, registration_concurrency: Optional[pynenc.conf.config_task.ConcurrencyControlType] = None, key_arguments: Optional[tuple[str, ...]] = None, on_diff_non_key_args_raise: Optional[bool] = None, call_result_cache: Optional[bool] = None, disable_cache_args: Optional[tuple[str, ...]] = None) pynenc.task.Task | Callable[[pynenc.types.Func], pynenc.task.Task][source]¶
The task decorator converts the function into an instance of a BaseTask. It accepts any kind of options, however these options will be validated with the options class assigned to the class.
- Parameters:
func (Optional[Callable]) – The function to be converted into a Task instance.
auto_parallel_batch_size (Optional[int]) – If set to 0, auto parallelization is disabled. If greater than 0, tasks with iterable arguments are automatically split into chunks.
retry_for (Optional[Tuple[Exception, …]]) – Exceptions for which the task should be retried.
max_retries (Optional[int]) – The maximum number of retries for a task.
running_concurrency (Optional[ConcurrencyControlType]) – Controls the concurrency behavior of the task.
registration_concurrency (Optional[ConcurrencyControlType]) – Manages task registration concurrency.
key_arguments (Optional[Tuple[str, …]]) – Key arguments for concurrency control.
on_diff_non_key_args_raise (Optional[bool]) – If True, raises an exception for task invocations with matching key arguments but different non-key arguments.
call_result_cache (Optional[bool]) – If True, it will return the latest result of a Task with the same arguments if availble, otherwise it will trigger a new invocation as expected.
disable_cache_args (Optional[tuple[str, …]]) – Arguments to exclude from caching, it will accept “*” to disable caching for all arguments.
- Returns:
A Task instance or a callable that when called returns a Task instance.
- Example:
@app.task(auto_parallel_batch_size=10, max_retries=3) def my_func(x, y): return x + y