API Documentation#

This section contains autogenerated documentation from the codebase of Pynenc, detailing the API and its usage.

Main Modules#

App Module#

class pynenc.app.Pynenc(app_id: str | None = None, config_values: dict[str, Any] | None = None, config_filepath: str | None = None)#

Bases: object

The main class of the Pynenc library that creates an application object.

Parameters#

task_brokerBaseTaskBroker

Handles routing of tasks for distributed execution.

state_backendBaseStateBackend

Maintains the state of tasks, runners, and other relevant system states.

orchestratorBaseOrchestrator

Coordinates all components and acts according to the configuration.

reportinglist of BaseReporting

Reports to one or more systems.

Notes#

All of these base classes are abstract and cannot be used directly. If none is specified, they will default to MemTaskBroker, MemStateBackend, etc. These default classes do not actually distribute the code but are helpers for tests or for running an application on your localhost. They may help to parallelize to some degree but cannot be used in a production system.

Examples#

Default Pynenc application for running in memory in a local environment.

>>> app = Pynenc()
property app_id: str#
property broker: BaseBroker#
property conf: ConfigPynenc#
is_initialized(property_name: str) bool#

Returns True if the given cached_property has been initialized

property logger: Logger#
property orchestrator: BaseOrchestrator#
purge() None#

Purge all data from the broker and state backend

property runner: BaseRunner#
property serializer: BaseSerializer#
property state_backend: BaseStateBackend#
task(func: Func, **options: Any) Task#
task(func: None = None, **options: Any) Callable[['Func'], 'Task']

The task decorator converts the function into an instance of a BaseTask. It accepts any kind of options, however these options will be validated with the options class assigned to the class.

Check the options reference in conf.config_task or the Pynenc documentation for a detailed explanation of the BaseTask instance you are applying.

Parameters#
funcCallable, optional

The function to be converted into a BaseTask instance.

**optionsdict

The options to be passed to the BaseTask instance.

Returns#
Task | Callable[…, Task]

The BaseTask instance or a callable that returns a BaseTask instance.

Examples#
>>> @app.task(option1='value1', option2='value2')
... def my_func(x, y):
...     return x + y
...
>>> result = my_func(1, 2)

Arguments Module#

class pynenc.arguments.Arguments(kwargs: Args | None = None)#

Bases: object

property args_id: str#

Generate a unique id for these arguments

classmethod from_call(func: Func, *args: Any, **kwargs: Any) Arguments#

Call Module#

class pynenc.call.Call(task: Task[Params, Result], arguments: Arguments = <factory>)#

Bases: Generic[Params, Result]

A specific call of a task

A call is unique per task and arguments

property app: Pynenc#
arguments: Arguments#
property call_id: str#

Returns a unique id for each task and arguments

deserialize_arguments(serialized_arguments: dict[str, str]) Arguments#

Returns an Arguments instance with the deserialized arguments

classmethod from_json(app: Pynenc, serialized: str) Call#

Returns a new call from a serialized call

property serialized_args_for_concurrency_check: dict[str, str] | None#

Returns a dictionary with the call arguments required for the task concurrency check

property serialized_arguments: dict[str, str]#

Returns a dictionary with each of the call arguments serialized into a string

task: Task[Params, Result]#
to_json() str#

Returns a string with the serialized call

Context Module#

Exceptions Module#

Global Pynenc exception and warning classes.

exception pynenc.exceptions.AlreadyInitializedError#

Bases: PynencError

Error raised when trying to change the class of a component after it was initialized

exception pynenc.exceptions.ConfigError#

Bases: PynencError

Base class for all the config related errors

exception pynenc.exceptions.ConfigMultiInheritanceError#

Bases: ConfigError

Error related with multiinheritance of config fields

exception pynenc.exceptions.CycleDetectedError(call_ids: list[str], message: str)#

Bases: PynencError

Raised when a cycle is detected in the DependencyGraph

classmethod from_cycle(cycle: list[Call]) CycleDetectedError#
exception pynenc.exceptions.InvalidTaskOptionsError(task_id: str, message: str | None = None)#

Bases: TaskError

Error raised when the task options are invalid.

exception pynenc.exceptions.InvocationConcurrencyWithDifferentArgumentsError(task_id: str, existing_invocation_id: str, new_call_id: str, diff: str, message: str | None = None)#

Bases: TaskRoutingError

Error raised when there is a pending task with different arguments than the current task.

static format_difference(existing_call: Call, new_call: Call) str#
classmethod from_call_mismatch(existing_invocation: BaseInvocation, new_call: Call, message: str | None = None) InvocationConcurrencyWithDifferentArgumentsError#
exception pynenc.exceptions.InvocationError(invocation_id: str, message: str | None = None)#

Bases: PynencError

Base class for all Task related errors.

exception pynenc.exceptions.InvocationNotFoundError(invocation_id: str, message: str | None = None)#

Bases: StateBackendError

Error raised when the invocation is not present in the State Backend.

exception pynenc.exceptions.PendingInvocationLockError(invocation_id: str)#

Bases: PynencError

Error raised when two processes try to set the same invocation as pending concurrently

exception pynenc.exceptions.PynencError#

Bases: Exception

Base class for all Pynenc related errors.

classmethod from_json(error_name: str, serialized: str) PynencError#

Returns the child class from a serialized error

to_json() str#

Returns a string with the serialized error

exception pynenc.exceptions.RetryError#

Bases: PynencError

Error raised when a task should be retried.

exception pynenc.exceptions.RunnerError#

Bases: PynencError

