API Documentation#
This section contains autogenerated documentation from the codebase of Pynenc, detailing the API and its usage.
Main Modules#
App Module#
- class pynenc.app.Pynenc(app_id: str | None = None, config_values: dict[str, Any] | None = None, config_filepath: str | None = None)#
Bases: object
The main class of the Pynenc library that creates an application object.
Parameters#
- task_broker : BaseTaskBroker
Handles routing of tasks for distributed execution.
- state_backend : BaseStateBackend
Maintains the state of tasks, runners, and other relevant system states.
- orchestrator : BaseOrchestrator
Coordinates all components and acts according to the configuration.
- reporting : list of BaseReporting
Reports to one or more systems.
Notes#
All of these base classes are abstract and cannot be used directly. If a component is not specified, it defaults to its in-memory implementation (MemTaskBroker, MemStateBackend, etc.). These default classes do not distribute work; they are helpers for tests or for running an application on a single local machine. They may provide some parallelism but cannot be used in a production system.
Examples#
Default Pynenc application for running in memory in a local environment.
>>> app = Pynenc()
- property app_id: str#
- property broker: BaseBroker#
- property conf: ConfigPynenc#
- is_initialized(property_name: str) bool#
Returns True if the given cached_property has been initialized
- property logger: Logger#
- property orchestrator: BaseOrchestrator#
- purge() None#
Purge all data from the broker and state backend
- property runner: BaseRunner#
- property serializer: BaseSerializer#
- property state_backend: BaseStateBackend#
- task(func: Func, **options: Any) Task#
- task(func: None = None, **options: Any) Callable[['Func'], 'Task']
The task decorator converts the function into an instance of a BaseTask. It accepts arbitrary keyword options; these are validated against the options class assigned to the task class.
Check the options reference in conf.config_task or the Pynenc documentation for a detailed explanation of the BaseTask instance you are applying.
Parameters#
- func : Callable, optional
The function to be converted into a BaseTask instance.
- **options : dict
The options to be passed to the BaseTask instance.
Returns#
- Task | Callable[…, Task]
The BaseTask instance or a callable that returns a BaseTask instance.
Examples#
>>> @app.task(option1='value1', option2='value2')
... def my_func(x, y):
...     return x + y
...
>>> result = my_func(1, 2)
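The two signatures of task (one taking func, one taking only options) correspond to a decorator that works both bare and with arguments. The following is a minimal standalone sketch of that pattern, not Pynenc's implementation: SketchTask, the module-level task function, and the max_retries option name are all hypothetical.

```python
from typing import Any, Callable, Dict, Optional, Union


class SketchTask:
    """Hypothetical stand-in for a task object; not pynenc.task.Task."""

    def __init__(self, func: Callable[..., Any], options: Dict[str, Any]) -> None:
        self.func = func
        self.options = options

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # A real task would route the call; this sketch runs it inline.
        return self.func(*args, **kwargs)


def task(
    func: Optional[Callable[..., Any]] = None, **options: Any
) -> Union[SketchTask, Callable[[Callable[..., Any]], SketchTask]]:
    """Work both as @task and as @task(option=value)."""

    def wrap(target: Callable[..., Any]) -> SketchTask:
        return SketchTask(target, options)

    # Bare use: the decorated function arrives directly as `func`.
    if func is not None:
        return wrap(func)
    # Parameterized use: return a decorator that receives the function next.
    return wrap


@task
def add(x: int, y: int) -> int:
    return x + y


@task(max_retries=3)  # `max_retries` is a made-up option name
def mul(x: int, y: int) -> int:
    return x * y
```

Both add and mul end up as SketchTask instances; only mul carries options.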
Arguments Module#
Call Module#
- class pynenc.call.Call(task: Task[Params, Result], arguments: Arguments = <factory>)#
Bases: Generic[Params, Result]
A specific call of a task
A call is unique per task and arguments
- property call_id: str#
Returns a unique id for each task and arguments
- deserialize_arguments(serialized_arguments: dict[str, str]) Arguments#
Returns an Arguments instance with the deserialized arguments
- property serialized_args_for_concurrency_check: dict[str, str] | None#
Returns a dictionary with the call arguments required for the task concurrency check
- property serialized_arguments: dict[str, str]#
Returns a dictionary with each of the call arguments serialized into a string
- to_json() str#
Returns a string with the serialized call
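Since a call is unique per task and arguments, one way to picture call_id is as a deterministic digest of the task id and the serialized arguments. The sketch below is an assumption about how such an id could be derived; the function name, the hashing scheme, and the id format are hypothetical and not taken from Pynenc.

```python
import hashlib
import json
from typing import Dict


def sketch_call_id(task_id: str, serialized_arguments: Dict[str, str]) -> str:
    """Derive a stable id from a task id and its serialized arguments."""
    # Sort keys so the same arguments always produce the same text.
    canonical = json.dumps(serialized_arguments, sort_keys=True)
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"{task_id}#{digest}"


same_a = sketch_call_id("module.add", {"x": "1", "y": "2"})
same_b = sketch_call_id("module.add", {"y": "2", "x": "1"})
other = sketch_call_id("module.add", {"x": "1", "y": "3"})
```

Here same_a and same_b are equal because argument order does not matter, while other differs because one argument value changed.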
Context Module#
Exceptions Module#
Global Pynenc exception and warning classes.
- exception pynenc.exceptions.AlreadyInitializedError#
Bases: PynencError
Error raised when trying to change the class of a component after it was initialized
- exception pynenc.exceptions.ConfigError#
Bases: PynencError
Base class for all the config related errors
- exception pynenc.exceptions.ConfigMultiInheritanceError#
Bases: ConfigError
Error related to multiple inheritance of config fields
- exception pynenc.exceptions.CycleDetectedError(call_ids: list[str], message: str)#
Bases: PynencError
Raised when a cycle is detected in the DependencyGraph
- classmethod from_cycle(cycle: list[Call]) CycleDetectedError#
- exception pynenc.exceptions.InvalidTaskOptionsError(task_id: str, message: str | None = None)#
Bases: TaskError
Error raised when the task options are invalid.
- exception pynenc.exceptions.InvocationConcurrencyWithDifferentArgumentsError(task_id: str, existing_invocation_id: str, new_call_id: str, diff: str, message: str | None = None)#
Bases: TaskRoutingError
Error raised when there is a pending task with different arguments than the current task.
- classmethod from_call_mismatch(existing_invocation: BaseInvocation, new_call: Call, message: str | None = None) InvocationConcurrencyWithDifferentArgumentsError#
- exception pynenc.exceptions.InvocationError(invocation_id: str, message: str | None = None)#
Bases: PynencError
Base class for all Invocation related errors.
- exception pynenc.exceptions.InvocationNotFoundError(invocation_id: str, message: str | None = None)#
Bases: StateBackendError
Error raised when the invocation is not present in the State Backend.
- exception pynenc.exceptions.PendingInvocationLockError(invocation_id: str)#
Bases: PynencError
Error raised when two processes try to set the same invocation as pending concurrently
- exception pynenc.exceptions.PynencError#
Bases: Exception
Base class for all Pynenc related errors.
- classmethod from_json(error_name: str, serialized: str) PynencError#
Returns the child class from a serialized error
- to_json() str#
Returns a string with the serialized error
- exception pynenc.exceptions.RetryError#
Bases: PynencError
Error raised when a task should be retried.
- exception pynenc.exceptions.RunnerError#
Bases: PynencError
Base class for all Runner related errors.
- exception pynenc.exceptions.RunnerNotExecutableError#
Bases: PynencError
Raised when trying to execute a runner that is not meant to be executed.
- exception pynenc.exceptions.StateBackendError#
Bases: PynencError
Base class for all State Backend related errors.
- exception pynenc.exceptions.TaskError(task_id: str, message: str | None = None)#
Bases: PynencError
Base class for all Task related errors.
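PynencError exposes to_json() and a from_json(error_name, serialized) classmethod that returns the matching child class. A minimal standalone sketch of that round trip follows; the class names and the choice to serialize the instance attributes as JSON are assumptions for illustration, not Pynenc's actual format.

```python
import json
from typing import Optional


class SketchError(Exception):
    """Hypothetical base error; not pynenc.exceptions.PynencError."""

    def to_json(self) -> str:
        # Serialize only the attributes set by the subclass constructor.
        return json.dumps(self.__dict__)

    @classmethod
    def from_json(cls, error_name: str, serialized: str) -> "SketchError":
        # Search this class and all of its descendants for a matching name.
        pending = [cls]
        while pending:
            candidate = pending.pop()
            if candidate.__name__ == error_name:
                return candidate(**json.loads(serialized))
            pending.extend(candidate.__subclasses__())
        raise ValueError(f"Unknown error type: {error_name}")


class SketchTaskError(SketchError):
    def __init__(self, task_id: str, message: Optional[str] = None) -> None:
        super().__init__(message)
        self.task_id = task_id
        self.message = message


original = SketchTaskError("module.add", "invalid options")
restored = SketchError.from_json("SketchTaskError", original.to_json())
```

The restored object is a SketchTaskError with the same task_id and message, which is what lets an error cross a process boundary as text.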
Task Module#
- class pynenc.task.Task(app: Pynenc, func: Func, options: dict[str, Any])#
Bases: Generic[Params, Result]
A task in the Pynenc library that represents a function that can be distributed.
Parameters#
- app : Pynenc
A reference to the Pynenc application.
- func : Callable
The function to be run distributed.
- options : dict
The options to apply.
The BaseTask can be called normally and will return an instance of BaseResult. The result will be an AsyncResult when running normally but can be SyncResult when running eagerly in development with the pynenc app’s dev_mode_force_sync_tasks option set to True (or the ‘PYNENC_DEV_MODE_FORCE_SYNC_TASK’ environment variable set). The option dev_mode_force_sync_tasks should only be used in development.
Although it is possible to create a BaseTask instance directly, it is recommended to use the decorator provided in the pynenc application, i.e., @app.task(options…). This is the expected way of instantiating a task and registering it in the app.
Limitations#
This implementation does not support the creation of tasks from functions defined in modules intended to run as standalone scripts. This applies to any module executed directly, where its __name__ attribute becomes “__main__”. This is not exclusive to modules with if __name__ == “__main__” sections but includes any module run as the main program. In such situations, func.__module__ being “__main__” poses a challenge for task instantiation and serialization. When a task is executed in the initiator script, it is identified as __main__.task_name. However, in a Pynenc worker’s distributed environment, __main__ refers to the worker itself. As a result, the task identified as __main__.task_name cannot be found, since the worker’s __main__ differs from that of the initiator script. To ensure simplicity and robustness in task management, tasks defined in modules run as the main program are not supported.
Examples#
>>> @app.task(options)
... def func():
...     pass
...
>>> result = func()
Raises#
- RuntimeError
If an attempt is made to create a task from a function in the __main__ module.
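The limitation above comes down to inspecting func.__module__ before building the task id. The sketch below shows one way such a guard could look; sketch_task_id and the exact "module.function" id format are assumptions, and this is not Pynenc's code.

```python
from typing import Any, Callable


def sketch_task_id(func: Callable[..., Any]) -> str:
    """Build a 'module.function' id, rejecting functions defined in __main__."""
    if func.__module__ == "__main__":
        # A worker process has a different __main__, so it could never
        # import the function under this name.
        raise RuntimeError(
            f"Cannot create a task from {func.__name__!r}: it is defined in __main__"
        )
    return f"{func.__module__}.{func.__name__}"
```

A function imported from a regular module yields an importable id, while a function defined in a script run directly is rejected up front rather than failing later on the worker.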
- args(*args: Params.args, **kwargs: Params.kwargs) Arguments#
Returns an Arguments instance from the given args and kwargs
- property conf: ConfigTask#
- property invocation: BaseInvocation#
The invocation of the task
- parallelize(param_iter: Iterable[tuple | dict | Arguments]) BaseInvocationGroup#
param_iter is an iterable of calls to the task. Each item may be a tuple of positional arguments, a dict of keyword arguments, or an Arguments instance, e.g. the result of task.args(*args, **kwargs).
- property retriable_exceptions: tuple[type[Exception], ...]#
- property task_id: str#
The id of the task, which is the module and function name.
- to_json() str#
Returns a string with the serialized task
- validate_options() None#
Validates that all the option fields exist in the config_fields. Raises an exception listing all the invalid options.
Types Module#
Broker Submodules#
Base Broker#
- class pynenc.broker.base_broker.BaseBroker(app: Pynenc)#
Bases: ABC
- property conf: ConfigBroker#
- abstract purge() None#
- abstract retrieve_invocation() DistributedInvocation | None#
- route_call(call: Call[Params, Result]) DistributedInvocation[Params, Result]#
Creates a new invocation and routes it
- abstract route_invocation(invocation: DistributedInvocation) None#
Memory Broker#
- class pynenc.broker.mem_broker.MemBroker(app: Pynenc)#
Bases: BaseBroker
- purge() None#
- retrieve_invocation() DistributedInvocation | None#
- route_invocation(invocation: DistributedInvocation) None#
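The broker interface reduces to three operations: route an invocation, retrieve the next one (or None), and purge. The following standalone sketch mirrors those method names with a single-process queue. The FIFO ordering and the use of plain strings as invocations are assumptions; MemBroker's actual internals are not documented here.

```python
from collections import deque
from typing import Deque, Optional


class SketchMemBroker:
    """Hypothetical in-process broker holding serialized invocations."""

    def __init__(self) -> None:
        self._queue: Deque[str] = deque()

    def route_invocation(self, serialized_invocation: str) -> None:
        # New work goes to the back of the queue.
        self._queue.append(serialized_invocation)

    def retrieve_invocation(self) -> Optional[str]:
        # The oldest item comes out first; None signals an empty queue.
        return self._queue.popleft() if self._queue else None

    def purge(self) -> None:
        self._queue.clear()
```

Returning None instead of blocking matches the `DistributedInvocation | None` return type of retrieve_invocation, so a runner can poll without special handling for an empty queue.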
Redis Broker#
- class pynenc.broker.redis_broker.RedisBroker(app: Pynenc)#
Bases: BaseBroker
- property conf: ConfigBrokerRedis#
- purge() None#
- retrieve_invocation() DistributedInvocation | None#
- route_invocation(invocation: DistributedInvocation) None#
- class pynenc.broker.redis_broker.RedisQueue(app: Pynenc, client: Redis, name: str, namespace: str = 'queue')#
Bases: object
- purge() None#
- receive_message() str | None#
- send_message(message: str) None#
Configuration Submodules#
Base Config Option#
Config Base#
- class pynenc.conf.config_base.ConfigBase(config_values: dict[str, Any] | None = None, config_filepath: str | None = None)#
Bases: object
Base class for defining configuration settings.
This class serves as the base for creating configuration classes. It supports hierarchical and flexible configuration from various sources, including environment variables, configuration files, and default values.
Configuration values are determined based on the following priority (highest to lowest):
1. Direct assignment in the config instance (not recommended)
2. Environment variables
3. Configuration file path specified by environment variables
4. Configuration file path (YAML, TOML, JSON) by config_filepath parameter
5. pyproject.toml
6. Default values specified in the ConfigField
7. Previous steps for any Parent config class
8. User does not specify anything (default values)
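The priority order above amounts to a first-match lookup over an ordered list of sources. The sketch below illustrates only that idea with plain dictionaries; resolve_setting, the source names, and the sample values are hypothetical, and no environment variables or files are actually read.

```python
from typing import Any, Mapping, Sequence, Tuple

_MISSING = object()  # sentinel, so that None or 0 can be real values


def resolve_setting(
    key: str,
    sources: Sequence[Tuple[str, Mapping[str, Any]]],
    default: Any,
) -> Tuple[Any, str]:
    """Return (value, source_name) from the first source that defines key."""
    for source_name, values in sources:
        value = values.get(key, _MISSING)
        if value is not _MISSING:
            return value, source_name
    return default, "default"


# Sources listed from highest to lowest priority; the names are illustrative.
sources = [
    ("environment", {"redis_port": 6380}),
    ("config_file", {"redis_port": 6379, "redis_host": "db.internal"}),
    ("pyproject", {}),
]
```

With these sample sources, redis_port resolves from the environment entry even though the config file also defines it, and a key defined nowhere falls back to the supplied default.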
Examples#
Define a configuration class for a Redis client:
class ConfigRedis(ConfigBase):
    redis_host = ConfigField("localhost")
    redis_port = ConfigField(6379)
    redis_db = ConfigField(0)
Define a main configuration class for orchestrator components:
class ConfigOrchestrator(ConfigBase):
    cycle_control = ConfigField(True)
    blocking_control = ConfigField(True)
    auto_final_invocation_purge_hours = ConfigField(24.0)
Combine configurations using multiple inheritance:
class ConfigOrchestratorRedis(ConfigOrchestrator, ConfigRedis):
    pass
The ConfigOrchestratorRedis class now includes settings from both ConfigOrchestrator and ConfigRedis.
- property all_fields: list[str]#
- classmethod config_fields() list[str]#
- static get_config_id(config_cls: Type[ConfigBase]) str#
- init_config_value_from_env_vars(config_cls: Type[ConfigBase]) None#
- init_config_value_from_mapping(source: str, config_id: str, mapping: dict[str, Any]) None#
- init_config_value_key_from_mapping(source: str, config_id: str, key: str, mapping: dict, conf_mapping: dict) None#
- init_config_values(config_cls: Type[ConfigBase], config_values: dict[str, Any] | None, config_filepath: str | None) None#
- init_parent_values(config_cls: Type[ConfigBase], config_values: dict[str, Any] | None, config_filepath: str | None) None#
- class pynenc.conf.config_base.ConfigField(default_value: T, mapper: Callable[[Any, Type[T]], T] | None = None)#
Bases: Generic[T]
Define each typed field from a ConfigBase instance.
This class is used to define typed configuration fields within a ConfigBase subclass. It ensures type consistency and supports value validation and casting.
Parameters#
- default_value : T
The default value for the configuration field.
- mapper : Optional[ConfigFieldMapper]
An optional function to map or transform the value.
Attributes#
- _default_value : T
Stores the default value of the configuration field.
- _mapper : ConfigFieldMapper
The function used for mapping or transforming the value.
- pynenc.conf.config_base.avoid_multi_inheritance_field_conflict(config_cls: Type, config_cls_to_fields: dict[str, set[str]]) dict[str, str]#
Ensures that the same configuration field is not defined in multiple parent classes of a given configuration class.
This function checks all parent classes of the provided configuration class that are subclasses of ConfigBase. It ensures that each configuration field is defined only once among all parent classes. If a field is found in multiple parent classes, a ConfigMultiInheritanceError is raised. This check ensures deterministic behavior in the configuration inheritance hierarchy.
- Parameters:
config_cls (Type): The configuration class to check for field conflicts.
- Returns:
dict[str, str]: A dictionary mapping each configuration field to the name of the parent class where it is defined.
- Raises:
ConfigMultiInheritanceError: If a configuration field is found in multiple parent classes.
- Example:
>>> class ParentConfig1(ConfigBase):
...     field1 = ConfigField(default_value=1)
...
>>> class ParentConfig2(ConfigBase):
...     field2 = ConfigField(default_value=2)
...
>>> class ChildConfig(ParentConfig1, ParentConfig2):
...     pass
...
>>> avoid_multi_inheritance_field_conflict(ChildConfig)
{'field1': 'ParentConfig1', 'field2': 'ParentConfig2'}
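The check described for avoid_multi_inheritance_field_conflict can be pictured as building a field-to-parent map and failing on the first duplicate. The standalone sketch below uses hypothetical stand-ins (SketchField, SketchConflictError, map_fields_to_parents) and, as a simplification, only inspects fields declared directly on each immediate parent.

```python
from typing import Any, Dict


class SketchField:
    """Hypothetical marker for a config field; not pynenc's ConfigField."""

    def __init__(self, default_value: Any) -> None:
        self.default_value = default_value


class SketchConflictError(Exception):
    """Hypothetical stand-in for ConfigMultiInheritanceError."""


def map_fields_to_parents(config_cls: type) -> Dict[str, str]:
    """Map each field name to the direct parent class that declares it."""
    owners: Dict[str, str] = {}
    for parent in config_cls.__bases__:
        for name, value in vars(parent).items():
            if not isinstance(value, SketchField):
                continue
            if name in owners:
                # The same field in two parents would make lookup ambiguous.
                raise SketchConflictError(
                    f"{name!r} is defined in both {owners[name]} and {parent.__name__}"
                )
            owners[name] = parent.__name__
    return owners


class ParentA:
    field1 = SketchField(1)


class ParentB:
    field2 = SketchField(2)


class ParentC:
    field1 = SketchField(3)


class Child(ParentA, ParentB):
    pass


class Conflicting(ParentA, ParentC):
    pass
```

Child maps cleanly to its two parents, while Conflicting raises because field1 is declared in both ParentA and ParentC.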
- pynenc.conf.config_base.default_config_field_mapper(value: Any, expected_type: Type[T]) T#
- pynenc.conf.config_base.get_config_fields(cls: Type) Iterator[str]#
- pynenc.conf.config_base.get_env_key(field: str, config: Type[ConfigBase] | None = None) str#
gets the key used in the environment variables
Config Broker#
- class pynenc.conf.config_broker.ConfigBroker(config_values: dict[str, Any] | None = None, config_filepath: str | None = None)#
Bases: ConfigBase
Main config of the broker components
- class pynenc.conf.config_broker.ConfigBrokerRedis(config_values: dict[str, Any] | None = None, config_filepath: str | None = None)#
Bases: ConfigBroker, ConfigRedis
Specific Configuration for the Redis Broker
Config Orchestrator#
- class pynenc.conf.config_orchestrator.ConfigOrchestrator(config_values: dict[str, Any] | None = None, config_filepath: str | None = None)#
Bases: ConfigBase
Main config of the orchestrator components.
Attributes#
- cycle_control : ConfigField[bool]
This boolean flag enables the orchestrator to detect cycles of calls to subtasks. For example, if task1 calls task2 and task2 calls back to task1, this can create an endless loop. The cycle control functionality is enabled by default to prevent such scenarios. Users can choose to disable it if needed.
- blocking_control : ConfigField[bool]
This boolean flag activates control over tasks that are blocking on other tasks. If a task invocation is waiting on another invocation, it notifies the runner, which temporarily removes it from the processing queue and uses the slot for another task invocation. Once the required invocation finishes, the dependent invocation is placed back into the run queue. This feature also prioritizes invocations that have many dependencies over new ones, ensuring efficient task management.
- auto_final_invocation_purge_hours : ConfigField[float]
This float value, defaulting to 24.0 hours, sets the duration after which the orchestrator purges all invocations older than the specified time. This purge mechanism helps keep the orchestrator lightweight and fast, as it should ideally operate with minimal latency. Detailed information about the invocations is stored in the result backend, which can handle more data and afford to be slower.
- auto_final_invocation_purge_hours = 24.0#
- blocking_control = True#
- cycle_control = True#
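The auto_final_invocation_purge_hours setting describes an age-based purge of finished invocations. A minimal sketch of that cutoff arithmetic follows, assuming a plain dictionary from invocation id to finish time in seconds; the function name and the data layout are hypothetical and unrelated to how any Pynenc orchestrator stores its data.

```python
from typing import Dict, List


def purge_old_final_invocations(
    finished_at: Dict[str, float], now: float, purge_hours: float = 24.0
) -> List[str]:
    """Remove entries that finished more than purge_hours ago; return their ids."""
    cutoff = now - purge_hours * 3600  # hours converted to seconds
    expired = [inv_id for inv_id, ts in finished_at.items() if ts < cutoff]
    for inv_id in expired:
        del finished_at[inv_id]
    return expired


# Timestamps are in seconds; with now=100000 and 24 h the cutoff is 13600.
finished = {"inv-old": 0.0, "inv-recent": 90000.0}
removed = purge_old_final_invocations(finished, now=100000.0)
```

Passing now explicitly keeps the function deterministic, which makes the cutoff easy to test.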
- class pynenc.conf.config_orchestrator.ConfigOrchestratorRedis(config_values: dict[str, Any] | None = None, config_filepath: str | None = None)#
Bases: ConfigOrchestrator, ConfigRedis
Specific Configuration for the Redis Orchestrator
Invocation Submodules#
Base Invocation#
- class pynenc.invocation.base_invocation.BaseInvocation(call: Call[Params, Result])#
Bases: ABC, Generic[Params, Result]
Invocation of a task call
A call can have several invocations in the system
- property call_id: str#
- abstract classmethod from_json(app: Pynenc, serialized: str) T#
Returns a new invocation from a serialized invocation
- property invocation_id: str#
Returns a unique id for this invocation
A task with the same arguments can have multiple invocations, the invocation id is used to differentiate them
- abstract property num_retries: int#
- abstract property result: Result#
- property serialized_arguments: dict[str, str]#
- abstract property status: InvocationStatus#
- abstract to_json() str#
Returns a string with the serialized invocation
Dist Invocation#
- class pynenc.invocation.dist_invocation.DistributedInvocation(call: 'Call[Params, Result]', parent_invocation: 'DistributedInvocation | None', _invocation_id: 'str | None' = None)#
Bases: BaseInvocation[Params, Result]
- classmethod from_json(app: Pynenc, serialized: str) DistributedInvocation#
Returns a new invocation from a serialized invocation
- get_final_result() Result#
- property invocation_id: str#
Allows the invocation_id to be set on deserialization
- property num_retries: int#
Get the number of times the invocation got retried
- parent_invocation: DistributedInvocation | None#
- property result: Result#
- run(runner_args: dict[str, Any] | None = None) None#
- property status: InvocationStatus#
Get the status of the invocation
- to_json() str#
Returns a string with the serialized invocation
- class pynenc.invocation.dist_invocation.DistributedInvocationGroup(task: Task, invocations: list[T])#
Bases: BaseInvocationGroup[Params, Result, DistributedInvocation]
- property results: Iterator[Result]#
- class pynenc.invocation.dist_invocation.ReusedInvocation(call: Call[Params, Result], parent_invocation: DistributedInvocation | None, _invocation_id: str | None = None, diff_arg: Arguments | None = None)#
Bases: DistributedInvocation
This is an invocation referencing an older one
- classmethod from_existing(invocation: DistributedInvocation, diff_arg: Arguments | None = None) ReusedInvocation#
Status#
- class pynenc.invocation.status.InvocationStatus(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#
Bases: StrEnum
An enumeration representing the status of a task invocation.
The PENDING status will expire after the time specified in max_pending_seconds.
Notes#
FAILED, RETRY, and SUCCESS are the same category and can be considered as subtypes of a hypothetical TERMINATED status.
- FAILED = 'failed'#
The task call finished with exceptions.
- PAUSED = 'paused'#
The task call execution is paused.
- PENDING = 'pending'#
The task call was picked by a runner but is not yet executed. The status pending will expire after Config.max_pending_seconds
- REGISTERED = 'registered'#
The task call has been routed and is registered
- REROUTED = 'rerouted'#
- RETRY = 'retry'#
The task call finished with a retriable exception.
- RUNNING = 'running'#
The task call is currently running.
- SCHEDULED = 'scheduled'#
A task has been registered to run at a specific time. This is a subtype of REGISTERED.
- SUCCESS = 'success'#
The task call finished without errors.
- is_available_for_run() bool#
Check if the task is in a state where it is available to be picked up and run by any broker. This means the task is not currently being executed, paused, or about to be executed by any other broker.
Returns#
- bool
True if the status is runnable by any broker, False otherwise.
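The availability rule above excludes statuses that are running, paused, or about to run. The sketch below expresses that as a membership test on a string-valued enum. Which statuses count as available (REGISTERED, REROUTED, and RETRY here) is an assumption made for illustration, SCHEDULED is left out, and SketchStatus is not Pynenc's InvocationStatus.

```python
from enum import Enum


class SketchStatus(str, Enum):
    """Hypothetical subset of invocation statuses with string values."""

    REGISTERED = "registered"
    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    REROUTED = "rerouted"
    RETRY = "retry"
    SUCCESS = "success"
    FAILED = "failed"

    def is_available_for_run(self) -> bool:
        # Assumed rule: only these statuses can be picked up for execution.
        return self in _AVAILABLE_FOR_RUN


# Defined after the class so the members exist; looked up at call time.
_AVAILABLE_FOR_RUN = frozenset(
    {SketchStatus.REGISTERED, SketchStatus.REROUTED, SketchStatus.RETRY}
)
```

Because the members are also strings, a status can be rebuilt from its stored value, for example SketchStatus("retry").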
Sync Invocation#
- class pynenc.invocation.sync_invocation.SynchronousInvocation(call: Call[Params, Result])#
Bases: BaseInvocation[Params, Result]
- classmethod from_json(app: Pynenc, serialized: str) SynchronousInvocation#
Returns a new invocation from a serialized invocation
- property num_retries: int#
Get the number of times the invocation got retried
- property result: Result#
- property status: InvocationStatus#
Get the status of the invocation
- to_json() str#
Returns a string with the serialized invocation
- class pynenc.invocation.sync_invocation.SynchronousInvocationGroup(task: Task, invocations: list[T])#
Bases: BaseInvocationGroup[Params, Result, SynchronousInvocation]
- property results: Iterator[Result]#
Orchestrator Submodules#
Base Orchestrator#
- class pynenc.orchestrator.base_orchestrator.BaseBlockingControl#
Bases: ABC
Sub component of the orchestrator to implement blocking control functionalities
- abstract get_blocking_invocations(max_num_invocations: int) Iterator[DistributedInvocation]#
Returns an iterator of invocations that are blocking other invocations but are not themselves blocked by any invocation, ordered by age with the oldest invocation first.
- abstract release_waiters(waited: DistributedInvocation) None#
Called when an invocation is finished and therefore cannot block other invocations anymore
- abstract waiting_for_results(caller_invocation: DistributedInvocation[Params, Result], result_invocations: list[DistributedInvocation[Params, Result]]) None#
Called when an invocation (which may be None) is waiting for the results of other invocations.
- class pynenc.orchestrator.base_orchestrator.BaseCycleControl#
Bases: ABC
Sub component of the orchestrator to implement cycle control functionalities
- abstract add_call_and_check_cycles(caller_invocation: DistributedInvocation[Params, Result], callee_invocation: DistributedInvocation[Params, Result]) None#
Adds a new call between invocations and raises an exception to prevent the formation of a call cycle
- abstract clean_up_invocation_cycles(invocation: DistributedInvocation) None#
Called when an invocation is finished and therefore cannot be part of a cycle anymore
- class pynenc.orchestrator.base_orchestrator.BaseOrchestrator(app: Pynenc)#
Bases: ABC
- add_call_and_check_cycles(caller_invocation: DistributedInvocation[Params, Result], callee_invocation: DistributedInvocation[Params, Result]) None#
Adds a new call between invocations and raises an exception to prevent the formation of a call cycle
- abstract auto_purge() None#
Purge all invocations in final state that are older than app.conf.orchestrator_auto_final_invocation_purge_hours
- abstract property blocking_control: BaseBlockingControl#
- clean_up_invocation_cycles(invocation: DistributedInvocation) None#
Called when an invocation is finished and therefore cannot be part of a cycle anymore
- property conf: ConfigOrchestrator#
- abstract property cycle_control: BaseCycleControl#
- get_additional_invocations_to_run(missing_invocations: int, blocking_invocation_ids: set[str], invocations_to_reroute: set[DistributedInvocation]) Iterator[DistributedInvocation]#
- get_blocking_invocations(max_num_invocations: int) Iterator[DistributedInvocation]#
Returns an iterator of invocations that are blocking other invocations but are not themselves blocked by any invocation, ordered by age with the oldest invocation first.
- get_blocking_invocations_to_run(max_num_invocations: int, blocking_invocation_ids: set[str]) Iterator[DistributedInvocation]#
- abstract get_existing_invocations(task: Task[Params, Result], key_serialized_arguments: dict[str, str] | None = None, status: InvocationStatus | None = None) Iterator[DistributedInvocation]#
- abstract get_invocation_retries(invocation: DistributedInvocation[Params, Result]) int#
- abstract get_invocation_status(invocation: DistributedInvocation[Params, Result]) InvocationStatus#
- get_invocations_to_run(max_num_invocations: int) Iterator[DistributedInvocation]#
- abstract increment_invocation_retries(invocation: DistributedInvocation[Params, Result]) None#
- is_authorize_to_run_by_concurrency_control(invocation: DistributedInvocation) bool#
Checks if the invocation can run based on task concurrency configuration
- abstract purge() None#
- release_waiters(waited: DistributedInvocation) None#
Called when an invocation is finished and therefore cannot block other invocations anymore
- reroute_invocations(invocations_to_reroute: set[DistributedInvocation]) None#
- route_call(call: Call) DistributedInvocation[Params, Result]#
Routes a task call in a distributed task system based on single invocation options.
If the task has no single invocation option set, it routes a new call invocation. For tasks with single invocation, it checks if an existing invocation with the same or different arguments exists. If an invocation with the same arguments exists, it reuses the invocation. If an invocation with different arguments exists, it raises an error or reuses the invocation based on the ‘on_diff_args_raise’ flag.
- Parameters:
call (Call) – The task call to be routed.
- Returns:
The resulting DistributedInvocation object, which could be a new or reused invocation.
- Return type:
DistributedInvocation[Params, Result]
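The routing rules described for route_call form a small decision table. The sketch below encodes only that decision as a pure function over plain values; decide_route, SketchDifferentArgumentsError, and the "new"/"reuse" labels are hypothetical, and the real method works with Call and DistributedInvocation objects rather than dictionaries.

```python
from typing import Dict, Optional


class SketchDifferentArgumentsError(Exception):
    """Hypothetical error for a pending invocation with other arguments."""


def decide_route(
    single_invocation: bool,
    pending_arguments: Optional[Dict[str, str]],
    new_arguments: Dict[str, str],
    on_diff_args_raise: bool,
) -> str:
    """Return 'new' or 'reuse' following the single-invocation rules."""
    # No single-invocation option, or nothing pending: always a new invocation.
    if not single_invocation or pending_arguments is None:
        return "new"
    # Same arguments as the pending invocation: reuse it.
    if pending_arguments == new_arguments:
        return "reuse"
    # Different arguments: raise or reuse, depending on the flag.
    if on_diff_args_raise:
        raise SketchDifferentArgumentsError(
            f"pending={pending_arguments} new={new_arguments}"
        )
    return "reuse"
```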
- set_invocation_exception(invocation: DistributedInvocation, exception: Exception) None#
Called when an invocation is finished with an exception
- set_invocation_result(invocation: DistributedInvocation, result: Any) None#
Called when an invocation is finished successfully
- set_invocation_retry(invocation: DistributedInvocation, exception: Exception) None#
Called when an invocation is finished with an exception and should be retried
- set_invocation_run(caller: DistributedInvocation[Params, Result] | None, callee: DistributedInvocation[Params, Result]) None#
Called when an invocation is started
- set_invocation_status(invocation: DistributedInvocation[Params, Result], status: InvocationStatus) None#
- set_invocations_status(invocations: list[DistributedInvocation[Params, Result]], status: InvocationStatus) None#
- abstract set_up_invocation_auto_purge(invocation: DistributedInvocation[Params, Result]) None#
Sets up the invocation to be auto-purged after app.conf.orchestrator_auto_final_invocation_purge_hours
- waiting_for_results(caller_invocation: DistributedInvocation[Params, Result] | None, result_invocations: list[DistributedInvocation[Params, Result]]) None#
Called when an invocation (which may be None) is waiting for the results of other invocations.
Memory Orchestrator#
- class pynenc.orchestrator.mem_orchestrator.ArgPair(key: str, value: Any)#
Bases: object
Helper to simulate a Memory cache for key:value pairs in Task Invocations
- class pynenc.orchestrator.mem_orchestrator.MemBlockingControl(app: Pynenc)#
Bases: BaseBlockingControl
A directed acyclic graph representing the call dependencies
- get_blocking_invocations(max_num_invocations: int) Iterator[DistributedInvocation[Params, Result]]#
Returns the invocations that are blocking others but not waiting for anything themselves. The oldest invocations are returned first.
Parameters#
- max_num_invocations : int
The maximum number of invocations to return.
Returns#
- Set[DistributedInvocation] | None
A set of blocking invocations or None if no invocations are blocking.
- release_waiters(invocation: DistributedInvocation) None#
Remove an invocation from the graph. Also removes any edges to or from the invocation.
- waiting_for_results(caller_invocation: DistributedInvocation[Params, Result], result_invocations: list[DistributedInvocation[Params, Result]]) None#
Register that an invocation (waiter) is waiting for the results of another invocation (waited).
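The selection rule for get_blocking_invocations (blocking others, not waiting on anything, oldest first) can be sketched with two dictionaries. This is an illustrative standalone version; the function name, the string ids, and the created_at timestamps are assumptions and do not reflect MemBlockingControl's internal structures.

```python
from typing import Dict, List, Set


def blocking_not_blocked(
    waiting_for: Dict[str, Set[str]],
    created_at: Dict[str, float],
    max_num_invocations: int,
) -> List[str]:
    """Return ids that others wait on but that wait on nothing, oldest first."""
    # Everything that at least one invocation is waiting for.
    waited: Set[str] = set().union(*waiting_for.values())
    # Everything that is itself waiting for something.
    waiters = {inv for inv, deps in waiting_for.items() if deps}
    candidates = waited - waiters
    ordered = sorted(candidates, key=lambda inv: created_at[inv])
    return ordered[:max_num_invocations]


# "a" waits for "b" and "c"; "b" waits for "d".
waiting_for = {"a": {"b", "c"}, "b": {"d"}}
created_at = {"a": 1.0, "b": 2.0, "c": 4.0, "d": 3.0}
```

In this sample, "b" is excluded because it is itself waiting on "d", so the candidates are "d" and "c", with "d" first because it is older.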
- class pynenc.orchestrator.mem_orchestrator.MemCycleControl(app: Pynenc)#
Bases: BaseCycleControl
A directed acyclic graph representing the call dependencies
- add_call_and_check_cycles(caller: DistributedInvocation, callee: DistributedInvocation) None#
Add a new invocation to the graph. This represents a dependency where the caller is dependent on the callee.
Raises a CycleDetectedError if the invocation would cause a cycle.
- clean_up_invocation_cycles(invocation: DistributedInvocation) None#
Remove an invocation from the graph. Also removes any edges to or from the invocation.
- find_cycle_caused_by_new_invocation(caller: DistributedInvocation, callee: DistributedInvocation) list[Call]#
Determines if adding an edge from the caller to the callee would create a cycle.
Parameters#
- caller : DistributedInvocation
The invocation making the call.
- callee : DistributedInvocation
The invocation being called.
Returns#
- list
List of invocations that would form the cycle after adding the new invocation, else an empty list.
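Checking whether a new caller-to-callee edge closes a cycle is equivalent to asking whether the callee can already reach the caller. The sketch below does this with an iterative depth-first search over a dictionary of string ids; it illustrates the general technique and is not the algorithm used by MemCycleControl, whose internals are not shown in this reference.

```python
from typing import Dict, List, Set, Tuple


def find_cycle_caused_by_edge(
    graph: Dict[str, Set[str]], caller: str, callee: str
) -> List[str]:
    """Return the path callee -> ... -> caller that the new edge would close.

    `graph` maps each node to the nodes it calls. An empty list means that
    adding caller -> callee creates no cycle.
    """
    stack: List[Tuple[str, List[str]]] = [(callee, [callee])]
    visited: Set[str] = set()
    while stack:
        node, path = stack.pop()
        if node == caller:
            return path
        if node in visited:
            continue
        visited.add(node)
        for neighbour in graph.get(node, set()):
            stack.append((neighbour, path + [neighbour]))
    return []


# task2 calls task3 and task3 calls task1; task1 is about to call task2.
graph = {"task2": {"task3"}, "task3": {"task1"}}
cycle = find_cycle_caused_by_edge(graph, caller="task1", callee="task2")
```

The check runs before the edge is inserted, so the graph stays acyclic and the caller can raise an error instead of recording the call.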
- class pynenc.orchestrator.mem_orchestrator.MemOrchestrator(app: Pynenc)#
Bases: BaseOrchestrator
- auto_purge() None#
Purge all invocations in final state that are older than app.conf.orchestrator_auto_final_invocation_purge_hours
- property blocking_control: MemBlockingControl#
- property cycle_control: MemCycleControl#
- get_existing_invocations(task: Task[Params, Result], key_serialized_arguments: dict[str, str] | None = None, status: InvocationStatus | None = None) Iterator[DistributedInvocation]#
- get_invocation_retries(invocation: DistributedInvocation[Params, Result]) int#
- get_invocation_status(invocation: DistributedInvocation[Params, Result]) InvocationStatus#
- increment_invocation_retries(invocation: DistributedInvocation[Params, Result]) None#
- purge() None#
- set_up_invocation_auto_purge(invocation: DistributedInvocation[Params, Result]) None#
Sets up the invocation to be auto-purged after app.conf.orchestrator_auto_final_invocation_purge_hours
- class pynenc.orchestrator.mem_orchestrator.TaskInvocationCache(app: Pynenc)#
Bases: Generic[Result]
- auto_purge() None#
- clean_pending_status(invocation: DistributedInvocation[Params, Result]) None#
- clean_up_invocation(invocation_id: str) None#
- get_invocations(key_arguments: dict[str, str] | None, status: InvocationStatus | None) Iterator[DistributedInvocation]#
- get_retries(invocation: DistributedInvocation[Params, Result]) int#
- get_status(invocation: DistributedInvocation[Params, Result]) InvocationStatus#
- increase_retries(invocation: DistributedInvocation[Params, Result]) None#
- set_pending_status(invocation: DistributedInvocation[Params, Result]) None#
- set_status(invocation: DistributedInvocation[Params, Result], status: InvocationStatus) None#
- set_up_invocation_auto_purge(invocation: DistributedInvocation[Params, Result]) None#
Redis Orchestrator#
- class pynenc.orchestrator.redis_orchestrator.RedisBlockingControl(app: Pynenc, client: Redis)#
Bases: BaseBlockingControl
A directed acyclic graph representing the call dependencies
- get_blocking_invocations(max_num_invocations: int) Iterator[DistributedInvocation[Params, Result]]#
Returns the invocations that are blocking others but not waiting for anything themselves. The oldest invocations are returned first.
Parameters#
- max_num_invocations : int
The maximum number of invocations to return.
Returns#
- list[DistributedInvocation]
A list of blocking invocations or an empty list if no invocations are blocking.
- purge() None#
- release_waiters(invocation: DistributedInvocation) None#
Remove an invocation from the graph. Also removes any edges to or from the invocation.
- waiting_for_results(waiter: DistributedInvocation, waiteds: list[DistributedInvocation]) None#
Register that an invocation (waiter) is waiting for the results of another invocation (waited).
- class pynenc.orchestrator.redis_orchestrator.RedisCycleControl(app: Pynenc, client: Redis)#
Bases:
BaseCycleControl
A directed acyclic graph representing the call dependencies
- add_call_and_check_cycles(caller: DistributedInvocation, callee: DistributedInvocation) None#
Add a new invocation to the graph. This represents a dependency where the caller is dependent on the callee.
Raises a CycleDetectedError if the invocation would cause a cycle.
- clean_up_invocation_cycles(invocation: DistributedInvocation) None#
Remove an invocation from the graph. Also removes any edges to or from the invocation.
- find_cycle_caused_by_new_invocation(caller: DistributedInvocation, callee: DistributedInvocation) list[Call]#
Determines if adding an edge from the caller to the callee would create a cycle.
Parameters#
- callerDistributedInvocation
The invocation making the call.
- calleeDistributedInvocation
The invocation being called.
Returns#
- list
List of invocations that would form the cycle after adding the new invocation, else an empty list.
- purge() None#
- remove_edges(call_id: str) None#
Remove all edges from a call
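As a rough illustration of the cycle check described above, the following in-memory sketch treats calls as edges of a directed graph and refuses to add an edge that would close a loop. The class `InMemoryCycleControl`, the local `CycleDetectedError` stand-in, and the use of string ids rather than `DistributedInvocation` or `Call` objects are assumptions; the Redis-backed class may work differently.

```python
class CycleDetectedError(Exception):
    """Hypothetical stand-in for the error named in the documentation."""


class InMemoryCycleControl:
    def __init__(self) -> None:
        # caller id -> set of callee ids
        self.edges: dict[str, set[str]] = {}

    def find_cycle_caused_by_new_invocation(self, caller: str, callee: str) -> list[str]:
        # Adding caller -> callee closes a cycle iff caller is reachable from callee.
        if caller == callee:
            return [caller]
        stack = [(callee, [caller, callee])]
        visited: set[str] = set()
        while stack:
            node, path = stack.pop()
            if node in visited:
                continue
            visited.add(node)
            for nxt in self.edges.get(node, ()):
                if nxt == caller:
                    return path
                stack.append((nxt, path + [nxt]))
        return []

    def add_call_and_check_cycles(self, caller: str, callee: str) -> None:
        cycle = self.find_cycle_caused_by_new_invocation(caller, callee)
        if cycle:
            raise CycleDetectedError(" -> ".join(cycle + [cycle[0]]))
        self.edges.setdefault(caller, set()).add(callee)

    def clean_up_invocation_cycles(self, invocation: str) -> None:
        # Remove the node together with every edge to or from it.
        self.edges.pop(invocation, None)
        for callees in self.edges.values():
            callees.discard(invocation)
```

The check runs before the edge is stored, so a rejected call leaves the graph unchanged.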
- class pynenc.orchestrator.redis_orchestrator.RedisOrchestrator(app: Pynenc)#
Bases:
BaseOrchestrator
- auto_purge() None#
Purge all invocations in final state that are older than app.conf.orchestrator_auto_final_invocation_purge_hours
- property blocking_control: RedisBlockingControl#
- property conf: ConfigOrchestratorRedis#
- property cycle_control: RedisCycleControl#
- get_existing_invocations(task: Task[Params, Result], key_serialized_arguments: dict[str, str] | None = None, status: InvocationStatus | None = None) Iterator[DistributedInvocation]#
- get_invocation_retries(invocation: DistributedInvocation[Params, Result]) int#
- get_invocation_status(invocation: DistributedInvocation[Params, Result]) InvocationStatus#
- increase_retries(invocation: DistributedInvocation[Params, Result]) None#
- increment_invocation_retries(invocation: DistributedInvocation[Params, Result]) None#
- purge() None#
Remove all invocations from the orchestrator
- set_up_invocation_auto_purge(invocation: DistributedInvocation[Params, Result]) None#
Set up the invocation to be auto-purged after app.conf.orchestrator_auto_final_invocation_purge_hours.
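The relationship between `set_up_invocation_auto_purge` and `auto_purge` can be sketched as a timestamp index with a cutoff. This is only an illustration under assumptions: the class `AutoPurgeIndex`, the explicit `now` parameter, and the returned list of purged ids are hypothetical additions (the documented methods return None), and the real orchestrator stores this data in Redis.

```python
import time
from typing import Optional


class AutoPurgeIndex:
    """Toy index of invocations that reached a final state (hypothetical)."""

    def __init__(self, purge_hours: float) -> None:
        self.purge_hours = purge_hours
        # invocation id -> time (seconds) at which it reached a final state
        self.final_since: dict[str, float] = {}

    def set_up_invocation_auto_purge(self, invocation_id: str, now: Optional[float] = None) -> None:
        self.final_since[invocation_id] = time.time() if now is None else now

    def auto_purge(self, now: Optional[float] = None) -> list[str]:
        now = time.time() if now is None else now
        cutoff = now - self.purge_hours * 3600
        # Only entries strictly older than the cutoff are purged.
        expired = [inv for inv, ts in self.final_since.items() if ts < cutoff]
        for inv in expired:
            del self.final_since[inv]
        return expired
```

Passing `now` explicitly keeps the sketch deterministic for testing; a real implementation would read the clock itself.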
- exception pynenc.orchestrator.redis_orchestrator.StatusNotFound#
Bases:
Exception
Raised when a status is not found in Redis
- class pynenc.orchestrator.redis_orchestrator.TaskRedisCache(app: Pynenc, client: Redis)#
Bases:
object
- auto_purge() None#
- clean_pending_status(invocation: DistributedInvocation[Params, Result]) None#
- get_invocation_retries(invocation: DistributedInvocation) int#
- get_invocation_status(invocation: DistributedInvocation) InvocationStatus#
- get_invocations(task_id: str, key_arguments: dict[str, str] | None, status: InvocationStatus | None) Iterator[DistributedInvocation]#
- increment_invocation_retries(invocation: DistributedInvocation) None#
- purge() None#
- set_pending_status(invocation: DistributedInvocation[Params, Result]) None#
- set_status(invocation: DistributedInvocation[Params, Result], status: InvocationStatus) None#
- set_up_invocation_auto_purge(invocation: DistributedInvocation[Params, Result]) None#
Runner Submodules#
Base Runner#
- class pynenc.runner.base_runner.BaseRunner(app: Pynenc)#
Bases:
ABC
The Runner will execute invocations from the broker.
It requires an app because it needs to know about the broker, orchestrator, etc.
The runner will affect the behavior of the task result, for example:
* In a subprocess environment, it may implement a pipe to communicate for pausing/resuming processes.
* In an async environment, the value should be an async function to wait for distributed results.
* In a cloud function environment aiming for speed with a single thread, it might not wait more than ‘x’ seconds and instead, create a ‘callback’, save the status, and convert the current execution into a task that will be called when the result is ready.
* In a multiprocessing environment in a Kubernetes pod with capabilities to create new pods, it may have different behaviors.
* For an asyncio worker, it runs several tasks in one processor, and the value should wait with async.
- property conf: ConfigRunner#
- abstract property max_parallel_slots: int#
The maximum number of parallel tasks that the runner can handle
- abstract static mem_compatible() bool#
Can this runner run with memory components?
- on_start() None#
This method is called when the runner starts
- on_stop() None#
This method is called when the runner stops
- run() None#
Starts the runner
- property runner_id: str#
- abstract runner_loop_iteration() None#
One iteration of the runner loop. Subclasses should implement this method to process invocations.
- stop_runner_loop(signum: int | None = None, frame: FrameType | None = None) None#
Stops the runner loop
- abstract waiting_for_results(running_invocation: DistributedInvocation | None, result_invocation: list[DistributedInvocation], runner_args: dict[str, Any] | None = None) None#
This method is called from the result method of an invocation. It signals the runner that the running invocation is waiting for the result of the result invocation.
The running invocation may be None when the result is requested from outside a runner (e.g. the user environment). In that case it will be handled by the DummyRunner (the default in the Pynenc app for these cases).
The runner has the opportunity to define the waiting behaviour of the running invocation in this method. Otherwise, the running invocation will loop indefinitely until the result invocation is ready.
runner_args is a dictionary with the arguments the runner passes to itself, e.g. the process runner uses it to synchronize managed dictionaries among subprocesses.
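To make the runner lifecycle easier to picture, here is a standalone sketch of how `run`, `on_start`, `runner_loop_iteration`, `stop_runner_loop` and `on_stop` might fit together. It does not import Pynenc and is not the library's implementation: `ToyRunner`, `QueueRunner`, the `running` flag, and the in-memory queue of `(function, args)` pairs are assumptions made for illustration.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable


class ToyRunner(ABC):
    """Hypothetical skeleton mirroring the documented method names."""

    def __init__(self) -> None:
        self.running = False

    def on_start(self) -> None:
        """Hook called once before the loop starts."""

    def on_stop(self) -> None:
        """Hook called once after the loop ends."""

    @abstractmethod
    def runner_loop_iteration(self) -> None:
        """Process whatever work is available in one pass."""

    def stop_runner_loop(self, signum: Any = None, frame: Any = None) -> None:
        # The (signum, frame) parameters match the shape of a signal handler.
        self.running = False

    def run(self) -> None:
        self.running = True
        self.on_start()
        try:
            while self.running:
                self.runner_loop_iteration()
        finally:
            self.on_stop()


class QueueRunner(ToyRunner):
    """Runs queued callables one at a time and stops when the queue is empty."""

    def __init__(self, queue: list[tuple[Callable[..., Any], tuple]]) -> None:
        super().__init__()
        self.queue = list(queue)
        self.results: list[Any] = []

    def runner_loop_iteration(self) -> None:
        if not self.queue:
            self.stop_runner_loop()
            return
        func, args = self.queue.pop(0)
        self.results.append(func(*args))
```

For instance, `QueueRunner([(pow, (2, 3))]).run()` executes the single queued call and then leaves the loop. A real runner would pull invocations from the broker instead of a local list.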
- class pynenc.runner.base_runner.DummyRunner(app: Pynenc)#
Bases:
BaseRunner
This runner is a placeholder for the Pynenc app. It will be used when the app is defined in any other Python environment than a Pynenc runner.
- Examples include:
A script that defines the app, decorates some tasks, routes them, and then finishes. Such a script does not plan to run anything itself but triggers tasks that will later run in actual runners.
- property max_parallel_slots: int#
The maximum number of parallel tasks that the runner can handle
- static mem_compatible() bool#
Can this runner run with memory components?
- runner_loop_iteration() None#
One iteration of the runner loop. Subclasses should implement this method to process invocations.
- waiting_for_results(running_invocation: DistributedInvocation | None, result_invocation: list[DistributedInvocation], runner_args: dict[str, Any] | None = None) None#
This method is called from the result method of an invocation. It signals the runner that the running invocation is waiting for the result of the result invocation.
The running invocation may be None when the result is requested from outside a runner (e.g. the user environment). In that case it will be handled by the DummyRunner (the default in the Pynenc app for these cases).
The runner has the opportunity to define the waiting behaviour of the running invocation in this method. Otherwise, the running invocation will loop indefinitely until the result invocation is ready.
runner_args is a dictionary with the arguments the runner passes to itself, e.g. the process runner uses it to synchronize managed dictionaries among subprocesses.
Context (Runner)#
- class pynenc.runner.context.ApplicationContext#
Bases:
object
todo
Memory Runner#
Process Runner#
- class pynenc.runner.process_runner.ProcessRunner(app: Pynenc)#
Bases:
BaseRunner
- property available_processes: int#
- manager: Manager#
- property max_parallel_slots: int#
The maximum number of parallel tasks that the runner can handle
- max_processes: int#
- static mem_compatible() bool#
Can this runner run with memory components?
- parse_args(args: dict[str, Any]) None#
- processes: dict[DistributedInvocation, Process]#
- property runner_args: dict[str, Any]#
- runner_loop_iteration() None#
One iteration of the runner loop. Subclasses should implement this method to process invocations.
- wait_invocation: dict[DistributedInvocation, set[DistributedInvocation]]#
- waiting_for_results(running_invocation: DistributedInvocation | None, result_invocations: list[DistributedInvocation], runner_args: dict[str, Any] | None = None) None#
This method is called from the result method of an invocation. It signals the runner that the running invocation is waiting for the result of the result invocation.
The running invocation may be None when the result is requested from outside a runner (e.g. the user environment). In that case it will be handled by the DummyRunner (the default in the Pynenc app for these cases).
The runner has the opportunity to define the waiting behaviour of the running invocation in this method. Otherwise, the running invocation will loop indefinitely until the result invocation is ready.
runner_args is a dictionary with the arguments the runner passes to itself, e.g. the process runner uses it to synchronize managed dictionaries among subprocesses.
- property waiting_processes: int#
Serializer Submodules#
Base Serializer#
JSON Serializer#
- class pynenc.serializer.json_serializer.DefaultJSONEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)#
Bases:
JSONEncoder
- default(obj: Any) Any#
Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).
For example, to support arbitrary iterators, you could implement default like this:
    def default(self, o):
        try:
            iterable = iter(o)
        except TypeError:
            pass
        else:
            return list(iterable)
        # Let the base class default method raise the TypeError
        return JSONEncoder.default(self, o)
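A complete, runnable variant of the pattern described above might look like the following. `ExampleEncoder` and the choice of types it handles (sets and datetimes) are illustrative assumptions and say nothing about what `DefaultJSONEncoder` actually supports.

```python
import json
from datetime import datetime
from json import JSONEncoder


class ExampleEncoder(JSONEncoder):
    """Hypothetical encoder that adds support for sets and datetimes."""

    def default(self, o):
        if isinstance(o, (set, frozenset)):
            # Sets have no JSON equivalent; emit a sorted list for stable output.
            return sorted(o)
        if isinstance(o, datetime):
            return o.isoformat()
        # Let the base class raise TypeError for anything else.
        return super().default(o)


encoded = json.dumps(
    {"tags": {"b", "a"}, "at": datetime(2024, 1, 2, 3, 4, 5)},
    cls=ExampleEncoder,
)
```

The encoder is passed through the `cls` argument of `json.dumps`, and `default` is only consulted for objects the standard encoder cannot handle on its own.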
- class pynenc.serializer.json_serializer.JsonSerializer#
Bases:
BaseSerializer
- static deserialize(obj: str) Any#
- static serialize(obj: Any) str#
Pickle Serializer#
- class pynenc.serializer.pickle_serializer.PickleSerializer#
Bases:
BaseSerializer
- static deserialize(serialized_obj: str) Any#
- static serialize(obj: Any) str#
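The signatures above exchange `str` values, whereas `pickle` produces bytes. One common way to bridge the two is a text-safe encoding such as Base64. The sketch below assumes that approach purely for illustration; the documentation does not state which encoding `PickleSerializer` uses, and the module-level functions are hypothetical.

```python
import base64
import pickle
from typing import Any


def serialize(obj: Any) -> str:
    # Pickle to bytes, then Base64-encode so the payload is plain ASCII text.
    return base64.b64encode(pickle.dumps(obj)).decode("ascii")


def deserialize(serialized_obj: str) -> Any:
    # Reverse the two steps. Only unpickle data from trusted sources.
    return pickle.loads(base64.b64decode(serialized_obj.encode("ascii")))
```

Unlike JSON, this round trip preserves Python-specific types such as tuples and sets, at the cost of being unsafe for untrusted input.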
State Backend Submodules#
Base State Backend#
- class pynenc.state_backend.base_state_backend.BaseStateBackend(app: Pynenc)#
Bases:
ABC
- add_history(invocation: DistributedInvocation[Params, Result], status: InvocationStatus | None = None, execution_context: Any | None = None) None#
- property conf: ConfigStateBackend#
- get_exception(invocation: DistributedInvocation[Params, Result]) Exception#
- get_history(invocation: DistributedInvocation[Params, Result]) list[InvocationHistory]#
- get_invocation(invocation_id: str) DistributedInvocation#
- get_result(invocation: DistributedInvocation[Params, Result]) Result#
- abstract purge() None#
- set_exception(invocation: DistributedInvocation, exception: Exception) None#
- set_result(invocation: DistributedInvocation, result: Result) None#
- upsert_invocation(invocation: DistributedInvocation) None#
- wait_for_all_async_operations() None#
Blocks until all asynchronous status operations are finished.
- wait_for_invocation_async_operations(invocation_id: str) None#
Blocks until all asynchronous operations for a specific invocation are finished.
- class pynenc.state_backend.base_state_backend.InvocationHistory(invocation_id: str, status: InvocationStatus | None = None, execution_context: Any | None = None)#
Bases:
object
- execution_context: Any | None = None#
- classmethod from_json(json_str: str) InvocationHistory#
- invocation_id: str#
- status: InvocationStatus | None = None#
- property timestamp: datetime#
- to_json() str#
Memory State Backend#
- class pynenc.state_backend.mem_state_backend.MemStateBackend(app: Pynenc)#
Bases:
BaseStateBackend
- purge() None#
Redis State Backend#
- class pynenc.state_backend.redis_state_backend.RedisStateBackend(app: Pynenc)#
Bases:
BaseStateBackend
- property conf: ConfigStateBackendRedis#
- purge() None#
Util Submodules#
Files#
- pynenc.util.files.load_config_from_toml(file_path: str) dict#
- pynenc.util.files.load_file(filepath: str) dict[str, Any]#
Log#
- class pynenc.util.log.RunnerLogAdapter(logger: Logger, runner_id: str)#
Bases:
LoggerAdapter
- process(msg: Any, kwargs: Any) Any#
Process the logging message and keyword arguments passed in to a logging call to insert contextual information. You can either manipulate the message itself, the keyword args or both. Return the message and kwargs modified (or not) to suit your needs.
Normally, you’ll only need to override this one method in a LoggerAdapter subclass for your specific needs.
- class pynenc.util.log.TaskLoggerAdapter(logger: Logger, task_id: str, invocation_id: str | None = None)#
Bases:
LoggerAdapter
- process(msg: Any, kwargs: Any) Any#
Process the logging message and keyword arguments passed in to a logging call to insert contextual information. You can either manipulate the message itself, the keyword args or both. Return the message and kwargs modified (or not) to suit your needs.
Normally, you’ll only need to override this one method in a LoggerAdapter subclass for your specific needs.
- set_context(task_id: str, invocation_id: str | None) None#
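The `process` hook described above is the standard `logging.LoggerAdapter` extension point. A minimal sketch of an adapter that prefixes messages with task context could look like this; `PrefixAdapter` and its prefix format are assumptions and may not match what `TaskLoggerAdapter` actually emits.

```python
import logging
from typing import Any, Optional


class PrefixAdapter(logging.LoggerAdapter):
    """Hypothetical adapter that prepends task context to each message."""

    def __init__(self, logger: logging.Logger, task_id: str, invocation_id: Optional[str] = None) -> None:
        super().__init__(logger, {})
        self.set_context(task_id, invocation_id)

    def set_context(self, task_id: str, invocation_id: Optional[str]) -> None:
        self.task_id = task_id
        self.invocation_id = invocation_id

    def process(self, msg: Any, kwargs: Any) -> Any:
        # Return the (possibly modified) message and kwargs, as the hook requires.
        prefix = f"[{self.task_id}"
        if self.invocation_id:
            prefix += f": {self.invocation_id}"
        return f"{prefix}] {msg}", kwargs
```

Calls such as `adapter.info("started")` pass through `process` before reaching the wrapped logger, so the context is added without changing any call sites.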
Redis Keys#
- class pynenc.util.redis_keys.Key(app_id: str, prefix: str)#
Bases:
object
- all_waited() str#
- args(task_id: str, arg: str, val: str) str#
- call(call_id: str) str#
- call_to_invocation(call_id: str) str#
- default_queue() str#
- edge(call_id: str) str#
- exception(invocation_id: str) str#
- history(invocation_id: str) str#
- invocation(invocation_id: str) str#
- invocation_auto_purge() str#
- invocation_retries(invocation_id: str) str#
- invocation_status(invocation_id: str) str#
- not_waiting() str#
- pending_timer(invocation_id: str) str#
- previous_status(invocation_id: str) str#
- purge(client: Redis) None#
Purge all keys with the given prefix
- result(invocation_id: str) str#
- status(task_id: str, status: InvocationStatus) str#
- task(task_id: str) str#
- waited_by(invocation_id: str) str#
- waiting_for(invocation_id: str) str#
- pynenc.util.redis_keys.sanitize_for_redis(s: str) str#
Subclasses#
- pynenc.util.subclasses.get_all_subclasses(cls: type[T]) list[type[T]]#
- pynenc.util.subclasses.get_subclass(root_class: type[T], child_class_name: str) type[T]#
Returns the subclass with the given name (any level deep)
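A recursive walk over `__subclasses__()` is one straightforward way to find subclasses at any depth. The sketch below reuses the documented function names but is an assumption about the approach, not the library's code; in particular, raising `ValueError` when no match is found is a hypothetical choice.

```python
from typing import TypeVar

T = TypeVar("T")


def get_all_subclasses(cls: type[T]) -> list[type[T]]:
    # Depth-first walk; with multiple inheritance a class may appear more than once.
    found: list[type[T]] = []
    for sub in cls.__subclasses__():
        found.append(sub)
        found.extend(get_all_subclasses(sub))
    return found


def get_subclass(root_class: type[T], child_class_name: str) -> type[T]:
    for sub in get_all_subclasses(root_class):
        if sub.__name__ == child_class_name:
            return sub
    raise ValueError(f"No subclass named {child_class_name!r} under {root_class.__name__}")


class Base: ...
class Child(Base): ...
class GrandChild(Child): ...
```

Here `get_subclass(Base, "GrandChild")` finds the class two levels down, which a plain `Base.__subclasses__()` call would miss.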