Base class for all Runner related errors.

exception pynenc.exceptions.RunnerNotExecutableError#

Bases: PynencError

Raised when trying to execute a runner that is not meant to be executed.

exception pynenc.exceptions.StateBackendError#

Bases: PynencError

Error raised when a task will not be routed.

exception pynenc.exceptions.TaskError(task_id: str, message: str | None = None)#

Bases: PynencError

Base class for all Task related errors.

exception pynenc.exceptions.TaskRoutingError(task_id: str, message: str | None = None)#

Bases: TaskError

Error raised when a task will not be routed.

Task Module#

class pynenc.task.Task(app: Pynenc, func: Func, options: dict[str, Any])#

Bases: Generic[Params, Result]

A task in the Pynenc library that represents a function that can be distributed.

Parameters#

appPynenc

A reference to the Pynenc application.

funcCallable

The function to be run distributed.

optionsdict

The options to apply.

The BaseTask can be called normally and will return an instance of BaseResult. The result will be an AsyncResult when running normally but can be SyncResult when running eagerly in development with the pynenc app’s dev_mode_force_sync_tasks option set to True (or the ‘PYNENC_DEV_MODE_FORCE_SYNC_TASK’ environment variable set). The option dev_mode_force_sync_tasks should only be used in development.

Although it is possible to create a BaseTask instance directly, it is recommended to use the decorator provided in the pynenc application, i.e., @app.task(options…). This is the expected way of instantiating a class and registering it in the app.

Limitations#

This implementation does not support the creation of tasks from functions defined in modules intended to run as standalone scripts. This applies to any module executed directly, where its __name__ attribute becomes “__main__”. This is not exclusive to modules with if __name__ == “__main__” sections but includes any module run as the main program. In such situations, func.__module__ being “__main__” poses a challenge for task instantiation and serialization. When a task is executed in the initiator script, it is identified as __main__.task_name. However, in a Pynenc worker’s distributed environment, __main__ refers to the worker itself. As a result, the task identified as __main__.task_name cannot be found, since the worker’s __main__ differs from that of the initiator script. To ensure simplicity and robustness in task management, tasks defined in modules run as the main program are not supported.

Examples#

>>> @app.task(options)
... def func():
...     pass
...
>>> result = func()

Raises#

RuntimeError

If an attempt is made to create a task from a function in the __main__ module.

args(*args: Params.args, **kwargs: Params.kwargs) Arguments#

Returns an Arguments instance from the given args and kwargs

property conf: ConfigTask#
classmethod from_json(app: Pynenc, serialized: str) Task#

Returns a new task from a serialized task

property invocation: BaseInvocation#

The invocation of the task

parallelize(param_iter: Iterable[tuple | dict | Arguments]) BaseInvocationGroup#

iterable of calls to the task, will accept a tuple positional arguments, a dict of keyword arguments, or an Arguments instance, eg: task.args(*args, **kwargs))

property retriable_exceptions: tuple[type[Exception], ...]#
property task_id: str#

The id of the task, which is the module and function name.

to_json() str#

Returns a string with the serialized task

validate_options() None#

validate that all the option fields exists in the config_fields it will raise an exception with all the invalid options

Types Module#

Broker Submodules#

Base Broker#

class pynenc.broker.base_broker.BaseBroker(app: Pynenc)#

Bases: ABC

property conf: ConfigBroker#
abstract purge() None#
abstract retrieve_invocation() DistributedInvocation | None#
route_call(call: Call[Params, Result]) DistributedInvocation[Params, Result]#

Creates a new invocation and routes it

abstract route_invocation(invocation: DistributedInvocation) None#

Memory Broker#

class pynenc.broker.mem_broker.MemBroker(app: Pynenc)#

Bases: BaseBroker

purge() None#
retrieve_invocation() DistributedInvocation | None#
route_invocation(invocation: DistributedInvocation) None#

Redis Broker#

class pynenc.broker.redis_broker.RedisBroker(app: Pynenc)#

Bases: BaseBroker

property conf: ConfigBrokerRedis#
purge() None#
retrieve_invocation() DistributedInvocation | None#
route_invocation(invocation: DistributedInvocation) None#
class pynenc.broker.redis_broker.RedisQueue(app: Pynenc, client: Redis, name: str, namespace: str = 'queue')#

Bases: object

purge() None#
receive_message() str | None#
send_message(message: str) None#

… (Previous content)

Configuration Submodules#

Base Config Option#

Config Base#

class pynenc.conf.config_base.ConfigBase(config_values: dict[str, Any] | None = None, config_filepath: str | None = None)#

Bases: object

Base class for defining configuration settings.

This class serves as the base for creating configuration classes. It supports hierarchical and flexible configuration from various sources, including environment variables, configuration files, and default values.

Configuration values are determined based on the following priority (highest to lowest): 1. Direct assignment in the config instance (not recommended) 2. Environment variables 3. Configuration file path specified by environment variables 4. Configuration file path (YAML, TOML, JSON) by config_filepath parameter 5. pyproject.toml 6. Default values specified in the ConfigField 7. Previous steps for any Parent config class 8. User does not specify anything (default values)

Examples#

Define a configuration class for a Redis client:

class ConfigRedis(ConfigBase):
    redis_host = ConfigField("localhost")
    redis_port = ConfigField(6379)
    redis_db = ConfigField(0)

Define a main configuration class for orchestrator components:

class ConfigOrchestrator(ConfigBase):
    cycle_control = ConfigField(True)
    blocking_control = ConfigField(True)
    auto_final_invocation_purge_hours = ConfigField(24.0)

Combine configurations using multiple inheritance:

class ConfigOrchestratorRedis(ConfigOrchestrator, ConfigRedis):
    pass

The ConfigOrchestratorRedis class now includes settings from both ConfigOrchestrator and ConfigRedis.

property all_fields: list[str]#
classmethod config_fields() list[str]#
static get_config_id(config_cls: Type[ConfigBase]) str#
init_config_value_from_env_vars(config_cls: Type[ConfigBase]) None#
init_config_value_from_mapping(source: str, config_id: str, mapping: dict[str, Any]) None#
init_config_value_key_from_mapping(source: str, config_id: str, key: str, mapping: dict, conf_mapping: dict) None#
init_config_values(config_cls: Type[ConfigBase], config_values: dict[str, Any] | None, config_filepath: str | None) None#
init_parent_values(config_cls: Type[ConfigBase], config_values: dict[str, Any] | None, config_filepath: str | None) None#
class pynenc.conf.config_base.ConfigField(default_value: T, mapper: Callable[[Any, Type[T]], T] | None = None)#

Bases: Generic[T]

Define each typed field from a ConfigBase instance.

This class is used to define typed configuration fields within a ConfigBase subclass. It ensures type consistency and supports value validation and casting.

Parameters#

default_valueT

The default value for the configuration field.

mapperOptional[ConfigFieldMapper]

An optional function to map or transform the value.

Attributes#

_default_valueT

Stores the default value of the configuration field.

_mapperConfigFieldMapper

The function used for mapping or transforming the value.

pynenc.conf.config_base.avoid_multi_inheritance_field_conflict(config_cls: Type, config_cls_to_fields: dict[str, set[str]]) dict[str, str]#

Ensures that the same configuration field is not defined in multiple parent classes of a given configuration class.

This function checks all parent classes of the provided configuration class that are subclasses of ConfigBase. It ensures that each configuration field is defined only once among all parent classes. If a field is found in multiple parent classes, a ConfigMultiInheritanceError is raised. This check ensures deterministic behavior in the configuration inheritance hierarchy.

Parameters:

config_cls (Type): The configuration class to check for field conflicts.

Returns:

dict[str, str]: A dictionary mapping each configuration field to the name of the parent class where it is defined.

Raises:

ConfigMultiInheritanceError: If a configuration field is found in multiple parent classes.

Example:
>>> class ParentConfig1(ConfigBase):
...     field1 = ConfigField(default_value=1)
...
>>> class ParentConfig2(ConfigBase):
...     field2 = ConfigField(default_value=2)
...
>>> class ChildConfig(ParentConfig1, ParentConfig2):
...     pass
...
>>> avoid_multi_inheritance_field_conflict(ChildConfig)
{'field1': 'ParentConfig1', 'field2': 'ParentConfig2'}
pynenc.conf.config_base.default_config_field_mapper(value: Any, expected_type: Type[T]) T#
pynenc.conf.config_base.get_config_fields(cls: Type) Iterator[str]#
pynenc.conf.config_base.get_env_key(field: str, config: Type[ConfigBase] | None = None) str#

gets the key used in the environment variables

Config Broker#

class pynenc.conf.config_broker.ConfigBroker(config_values: dict[str, Any] | None = None, config_filepath: str | None = None)#

Bases: ConfigBase

Main config of the boker components

class pynenc.conf.config_broker.ConfigBrokerRedis(config_values: dict[str, Any] | None = None, config_filepath: str | None = None)#

Bases: ConfigBroker, ConfigRedis

Specific Configuration for the Redis Broker

Config Orchestrator#

class pynenc.conf.config_orchestrator.ConfigOrchestrator(config_values: dict[str, Any] | None = None, config_filepath: str | None = None)#

Bases: ConfigBase

Main config of the orchestrator components.

Attributes#

cycle_controlConfigField[bool]

This boolean flag enables the orchestrator to detect cycles of calls to subtasks. For example, if task1 calls task2 and task2 calls back to task1, this can create an endless loop. The cycle control functionality is enabled by default to prevent such scenarios. Users can choose to disable it if needed.

blocking_controlConfigField[bool]

This boolean flag activates control over tasks that are blocking on other tasks. If a task invocation is waiting on another invocation, it notifies the runner, which temporarily removes it from the processing queue and uses the slot for another task invocation. Once the required invocation finishes, the dependent invocation is placed back into the run queue. This feature also prioritizes invocations that have many dependencies over new ones, ensuring efficient task management.

auto_final_invocation_purge_hoursConfigField[float]

This float value, defaulting to 24.0 hours, sets the duration after which the orchestrator purges all invocations older than the specified time. This purge mechanism helps keep the orchestrator lightweight and fast, as it should ideally operate with minimal latency. Detailed information about the invocations is stored in the result backend, which can handle more data and afford to be slower.

auto_final_invocation_purge_hours = 24.0#
blocking_control = True#
cycle_control = True#
class pynenc.conf.config_orchestrator.ConfigOrchestratorRedis(config_values: dict[str, Any] | None = None, config_filepath: str | None = None)#

Bases: ConfigOrchestrator, ConfigRedis

Specific Configuration for the Redis Orchestrator

… (Continue for other modules in conf)

Invocation Submodules#

Base Invocation#

class pynenc.invocation.base_invocation.BaseInvocation(call: Call[Params, Result])#

Bases: ABC, Generic[Params, Result]

Invocation of a task call

A call can have several invocations in the system

property app: Pynenc#
property arguments: Arguments#
call: Call[Params, Result]#
property call_id: str#
abstract classmethod from_json(app: Pynenc, serialized: str) T#

Returns a new invocation from a serialized invocation

property invocation_id: str#

Returns a unique id for this invocation

A task with the same arguments can have multiple invocations, the invocation id is used to differentiate them

abstract property num_retries: int#
abstract property result: Result#
property serialized_arguments: dict[str, str]#
abstract property status: InvocationStatus#
property task: Task[Params, Result]#
abstract to_json() str#

Returns a string with the serialized invocation

class pynenc.invocation.base_invocation.BaseInvocationGroup(task: 'Task', invocations: 'list[T]')#

Bases: ABC, Generic[Params, Result, T]

property app: Pynenc#
invocations: list[T]#
abstract property results: Iterator[Result]#
task: Task#

Dist Invocation#

class pynenc.invocation.dist_invocation.DistributedInvocation(call: 'Call[Params, Result]', parent_invocation: 'DistributedInvocation | None', _invocation_id: 'str | None' = None)#

Bases: BaseInvocation[Params, Result]

classmethod from_json(app: Pynenc, serialized: str) DistributedInvocation#

Returns a new invocation from a serialized invocation

get_final_result() Result#
property invocation_id: str#

on deserialization allows to set the invocation_id

property num_retries: int#

Get the number of times the invocation got retried

parent_invocation: DistributedInvocation | None#
property result: Result#
run(runner_args: dict[str, Any] | None = None) None#
property status: InvocationStatus#

Get the status of the invocation

to_json() str#

Returns a string with the serialized invocation

class pynenc.invocation.dist_invocation.DistributedInvocationGroup(task: Task, invocations: list[T])#

Bases: BaseInvocationGroup[Params, Result, DistributedInvocation]

property results: Iterator[Result]#
class pynenc.invocation.dist_invocation.ReusedInvocation(call: Call[Params, Result], parent_invocation: DistributedInvocation | None, _invocation_id: str | None = None, diff_arg: Arguments | None = None)#

Bases: DistributedInvocation

This is an invocation referencing an older one

diff_arg: Arguments | None = None#
classmethod from_existing(invocation: DistributedInvocation, diff_arg: Arguments | None = None) ReusedInvocation#

Status#

class pynenc.invocation.status.InvocationStatus(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#

Bases: StrEnum

An enumeration representing the status of a task invocation.

The PENDING status will expire after the time specified in max_pending_seconds.

Notes#

FAILED, RETRY, and SUCCESS are the same category and can be considered as subtypes of a hypothetical TERMINATED status.

FAILED = 'failed'#

The task call finished with exceptions.

PAUSED = 'paused'#

The task call execution is paused.

PENDING = 'pending'#

The task call was picked by a runner but is not yet executed. The status pending will expire after Config.max_pending_seconds

REGISTERED = 'registered'#

The task call has been routed and is registered

REROUTED = 'rerouted'#
RETRY = 'retry'#

The task call finished with a retriable exception.

RUNNING = 'running'#

The task call is currently running.

SCHEDULED = 'scheduled'#

A task has been registered to run at a specific time. This is a subtype of REGISTERED.

SUCCESS = 'success'#

The task call finished without errors.

is_available_for_run() bool#

Check if the task is in a state where it is available to be picked up and run by any broker. This means the task is not currently being executed, paused, or about to be executed by any other broker.

Returns#
bool

True if the status is runnable by any broker, False otherwise.

is_final() bool#

Checks if the status is a final status.

Returns#
bool

True if the status is final, False otherwise.

Sync Invocation#

class pynenc.invocation.sync_invocation.SynchronousInvocation(call: Call[Params, Result])#

Bases: BaseInvocation[Params, Result]

classmethod from_json(app: Pynenc, serialized: str) SynchronousInvocation#

Returns a new invocation from a serialized invocation

property num_retries: int#

Get the number of times the invocation got retried

property result: Result#
property status: InvocationStatus#

Get the status of the invocation

to_json() str#

Returns a string with the serialized invocation

class pynenc.invocation.sync_invocation.SynchronousInvocationGroup(task: Task, invocations: list[T])#

Bases: BaseInvocationGroup[Params, Result, SynchronousInvocation]

property results: Iterator[Result]#

… (Previous content)

Orchestrator Submodules#

Base Orchestrator#

class pynenc.orchestrator.base_orchestrator.BaseBlockingControl#

Bases: ABC

Sub component of the orchestrator to implement blocking control functionalities

abstract get_blocking_invocations(max_num_invocations: int) Iterator[DistributedInvocation]#

Returns an iterator of invocations that are blocking other invocations but are not getting blocked by any invocation. order by age, the oldest invocation first.

abstract release_waiters(waited: DistributedInvocation) None#

Called when an invocation is finished and therefore cannot block other invocations anymore

abstract waiting_for_results(caller_invocation: DistributedInvocation[Params, Result], result_invocations: list[DistributedInvocation[Params, Result]]) None#

Called when an Optional[invocation] is waiting in the result result of another invocation.

class pynenc.orchestrator.base_orchestrator.BaseCycleControl#

Bases: ABC

Sub component of the orchestrator to implement cycle control functionalities

abstract add_call_and_check_cycles(caller_invocation: DistributedInvocation[Params, Result], callee_invocation: DistributedInvocation[Params, Result]) None#

Adds a new call between invocations and raise an exception to prevent the formation of a call cycle

abstract clean_up_invocation_cycles(invocation: DistributedInvocation) None#

Called when an invocation is finished and therefore cannot be part of a cycle anymore

class pynenc.orchestrator.base_orchestrator.BaseOrchestrator(app: Pynenc)#

Bases: ABC

add_call_and_check_cycles(caller_invocation: DistributedInvocation[Params, Result], callee_invocation: DistributedInvocation[Params, Result]) None#

Adds a new call between invocations and raise an exception to prevent the formation of a call cycle

abstract auto_purge() None#

Purge all invocations in final state that are older than app.conf.orchestrator_auto_final_invocation_purge_hours

abstract property blocking_control: BaseBlockingControl#
clean_up_invocation_cycles(invocation: DistributedInvocation) None#

Called when an invocation is finished and therefore cannot be part of a cycle anymore

property conf: ConfigOrchestrator#
abstract property cycle_control: BaseCycleControl#
get_additional_invocations_to_run(missing_invocations: int, blocking_invocation_ids: set[str], invocations_to_reroute: set[DistributedInvocation]) Iterator[DistributedInvocation]#
get_blocking_invocations(max_num_invocations: int) Iterator[DistributedInvocation]#

Returns an iterator of invocations that are blocking other invocations but are not getting blocked by any invocation. order by age, the oldest invocation first.

get_blocking_invocations_to_run(max_num_invocations: int, blocking_invocation_ids: set[str]) Iterator[DistributedInvocation]#
abstract get_existing_invocations(task: Task[Params, Result], key_serialized_arguments: dict[str, str] | None = None, status: InvocationStatus | None = None) Iterator[DistributedInvocation]#
abstract get_invocation_retries(invocation: DistributedInvocation[Params, Result]) int#
abstract get_invocation_status(invocation: DistributedInvocation[Params, Result]) InvocationStatus#
get_invocations_to_run(max_num_invocations: int) Iterator[DistributedInvocation]#
abstract increment_invocation_retries(invocation: DistributedInvocation[Params, Result]) None#
is_authorize_to_run_by_concurrency_control(invocation: DistributedInvocation) bool#

Checks if the invocation can run based on task concurrency configuration

abstract purge() None#
release_waiters(waited: DistributedInvocation) None#

Called when an invocation is finished and therefore cannot block other invocations anymore

reroute_invocations(invocations_to_reroute: set[DistributedInvocation]) None#
route_call(call: Call) DistributedInvocation[Params, Result]#

Routes a task call in a distributed task system based on single invocation options.

If the task has no single invocation option set, it routes a new call invocation. For tasks with single invocation, it checks if an existing invocation with the same or different arguments exists. If an invocation with the same arguments exists, it reuses the invocation. If an invocation with different arguments exists, it raises an error or reuses the invocation based on the ‘on_diff_args_raise’ flag.

Parameters:

call (Call) – The task call to be routed.

Returns:

The resulting DistributedInvocation object, which could be a new or reused invocation.

Return type:

DistributedInvocation[Params, Result]

set_invocation_exception(invocation: DistributedInvocation, exception: Exception) None#

Called when an invocation is finished with an exception

set_invocation_result(invocation: DistributedInvocation, result: Any) None#

Called when an invocation is finished successfully

set_invocation_retry(invocation: DistributedInvocation, exception: Exception) None#

Called when an invocation is finished with an exception and should be retried

set_invocation_run(caller: DistributedInvocation[Params, Result] | None, callee: DistributedInvocation[Params, Result]) None#

Called when an invocation is started

set_invocation_status(invocation: DistributedInvocation[Params, Result], status: InvocationStatus) None#
set_invocations_status(invocations: list[DistributedInvocation[Params, Result]], status: InvocationStatus) None#
abstract set_up_invocation_auto_purge(invocation: DistributedInvocation[Params, Result]) None#

set up the invocation to be auto purgue after app.conf.orchestrator_auto_final_invocation_purge_hours

waiting_for_results(caller_invocation: DistributedInvocation[Params, Result] | None, result_invocations: list[DistributedInvocation[Params, Result]]) None#

Called when an Optional[invocation] is waiting in the result result of another invocation.

Memory Orchestrator#

class pynenc.orchestrator.mem_orchestrator.ArgPair(key: str, value: Any)#

Bases: object

Helper to simulate a Memory cache for key:value pairs in Task Invocations

class pynenc.orchestrator.mem_orchestrator.MemBlockingControl(app: Pynenc)#

Bases: BaseBlockingControl

A directed acyclic graph representing the call dependencies

get_blocking_invocations(max_num_invocations: int) Iterator[DistributedInvocation[Params, Result]]#

Returns the invocations that are blocking others but not waiting for anything themselves. The oldest invocations are returned first.

Parameters#

max_num_invocationsint

The maximum number of invocations to return.

Returns#

Set[DistributedInvocation] | None

A set of blocking invocations or None if no invocations are blocking.

release_waiters(invocation: DistributedInvocation) None#

Remove an invocation from the graph. Also removes any edges to or from the invocation.

waiting_for_results(caller_invocation: DistributedInvocation[Params, Result], result_invocations: list[DistributedInvocation[Params, Result]]) None#

Register that an invocation (waiter) is waiting for the results of another invocation (waited).

class pynenc.orchestrator.mem_orchestrator.MemCycleControl(app: Pynenc)#

Bases: BaseCycleControl

A directed acyclic graph representing the call dependencies

add_call_and_check_cycles(caller: DistributedInvocation, callee: DistributedInvocation) None#

Add a new invocation to the graph. This represents a dependency where the caller is dependent on the callee.

Raises a CycleDetectedError if the invocation would cause a cycle.

clean_up_invocation_cycles(invocation: DistributedInvocation) None#

Remove an invocation from the graph. Also removes any edges to or from the invocation.

find_cycle_caused_by_new_invocation(caller: DistributedInvocation, callee: DistributedInvocation) list[Call]#

Determines if adding an edge from the caller to the callee would create a cycle.

Parameters#

callerDistributedInvocation

The invocation making the call.

calleeDistributedInvocation

The invocation being called.

Returns#

list

List of invocations that would form the cycle after adding the new invocation, else an empty list.

class pynenc.orchestrator.mem_orchestrator.MemOrchestrator(app: Pynenc)#

Bases: BaseOrchestrator

auto_purge() None#

Purge all invocations in final state that are older than app.conf.orchestrator_auto_final_invocation_purge_hours

property blocking_control: MemBlockingControl#
property cycle_control: MemCycleControl#
get_existing_invocations(task: Task[Params, Result], key_serialized_arguments: dict[str, str] | None = None, status: InvocationStatus | None = None) Iterator[DistributedInvocation]#
get_invocation_retries(invocation: DistributedInvocation[Params, Result]) int#
get_invocation_status(invocation: DistributedInvocation[Params, Result]) InvocationStatus#
increment_invocation_retries(invocation: DistributedInvocation[Params, Result]) None#
purge() None#
set_up_invocation_auto_purge(invocation: DistributedInvocation[Params, Result]) None#

set up the invocation to be auto purgue after app.conf.orchestrator_auto_final_invocation_purge_hours

class pynenc.orchestrator.mem_orchestrator.TaskInvocationCache(app: Pynenc)#

Bases: Generic[Result]

auto_purge() None#
clean_pending_status(invocation: DistributedInvocation[Params, Result]) None#
clean_up_invocation(invocation_id: str) None#
get_invocations(key_arguments: dict[str, str] | None, status: InvocationStatus | None) Iterator[DistributedInvocation]#
get_retries(invocation: DistributedInvocation[Params, Result]) int#
get_status(invocation: DistributedInvocation[Params, Result]) InvocationStatus#
increase_retries(invocation: DistributedInvocation[Params, Result]) None#
set_pending_status(invocation: DistributedInvocation[Params, Result]) None#
set_status(invocation: DistributedInvocation[Params, Result], status: InvocationStatus) None#
set_up_invocation_auto_purge(invocation: DistributedInvocation[Params, Result]) None#

Redis Orchestrator#

class pynenc.orchestrator.redis_orchestrator.RedisBlockingControl(app: Pynenc, client: Redis)#

Bases: BaseBlockingControl

A directed acyclic graph representing the call dependencies

get_blocking_invocations(max_num_invocations: int) Iterator[DistributedInvocation[Params, Result]]#

Returns the invocations that are blocking others but not waiting for anything themselves. The oldest invocations are returned first.

Parameters#

max_num_invocationsint

The maximum number of invocations to return.

Returns#

list[DistributedInvocation]

A list of blocking invocations or an empty list if no invocations are blocking.

purge() None#
release_waiters(invocation: DistributedInvocation) None#

Remove an invocation from the graph. Also removes any edges to or from the invocation.

waiting_for_results(waiter: DistributedInvocation, waiteds: list[DistributedInvocation]) None#

Register that an invocation (waiter) is waiting for the results of another invocation (waited).

class pynenc.orchestrator.redis_orchestrator.RedisCycleControl(app: Pynenc, client: Redis)#

Bases: BaseCycleControl

A directed acyclic graph representing the call dependencies

add_call_and_check_cycles(caller: DistributedInvocation, callee: DistributedInvocation) None#

Add a new invocation to the graph. This represents a dependency where the caller is dependent on the callee.

Raises a CycleDetectedError if the invocation would cause a cycle.

clean_up_invocation_cycles(invocation: DistributedInvocation) None#

Remove an invocation from the graph. Also removes any edges to or from the invocation.

find_cycle_caused_by_new_invocation(caller: DistributedInvocation, callee: DistributedInvocation) list[Call]#

Determines if adding an edge from the caller to the callee would create a cycle.

Parameters#

callerDistributedInvocation

The invocation making the call.

calleeDistributedInvocation

The invocation being called.

Returns#

list

List of invocations that would form the cycle after adding the new invocation, else an empty list.

purge() None#
remove_edges(call_id: str) None#

Remove all edges from a call

class pynenc.orchestrator.redis_orchestrator.RedisOrchestrator(app: Pynenc)#

Bases: BaseOrchestrator

auto_purge() None#

Purge all invocations in final state that are older than app.conf.orchestrator_auto_final_invocation_purge_hours

property blocking_control: RedisBlockingControl#
property conf: ConfigOrchestratorRedis#
property cycle_control: RedisCycleControl#
get_existing_invocations(task: Task[Params, Result], key_serialized_arguments: dict[str, str] | None = None, status: InvocationStatus | None = None) Iterator[DistributedInvocation]#
get_invocation_retries(invocation: DistributedInvocation[Params, Result]) int#
get_invocation_status(invocation: DistributedInvocation[Params, Result]) InvocationStatus#
increase_retries(invocation: DistributedInvocation[Params, Result]) None#
increment_invocation_retries(invocation: DistributedInvocation[Params, Result]) None#
purge() None#

Remove all invocations from the orchestrator

set_up_invocation_auto_purge(invocation: DistributedInvocation[Params, Result]) None#

set up the invocation to be auto purgue after app.conf.orchestrator_auto_final_invocation_purge_hours

exception pynenc.orchestrator.redis_orchestrator.StatusNotFound#

Bases: Exception

Raised when a status is not found in Redis

class pynenc.orchestrator.redis_orchestrator.TaskRedisCache(app: Pynenc, client: Redis)#

Bases: object

auto_purge() None#
clean_pending_status(invocation: DistributedInvocation[Params, Result]) None#
get_invocation_retries(invocation: DistributedInvocation) int#
get_invocation_status(invocation: DistributedInvocation) InvocationStatus#
get_invocations(task_id: str, key_arguments: dict[str, str] | None, status: InvocationStatus | None) Iterator[DistributedInvocation]#
increment_invocation_retries(invocation: DistributedInvocation) None#
purge() None#
set_pending_status(invocation: DistributedInvocation[Params, Result]) None#
set_status(invocation: DistributedInvocation[Params, Result], status: InvocationStatus) None#
set_up_invocation_auto_purge(invocation: DistributedInvocation[Params, Result]) None#

Runner Submodules#

Base Runner#

class pynenc.runner.base_runner.BaseRunner(app: Pynenc)#

Bases: ABC

The Runner will execute invocations from the broker.

  • It requires an app because it needs to know about the broker, orchestrator, etc.

  • The runner will affect the behavior of the task result, for example: * In a subprocess environment, it may implement a pipe to communicate for pausing/resuming processes. * In an async environment, the value should be an async function to wait for distributed results. * In a cloud function environment aiming for speed with a single thread, it might not wait more than ‘x’ seconds and instead, create a ‘callback’, save the status, and convert the current execution into a task that will be called when the result is ready. * In a multiprocessing environment in a Kubernetes pod with capabilities to create new pods, it may have different behaviors. * For an asyncio worker, it runs several tasks in one processor, and the value should wait with async.

property conf: ConfigRunner#
abstract property max_parallel_slots: int#

The maximum number of parallel task that the runner can handle

abstract static mem_compatible() bool#

Can this runner run with memory components?

on_start() None#

This method is called when the runner starts

on_stop() None#

This method is called when the runner stops

run() None#

Starts the runner

property runner_id: str#
abstract runner_loop_iteration() None#

One iteration of the runner loop. Subclasses should implement this method to process invocations.

stop_runner_loop(signum: int | None = None, frame: FrameType | None = None) None#

Stops the runner loop

abstract waiting_for_results(running_invocation: DistributedInvocation | None, result_invocation: list[DistributedInvocation], runner_args: dict[str, Any] | None = None) None#

This method is called from the result method of an invocation It signals the runner that the running invocation is waiting for the result of the result invocation

The running invocation may be None, when the result was called from outside a runner (e.g. user environment) In that case will be handle by the DummyRunner (default in the pynenc app to handle this cases)

The runner has the oportunity to define the waiting behaviour of the running invocation in this method Otherwise the running invocation will infinetely loop until the result invocation is ready

runner_args is a dictionary with the arguments passed to the runner by itself e.g. process runner uses this to syncronize managed dictionaries among sub-process

class pynenc.runner.base_runner.DummyRunner(app: Pynenc)#

Bases: BaseRunner

This runner is a placeholder for the Pynenc app. It will be used when the app is defined in any other Python environment than a Pynenc runner.

Examples include:
  • A script that defines the app, decorates some tasks, routes them, and then finishes. Such a script does not plan to run anything itself but triggers tasks that will later run in actual runners.

property max_parallel_slots: int#

The maximum number of parallel task that the runner can handle

static mem_compatible() bool#

Can this runner run with memory components?

runner_loop_iteration() None#

One iteration of the runner loop. Subclasses should implement this method to process invocations.

waiting_for_results(running_invocation: DistributedInvocation | None, result_invocation: list[DistributedInvocation], runner_args: dict[str, Any] | None = None) None#

This method is called from the result method of an invocation It signals the runner that the running invocation is waiting for the result of the result invocation

The running invocation may be None, when the result was called from outside a runner (e.g. user environment) In that case will be handle by the DummyRunner (default in the pynenc app to handle this cases)

The runner has the oportunity to define the waiting behaviour of the running invocation in this method Otherwise the running invocation will infinetely loop until the result invocation is ready

runner_args is a dictionary with the arguments passed to the runner by itself e.g. process runner uses this to syncronize managed dictionaries among sub-process

Context (Runner)#

class pynenc.runner.context.ApplicationContext#

Bases: object

todo

Memory Runner#

Process Runner#

class pynenc.runner.process_runner.ProcessRunner(app: Pynenc)#

Bases: BaseRunner

property available_processes: int#
manager: Manager#
property max_parallel_slots: int#

The maximum number of parallel task that the runner can handle

max_processes: int#
static mem_compatible() bool#

Can this runner run with memory components?

parse_args(args: dict[str, Any]) None#
processes: dict[DistributedInvocation, Process]#
property runner_args: dict[str, Any]#
runner_loop_iteration() None#

One iteration of the runner loop. Subclasses should implement this method to process invocations.

wait_invocation: dict[DistributedInvocation, set[DistributedInvocation]]#
waiting_for_results(running_invocation: DistributedInvocation | None, result_invocations: list[DistributedInvocation], runner_args: dict[str, Any] | None = None) None#

This method is called from the result method of an invocation It signals the runner that the running invocation is waiting for the result of the result invocation

The running invocation may be None, when the result was called from outside a runner (e.g. user environment) In that case will be handle by the DummyRunner (default in the pynenc app to handle this cases)

The runner has the oportunity to define the waiting behaviour of the running invocation in this method Otherwise the running invocation will infinetely loop until the result invocation is ready

runner_args is a dictionary with the arguments passed to the runner by itself e.g. process runner uses this to syncronize managed dictionaries among sub-process

property waiting_processes: int#

Serializer Submodules#

Base Serializer#

class pynenc.serializer.base_serializer.BaseSerializer#

Bases: ABC

abstract static deserialize(obj: str) Any#
abstract static serialize(obj: Any) str#

JSON Serializer#

class pynenc.serializer.json_serializer.DefaultJSONEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)#

Bases: JSONEncoder

default(obj: Any) Any#

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
class pynenc.serializer.json_serializer.JsonSerializer#

Bases: BaseSerializer

static deserialize(obj: str) Any#
static serialize(obj: Any) str#
class pynenc.serializer.json_serializer.ReservedKeys(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#

Bases: StrEnum

Keys that are reserved for internal use

ERROR = '__pynenc__std_py_exc__'#

Pickle Serializer#

class pynenc.serializer.pickle_serializer.PickleSerializer#

Bases: BaseSerializer

static deserialize(serialized_obj: str) Any#
static serialize(obj: Any) str#

State Backend Submodules#

Base State Backend#

class pynenc.state_backend.base_state_backend.BaseStateBackend(app: Pynenc)#

Bases: ABC

add_history(invocation: DistributedInvocation[Params, Result], status: InvocationStatus | None = None, execution_context: Any | None = None) None#
property conf: ConfigStateBackend#
get_exception(invocation: DistributedInvocation[Params, Result]) Exception#
get_history(invocation: DistributedInvocation[Params, Result]) list[InvocationHistory]#
get_invocation(invocation_id: str) DistributedInvocation#
get_result(invocation: DistributedInvocation[Params, Result]) Result#
abstract purge() None#
set_exception(invocation: DistributedInvocation, exception: Exception) None#
set_result(invocation: DistributedInvocation, result: Result) None#
upsert_invocation(invocation: DistributedInvocation) None#
wait_for_all_async_operations() None#

Blocks until all asynchronous status operations are finished.

wait_for_invocation_async_operations(invocation_id: str) None#

Blocks until all asynchronous operations for a specific invocation are finished.

class pynenc.state_backend.base_state_backend.InvocationHistory(invocation_id: str, status: Optional[ForwardRef('InvocationStatus')] = None, execution_context: Any | None = None)#

Bases: object

execution_context: Any | None = None#
classmethod from_json(json_str: str) InvocationHistory#
invocation_id: str#
status: InvocationStatus | None = None#
property timestamp: datetime#
to_json() str#

Memory State Backend#

class pynenc.state_backend.mem_state_backend.MemStateBackend(app: Pynenc)#

Bases: BaseStateBackend

purge() None#

Redis State Backend#

class pynenc.state_backend.redis_state_backend.RedisStateBackend(app: Pynenc)#

Bases: BaseStateBackend

property conf: ConfigStateBackendRedis#
purge() None#

Util Submodules#

Files#

pynenc.util.files.load_config_from_toml(file_path: str) dict#
pynenc.util.files.load_file(filepath: str) dict[str, Any]#

Log#

class pynenc.util.log.RunnerLogAdapter(logger: Logger, runner_id: str)#

Bases: LoggerAdapter

process(msg: Any, kwargs: Any) Any#

Process the logging message and keyword arguments passed in to a logging call to insert contextual information. You can either manipulate the message itself, the keyword args or both. Return the message and kwargs modified (or not) to suit your needs.

Normally, you’ll only need to override this one method in a LoggerAdapter subclass for your specific needs.

class pynenc.util.log.TaskLoggerAdapter(logger: Logger, task_id: str, invocation_id: str | None = None)#

Bases: LoggerAdapter

process(msg: Any, kwargs: Any) Any#

Process the logging message and keyword arguments passed in to a logging call to insert contextual information. You can either manipulate the message itself, the keyword args or both. Return the message and kwargs modified (or not) to suit your needs.

Normally, you’ll only need to override this one method in a LoggerAdapter subclass for your specific needs.

set_context(task_id: str, invocation_id: str | None) None#
pynenc.util.log.create_logger(app: Pynenc) Logger#

Creates a logger for the given app

Redis Keys#

class pynenc.util.redis_keys.Key(app_id: str, prefix: str)#

Bases: object

all_waited() str#
args(task_id: str, arg: str, val: str) str#
call(call_id: str) str#
call_to_invocation(call_id: str) str#
default_queue() str#
edge(call_id: str) str#
exception(invocation_id: str) str#
history(invocation_id: str) str#
invocation(invocation_id: str) str#
invocation_auto_purge() str#
invocation_retries(invocation_id: str) str#
invocation_status(invocation_id: str) str#
not_waiting() str#
pending_timer(invocation_id: str) str#
previous_status(invocation_id: str) str#
purge(client: Redis) None#

Purge all keys with the given prefix

result(invocation_id: str) str#
status(task_id: str, status: InvocationStatus) str#
task(task_id: str) str#
waited_by(invocation_id: str) str#
waiting_for(invocation_id: str) str#
pynenc.util.redis_keys.sanitize_for_redis(s: str) str#

Subclasses#

pynenc.util.subclasses.get_all_subclasses(cls: type[T]) list[type[T]]#
pynenc.util.subclasses.get_subclass(root_class: type[T], child_class_name: str) type[T]#

Returns the subclass with the given name (any level deep)