pynenc.orchestrator.base_orchestrator
Module Contents
Classes
| BaseBlockingControl | A component of the orchestrator to implement blocking control functionalities. |
| BaseOrchestrator | Abstract base class defining the orchestrator’s interface in a distributed task system. |
API
- class pynenc.orchestrator.base_orchestrator.BaseBlockingControl
Bases: abc.ABC
A component of the orchestrator to implement blocking control functionalities.
This abstract base class defines the interface for managing blocking behavior in distributed task executions.
- abstractmethod release_waiters(waited_invocation_id: pynenc.identifiers.invocation_id.InvocationId) → None
Releases any invocations that are waiting on the specified invocation.
- Parameters:
waited_invocation_id (InvocationId) – The ID of the invocation that has finished and can release its waiters.
- abstractmethod waiting_for_results(caller_invocation_id: pynenc.identifiers.invocation_id.InvocationId, result_invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId]) → None
Notifies the system that an invocation is waiting for the results of other invocations.
- Parameters:
caller_invocation_id (InvocationId) – The ID of the invocation that is waiting.
result_invocation_ids (list[InvocationId]) – The IDs of the invocations being waited on.
- abstractmethod get_blocking_invocations(max_num_invocations: int) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieves invocation IDs that are blocking others but are not blocked themselves.
- Parameters:
max_num_invocations (int) – The maximum number of blocking invocation IDs to retrieve.
- Returns:
An iterator over unblocked, blocking invocation IDs, ordered by age (oldest first).
- Return type:
Iterator[InvocationId]
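To make the three-method contract concrete, here is a minimal single-process sketch. It does not subclass the real pynenc class: `InvocationId` is aliased to `str`, and the class name `InMemoryBlockingControl` is hypothetical. "Age" is approximated by the order in which IDs are first seen.

```python
from __future__ import annotations

from collections.abc import Iterator
from itertools import count

InvocationId = str  # stand-in for pynenc.identifiers.invocation_id.InvocationId


class InMemoryBlockingControl:
    """Hypothetical single-process sketch of the BaseBlockingControl interface."""

    def __init__(self) -> None:
        self._waiting_on: dict[InvocationId, set[InvocationId]] = {}  # caller -> ids it waits on
        self._waiters: dict[InvocationId, set[InvocationId]] = {}  # waited id -> its callers
        self._first_seen: dict[InvocationId, int] = {}  # first-seen order, used as "age"
        self._clock = count()

    def _touch(self, invocation_id: InvocationId) -> None:
        if invocation_id not in self._first_seen:
            self._first_seen[invocation_id] = next(self._clock)

    def waiting_for_results(
        self, caller_invocation_id: InvocationId, result_invocation_ids: list[InvocationId]
    ) -> None:
        self._touch(caller_invocation_id)
        for waited in result_invocation_ids:
            self._touch(waited)
            self._waiting_on.setdefault(caller_invocation_id, set()).add(waited)
            self._waiters.setdefault(waited, set()).add(caller_invocation_id)

    def release_waiters(self, waited_invocation_id: InvocationId) -> None:
        for caller in self._waiters.pop(waited_invocation_id, set()):
            pending = self._waiting_on.get(caller, set())
            pending.discard(waited_invocation_id)
            if not pending:  # the caller no longer waits on anything
                self._waiting_on.pop(caller, None)

    def get_blocking_invocations(self, max_num_invocations: int) -> Iterator[InvocationId]:
        # Blocking: someone waits on it. Unblocked: it waits on nobody.
        candidates = [
            inv_id
            for inv_id, callers in self._waiters.items()
            if callers and inv_id not in self._waiting_on
        ]
        candidates.sort(key=self._first_seen.__getitem__)  # oldest first
        yield from candidates[:max_num_invocations]
```

A real backend would keep these mappings in shared storage so that every runner observes the same waits; the in-memory dictionaries here only illustrate the bookkeeping.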
- class pynenc.orchestrator.base_orchestrator.BaseOrchestrator(app: pynenc.app.Pynenc)
Bases: abc.ABC
Abstract base class defining the orchestrator’s interface in a distributed task system.
The orchestrator is responsible for managing task invocations, including tracking their status, handling retries, and implementing blocking control.
Note: Coupling with StateBackend
The orchestrator relies on app.state_backend for persisting invocation data, results, exceptions, and history. Methods such as set_invocation_status, set_invocation_result, and route_call delegate storage to the state backend. This coupling is intentional: the orchestrator owns lifecycle transitions, while the state backend owns persistence. Future refactors may introduce an explicit interface to formalize this contract.
- Parameters:
app (Pynenc) – The Pynenc application instance.
Initialization
- abstractmethod _register_new_invocations(invocations: list[DistributedInvocation[Params, Result]], runner_id: str | None = None) → pynenc.invocation.status.InvocationStatusRecord
Registers new invocations with the Registered status if they do not exist yet.
- Parameters:
invocations (list[DistributedInvocation[Params, Result]]) – The invocations to be registered.
runner_id (str | None) – The owner ID for ownership validation
- Returns:
The status record of the registered invocation.
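- abstractmethod get_existing_invocations(task: Task[Params, Result], key_serialized_arguments: dict[str, str] | None = None, statuses: list[InvocationStatus] | None = None) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieves existing invocation IDs based on task, arguments, and status.
- Parameters:
task (Task[Params, Result]) – The task whose invocations are retrieved.
key_serialized_arguments (dict[str, str] | None) – Optional serialized arguments that matching invocations must share.
statuses (list[InvocationStatus] | None) – Optional statuses to filter by.
- Returns: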
An iterator over the matching invocation IDs.
- Return type:
Iterator[InvocationId]
- abstractmethod get_task_invocation_ids(task_id: pynenc.task.TaskId) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieves all invocation IDs associated with a specific task ID.
- Parameters:
task_id (TaskId) – The task ID to filter invocations.
- Returns:
An iterator over the invocation IDs for the specified task.
- abstractmethod get_invocation_ids_paginated(task_id: TaskId | None = None, statuses: list[InvocationStatus] | None = None, limit: int = 100, offset: int = 0) → list[pynenc.identifiers.invocation_id.InvocationId]
Retrieves invocation IDs with pagination support.
This method provides efficient pagination for large datasets by using LIMIT/OFFSET semantics. Results are ordered by registration time (newest first).
- Parameters:
task_id (TaskId | None) – Optional task ID to filter by.
statuses (list[InvocationStatus] | None) – Optional statuses to filter by.
limit (int) – Maximum number of results to return.
offset (int) – Number of results to skip.
- Returns:
List of matching invocation IDs.
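The LIMIT/OFFSET contract can be illustrated with an in-memory sketch. `InvocationRow` and `iter_all_invocation_ids` are hypothetical names; statuses and task IDs are plain strings here instead of pynenc types.

```python
from __future__ import annotations

from collections.abc import Iterator
from dataclasses import dataclass


@dataclass(frozen=True)
class InvocationRow:
    """Hypothetical stand-in for one stored invocation."""

    invocation_id: str
    task_id: str
    status: str
    registered_at: float  # registration time, e.g. a UNIX timestamp


def get_invocation_ids_paginated(
    rows: list[InvocationRow],
    task_id: str | None = None,
    statuses: list[str] | None = None,
    limit: int = 100,
    offset: int = 0,
) -> list[str]:
    matching = [
        r
        for r in rows
        if (task_id is None or r.task_id == task_id)
        and (statuses is None or r.status in statuses)
    ]
    matching.sort(key=lambda r: r.registered_at, reverse=True)  # newest first
    return [r.invocation_id for r in matching[offset : offset + limit]]  # OFFSET, then LIMIT


def iter_all_invocation_ids(
    rows: list[InvocationRow], page_size: int = 100, **filters
) -> Iterator[str]:
    """Walk every page until an empty page comes back."""
    offset = 0
    while True:
        page = get_invocation_ids_paginated(rows, limit=page_size, offset=offset, **filters)
        if not page:
            return
        yield from page
        offset += len(page)
```

Because results are ordered newest first, invocations registered while a client is paging shift the offsets, so a long walk can see an ID twice. Offset pagination accepts that trade-off for simplicity.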
- abstractmethod count_invocations(task_id: TaskId | None = None, statuses: list[InvocationStatus] | None = None) → int
Counts invocations matching the given filters.
- Parameters:
task_id (TaskId | None) – Optional task ID to filter by.
statuses (list[InvocationStatus] | None) – Optional statuses to filter by.
- Returns:
The total count of matching invocations.
- abstractmethod get_call_invocation_ids(call_id: pynenc.call.CallId) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieves all invocation IDs associated with a specific call ID.
- Parameters:
call_id (CallId) – The call ID to filter invocations.
- Returns:
An iterator over the invocation IDs for the specified call.
- abstractmethod _atomic_status_transition(invocation_id: pynenc.identifiers.invocation_id.InvocationId, status: pynenc.invocation.status.InvocationStatus, runner_id: str | None = None) → pynenc.invocation.status.InvocationStatusRecord
Atomically validates and transitions the invocation status.
Backend implementations must:
1. Read the current status record.
2. Validate the transition using status_record_transition().
3. Atomically update the record only if validation passes.
4. Return the new status record.
All validation and state changes happen within a single atomic operation.
- Parameters:
invocation_id (InvocationId) – The ID of the invocation to update.
status (InvocationStatus) – The target status.
runner_id (str | None) – The owner ID for ownership validation.
- Returns:
The new status record after a successful transition.
- Return type:
InvocationStatusRecord
- Raises:
InvocationStatusTransitionError – If transition is not allowed
InvocationStatusOwnershipError – If ownership rules are violated
KeyError – If invocation does not exist
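The read-validate-write sequence can be sketched with a single lock guarding all three steps. Everything here is a stand-in: the `Status` enum is a hypothetical subset of InvocationStatus, the transition table and ownership rule are invented for illustration (pynenc validates with status_record_transition()), and `TransitionError`/`OwnershipError` replace InvocationStatusTransitionError and InvocationStatusOwnershipError.

```python
from __future__ import annotations

import threading
from dataclasses import dataclass
from enum import Enum


class Status(Enum):
    """Hypothetical subset of InvocationStatus."""

    REGISTERED = "REGISTERED"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"


# Hypothetical transition table, for illustration only.
ALLOWED: dict[Status, set[Status]] = {
    Status.REGISTERED: {Status.PENDING},
    Status.PENDING: {Status.RUNNING, Status.REGISTERED},
    Status.RUNNING: {Status.SUCCESS},
    Status.SUCCESS: set(),
}


class TransitionError(Exception):
    """Stand-in for InvocationStatusTransitionError."""


class OwnershipError(Exception):
    """Stand-in for InvocationStatusOwnershipError."""


@dataclass(frozen=True)
class StatusRecord:
    status: Status
    runner_id: str | None


class InMemoryStatusStore:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._records: dict[str, StatusRecord] = {}

    def register(self, invocation_id: str) -> None:
        with self._lock:
            self._records.setdefault(invocation_id, StatusRecord(Status.REGISTERED, None))

    def atomic_status_transition(
        self, invocation_id: str, status: Status, runner_id: str | None = None
    ) -> StatusRecord:
        with self._lock:  # read, validate and write under one lock
            current = self._records[invocation_id]  # raises KeyError if unknown
            if status not in ALLOWED[current.status]:
                raise TransitionError(f"{current.status.name} -> {status.name}")
            # Hypothetical ownership rule: only the current owner may move an owned invocation.
            if current.runner_id is not None and current.runner_id != runner_id:
                raise OwnershipError(f"{invocation_id} is owned by {current.runner_id}")
            new_record = StatusRecord(status, runner_id)
            self._records[invocation_id] = new_record
            return new_record
```

In a distributed backend the lock would be replaced by whatever atomic primitive the store offers, such as a database transaction or a compare-and-set, so that two runners cannot both pass validation against the same old record.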
- abstractmethod index_arguments_for_concurrency_control(invocation: DistributedInvocation[Params, Result]) → None
Caches the required data to implement concurrency control.
- Parameters:
invocation (DistributedInvocation[Params, Result]) – The invocation to be cached.
- abstractmethod set_up_invocation_auto_purge(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → None
Sets up automatic purging for an invocation after a defined period.
Note
Set auto purge period with `app.conf.orchestrator_auto_final_invocation_purge_hours`
- Parameters:
invocation_id (InvocationId) – The invocation to set up for auto purge.
- abstractmethod auto_purge() → None
Automatically purges all invocations in a final state that are older than a defined time period.
Note
Set auto purge period with `app.conf.orchestrator_auto_final_invocation_purge_hours`
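A minimal sketch of the bookkeeping these two methods imply, assuming set_up_invocation_auto_purge is called once an invocation reaches a final state. `AutoPurgeIndex` is hypothetical, `purge_hours` plays the role of `app.conf.orchestrator_auto_final_invocation_purge_hours`, and the optional `now` argument exists only to make the example deterministic.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


class AutoPurgeIndex:
    """Hypothetical bookkeeping behind set_up_invocation_auto_purge/auto_purge."""

    def __init__(self, purge_hours: float) -> None:
        # Plays the role of app.conf.orchestrator_auto_final_invocation_purge_hours.
        self.purge_hours = purge_hours
        self._scheduled_at: dict[str, datetime] = {}  # invocation id -> time it was scheduled
        self.data: dict[str, object] = {}  # per-invocation orchestrator data

    def set_up_invocation_auto_purge(self, invocation_id: str, now: datetime | None = None) -> None:
        self._scheduled_at[invocation_id] = now if now is not None else datetime.now(timezone.utc)

    def auto_purge(self, now: datetime | None = None) -> list[str]:
        now = now if now is not None else datetime.now(timezone.utc)
        cutoff = now - timedelta(hours=self.purge_hours)
        expired = [inv_id for inv_id, ts in self._scheduled_at.items() if ts <= cutoff]
        for inv_id in expired:
            del self._scheduled_at[inv_id]
            self.data.pop(inv_id, None)  # drop everything stored for that invocation
        return expired
```

Recording a timestamp when the invocation finishes lets auto_purge run as a cheap periodic sweep instead of re-reading every invocation's status.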
- get_invocation_status(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → pynenc.invocation.status.InvocationStatus
Retrieves the status of a specific invocation ID.
- Parameters:
invocation_id (InvocationId) – The ID of the invocation whose status is to be retrieved.
- Returns:
The current status of the invocation.
- Return type:
InvocationStatus
- Raises:
KeyError – If the invocation ID does not exist.
- abstractmethod get_invocation_status_record(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → pynenc.invocation.status.InvocationStatusRecord
Retrieves the status record of a specific invocation ID.
- Parameters:
invocation_id (InvocationId) – The ID of the invocation whose status record is to be retrieved.
- Returns:
The current status record of the invocation.
- Return type:
InvocationStatusRecord
- Raises:
KeyError – If the invocation ID does not exist.
- abstractmethod increment_invocation_retries(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → None
Increments the retry count of a specific invocation.
- Parameters:
invocation_id (InvocationId) – The id of the invocation for which to increment retries.
- abstractmethod get_invocation_retries(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → int
Retrieves the number of retries for a specific invocation.
- Parameters:
invocation_id (InvocationId) – The ID of the invocation whose retry count is to be retrieved.
- Returns:
The number of retries for the invocation.
- Return type:
int
- abstractmethod filter_by_status(invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], status_filter: frozenset[pynenc.invocation.status.InvocationStatus]) → list[pynenc.identifiers.invocation_id.InvocationId]
Filters a list of invocation IDs by their status in an optimized way.
This method allows efficient batch filtering of invocations by status, reducing the number of individual status checks needed.
- Parameters:
invocation_ids (list[InvocationId]) – The invocation IDs to filter.
status_filter (frozenset[InvocationStatus]) – The statuses to keep.
- Returns:
List of invocation ids matching the status filter
- Return type:
list[InvocationId]
- filter_final(invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId]) → list[pynenc.identifiers.invocation_id.InvocationId]
Returns invocation ids that have reached a final status.
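The batch-filter idea, with filter_final expressed as a thin wrapper, can be sketched as follows. The `Status` enum, the choice of final statuses and the `store` dictionary are assumptions for this example; a real backend would replace `fetch_statuses` with one bulk query.

```python
from __future__ import annotations

from enum import Enum


class Status(Enum):
    """Hypothetical subset of InvocationStatus."""

    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"


# Assumed set of final statuses for this sketch.
FINAL_STATUSES: frozenset[Status] = frozenset({Status.SUCCESS, Status.FAILED})


def fetch_statuses(invocation_ids: list[str], store: dict[str, Status]) -> dict[str, Status]:
    # One bulk read; a real backend might use a single Redis MGET or an SQL "IN (...)" query.
    return {inv_id: store[inv_id] for inv_id in invocation_ids if inv_id in store}


def filter_by_status(
    invocation_ids: list[str], status_filter: frozenset[Status], store: dict[str, Status]
) -> list[str]:
    statuses = fetch_statuses(invocation_ids, store)
    # Keep the input order; unknown IDs are dropped.
    return [inv_id for inv_id in invocation_ids if statuses.get(inv_id) in status_filter]


def filter_final(invocation_ids: list[str], store: dict[str, Status]) -> list[str]:
    return filter_by_status(invocation_ids, FINAL_STATUSES, store)
```

One round trip for N IDs replaces N calls to get_invocation_status, which is the saving the batch method is meant to provide.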
- abstractmethod purge() → None
Purges all the orchestrator data for the current self.app.app_id.
Important
This should only be used for testing purposes.
- abstract property blocking_control: pynenc.orchestrator.base_orchestrator.BaseBlockingControl
Property to access the blocking control component of the orchestrator.
- Returns:
The blocking control component.
- Return type:
BaseBlockingControl
- release_waiters(waited_id: pynenc.identifiers.invocation_id.InvocationId) → None
Releases other invocations that are waiting on the completion of the specified invocation.
- Parameters:
waited_id (InvocationId) – The ID of the invocation that has completed.
- waiting_for_results(caller_invocation_id: InvocationId | None, result_invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId]) → None
Notifies the system that an invocation is waiting for the results of other invocations.
- Parameters:
caller_invocation_id (InvocationId | None) – The ID of the waiting invocation.
result_invocation_ids (list[InvocationId]) – The IDs of the invocations being waited on.
- get_blocking_invocations(max_num_invocation_ids: int) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieves invocation IDs that are blocking others but are not blocked themselves.
- Parameters:
max_num_invocation_ids (int) – The maximum number of blocking invocation IDs to retrieve.
- Returns:
An iterator over unblocked, blocking invocation IDs.
- Return type:
Iterator[InvocationId]
Note
The returned invocation IDs are ordered oldest first.
- set_invocation_status(invocation_id: pynenc.identifiers.invocation_id.InvocationId, status: pynenc.invocation.status.InvocationStatus, runner_ctx: pynenc.runner.runner_context.RunnerContext) → None
Sets the status of a specific invocation.
- Parameters:
invocation_id (InvocationId) – The ID of the invocation to update.
status (InvocationStatus) – The new status to set for the invocation.
runner_ctx (RunnerContext) – The context of the runner performing the update.
- Raises:
InvocationStatusTransitionError – If transition is not allowed
InvocationStatusOwnershipError – If ownership rules are violated
InvocationStatusRaceConditionError – If a race condition is detected during status update
- set_invocation_result(invocation: pynenc.invocation.dist_invocation.DistributedInvocation, result: Any, runner_ctx: pynenc.runner.runner_context.RunnerContext) → None
Sets the result for a completed invocation.
- Parameters:
invocation (DistributedInvocation) – The invocation that has completed.
result (Any) – The result of the invocation’s execution.
runner_ctx (RunnerContext) – The context of the runner reporting the result.
- set_invocation_exception(invocation: pynenc.invocation.dist_invocation.DistributedInvocation, exception: Exception, runner_ctx: pynenc.runner.runner_context.RunnerContext) → None
Sets an exception for an invocation that finished with an error.
- Parameters:
invocation (DistributedInvocation) – The invocation that encountered an exception.
exception (Exception) – The exception that occurred during the invocation’s execution.
runner_ctx (RunnerContext) – The context of the runner reporting the exception.
- set_invocation_retry(invocation_id: pynenc.identifiers.invocation_id.InvocationId, exception: Exception, runner_ctx: pynenc.runner.runner_context.RunnerContext) → None
Marks an invocation for retry after a retriable exception.
- Parameters:
invocation_id (InvocationId) – The ID of the invocation to be retried.
exception (Exception) – The exception that triggered the retry.
runner_ctx (RunnerContext) – The context of the runner requesting the retry.
- is_candidate_to_run_by_concurrency_control(invocation: DistributedInvocation[Params, Result]) → bool
Checks if an invocation can be a candidate to run based on the task’s concurrency control configuration.
- Parameters:
invocation (DistributedInvocation) – The invocation to check for authorization.
- Returns:
True if the invocation is authorized to be a running candidate, False otherwise.
- is_authorize_to_run_by_concurrency_control(invocation: DistributedInvocation[Params, Result]) → bool
Checks if an invocation can be a candidate to run based on the task’s concurrency control configuration.
- Parameters:
invocation (DistributedInvocation) – The invocation to check for authorization.
- Returns:
True if the invocation is authorized to be a running candidate, False otherwise.
- _is_authorize_by_concurrency_control(invocation: DistributedInvocation[Params, Result], statuses: list[pynenc.invocation.status.InvocationStatus]) → bool
Checks if an invocation is authorized to run based on the task's concurrency control configuration.
Important
The authorization is determined by the task's running_concurrency setting:
- If ConcurrencyControlType.DISABLED, the invocation is always authorized to run.
- If ConcurrencyControlType.TASK, it checks if there are any other running invocations of the same task. If there are, the invocation is not authorized to run.
- If ConcurrencyControlType.ARGUMENTS, it checks for any running invocation of the same task with the same arguments. If there are, the invocation is not authorized to run.
- If ConcurrencyControlType.KEYS, it checks for any running invocation with the same key arguments. If any are found, the invocation is not authorized to run.
Note
The function call.serialized_args_for_concurrency_control is used to determine the arguments that are relevant for checking existing running invocations based on the task's running_concurrency option.
- Parameters:
invocation (DistributedInvocation) – The invocation to check for authorization.
statuses (list[InvocationStatus]) – The statuses to check for existing invocations.
- Returns:
True if the invocation is authorized, False otherwise.
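The four modes can be sketched as one comparison over "the arguments that matter". `Inv`, `args_for_control` and `is_authorized` are hypothetical names, statuses are plain strings, and the sketch assumes that only invocations of the same task can conflict, including in KEYS mode.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum, auto


class ConcurrencyControlType(Enum):
    """Mirrors the four modes described above (values are illustrative)."""

    DISABLED = auto()
    TASK = auto()
    ARGUMENTS = auto()
    KEYS = auto()


@dataclass
class Inv:
    """Hypothetical, simplified invocation record."""

    invocation_id: str
    task_id: str
    args: dict[str, str]  # serialized arguments
    status: str


def args_for_control(
    inv: Inv, mode: ConcurrencyControlType, key_args: frozenset[str]
) -> dict[str, str]:
    # Plays the role of call.serialized_args_for_concurrency_control.
    if mode is ConcurrencyControlType.ARGUMENTS:
        return inv.args
    if mode is ConcurrencyControlType.KEYS:
        return {k: v for k, v in inv.args.items() if k in key_args}
    return {}  # TASK: arguments are irrelevant


def is_authorized(
    inv: Inv,
    others: list[Inv],
    mode: ConcurrencyControlType,
    statuses: list[str],
    key_args: frozenset[str] = frozenset(),
) -> bool:
    if mode is ConcurrencyControlType.DISABLED:
        return True
    wanted = args_for_control(inv, mode, key_args)
    for other in others:
        if other.invocation_id == inv.invocation_id or other.task_id != inv.task_id:
            continue  # assumption: only invocations of the same task can conflict
        if other.status in statuses and args_for_control(other, mode, key_args) == wanted:
            return False  # a conflicting invocation exists in one of the given statuses
    return True
```

Reducing every mode to "compare a projection of the arguments" is why a single helper like serialized_args_for_concurrency_control can drive all four checks: TASK projects to nothing, ARGUMENTS to everything, KEYS to the key subset.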
- get_blocking_invocations_to_run(max_num_invocations: int, blocking_invocation_ids: set[pynenc.identifiers.invocation_id.InvocationId], runner_ctx: pynenc.runner.runner_context.RunnerContext) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieves invocation IDs that are blocking others but are not themselves blocked, up to a maximum number.
- Parameters:
- Returns:
An iterator over the blocking invocation IDs.
- Return type:
Iterator[InvocationId]
- get_additional_invocations_to_run(missing_invocations: int, blocking_invocation_ids: set[pynenc.identifiers.invocation_id.InvocationId], invocations_to_reroute: set[pynenc.identifiers.invocation_id.InvocationId], runner_ctx: pynenc.runner.runner_context.RunnerContext) → collections.abc.Iterator[pynenc.invocation.dist_invocation.DistributedInvocation]
Retrieves additional invocations to run, considering those not blocked or already identified as blocking.
- Parameters:
- Returns:
An iterator over the additional invocations to run.
- Return type:
Iterator[DistributedInvocation]
- reroute_invocations(invocations_to_reroute: set[pynenc.identifiers.invocation_id.InvocationId], runner_ctx: pynenc.runner.runner_context.RunnerContext) → None
Reroutes the specified invocations, typically when they are not authorized to run.
- Parameters:
invocations_to_reroute (set[InvocationId]) – The invocations to be rerouted.
runner_ctx (RunnerContext) – The context of the runner performing the rerouting.
- get_invocations_to_run(max_num_invocations: int, runner_ctx: pynenc.runner.runner_context.RunnerContext) → collections.abc.Iterator[pynenc.invocation.dist_invocation.DistributedInvocation]
Retrieves a set of invocations to run, considering blocking and concurrency control.
- Parameters:
max_num_invocations (int) – The maximum number of invocations to retrieve.
runner_ctx (RunnerContext) – The context of the runner requesting invocations.
- Returns:
An iterator over invocations that are ready to run.
- Return type:
Iterator[DistributedInvocation]
- register_new_invocations(invocations: list[DistributedInvocation[Params, Result]]) → None
Registers new invocations in the state backend and the orchestrator.
- Parameters:
invocations (list[DistributedInvocation[Params, Result]]) – The invocations to register.
- _route_new_call_invocation(call: Call[Params, Result], runner_id: str | None = None) → DistributedInvocation[Params, Result]
Routes a new call invocation within the distributed task system.
This method creates and routes a new DistributedInvocation for the provided call. It is primarily used when the task does not have single invocation options set.
- Parameters:
call (Call[Params, Result]) – The task call to be routed.
runner_id (str | None) – The owner ID for ownership validation.
- Returns:
The newly created DistributedInvocation for the call.
- Return type:
DistributedInvocation[Params, Result]
- route_call(call: pynenc.call.Call) → pynenc.invocation.dist_invocation.DistributedInvocation
Routes a task call in the distributed task system, considering single invocation options.
Important
The behavior depends on the task's registration_concurrency option. An invocation is Registered when it has been created but is not yet Running; this option prevents over-queueing:
- If ConcurrencyControlType.DISABLED, it always creates a new invocation.
- If ConcurrencyControlType.TASK, it checks for any existing invocation of the same task, regardless of the arguments. If one is found, it reuses it; otherwise it creates a new invocation.
- If ConcurrencyControlType.ARGUMENTS, it checks for any existing invocation with the same arguments. If one is found, it reuses it; otherwise it creates a new invocation.
- If ConcurrencyControlType.KEYS, it checks for any existing invocation with the same key arguments. If one is found and the non-key arguments are the same, it reuses it. If the non-key arguments differ and on_diff_non_key_args_raise is set to True, it raises an error; otherwise it reuses it with the new non-key arguments. If no invocation is found, it creates a new invocation.
Note
The function call.serialized_args_for_concurrency_control is used to get the arguments that are used to check for existing invocations based on the task's registration_concurrency option.
- Parameters:
call (Call) – The task call to be routed.
- Returns:
A DistributedInvocation object, which could be either a new or a reused invocation.
- Return type:
DistributedInvocation
- Raises:
InvocationConcurrencyWithDifferentArgumentsError – If an invocation with different arguments exists and the task’s configuration specifies to raise an error in such cases.
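The registration rules above can be sketched as "find a matching Registered invocation, else create one". `ToyRouter`, `Invocation` and `DifferentArgumentsError` (standing in for InvocationConcurrencyWithDifferentArgumentsError) are hypothetical. The KEYS branch encodes one reading of "reuses it with the new non-key arguments": the reused invocation's arguments are overwritten.

```python
from __future__ import annotations

import itertools
from dataclasses import dataclass, field
from enum import Enum, auto


class ConcurrencyControlType(Enum):
    DISABLED = auto()
    TASK = auto()
    ARGUMENTS = auto()
    KEYS = auto()


class DifferentArgumentsError(Exception):
    """Stand-in for InvocationConcurrencyWithDifferentArgumentsError."""


@dataclass
class Invocation:
    invocation_id: str
    task_id: str
    args: dict[str, str]


@dataclass
class ToyRouter:
    """Hypothetical router that only tracks invocations in the Registered status."""

    registered: list[Invocation] = field(default_factory=list)
    _ids: itertools.count = field(default_factory=itertools.count, init=False, repr=False)

    def _find_existing(self, task_id, args, mode, key_args):
        for inv in self.registered:
            if inv.task_id != task_id:
                continue
            if mode is ConcurrencyControlType.TASK:
                return inv
            if mode is ConcurrencyControlType.ARGUMENTS and inv.args == args:
                return inv
            if mode is ConcurrencyControlType.KEYS and all(
                inv.args.get(k) == args.get(k) for k in key_args
            ):
                return inv
        return None

    def route_call(
        self,
        task_id: str,
        args: dict[str, str],
        mode: ConcurrencyControlType,
        key_args: frozenset[str] = frozenset(),
        on_diff_non_key_args_raise: bool = False,
    ) -> Invocation:
        existing = None
        if mode is not ConcurrencyControlType.DISABLED:
            existing = self._find_existing(task_id, args, mode, key_args)
        if existing is None:  # nothing to reuse: register a new invocation
            new = Invocation(f"inv-{next(self._ids)}", task_id, dict(args))
            self.registered.append(new)
            return new
        if mode is ConcurrencyControlType.KEYS and existing.args != args:
            if on_diff_non_key_args_raise:
                raise DifferentArgumentsError(existing.invocation_id)
            existing.args = dict(args)  # reuse it with the new non-key arguments
        return existing
```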
- route_calls(calls: list[PreSerializedCall[Params, Result]]) → list[DistributedInvocation[Params, Result]]
Routes multiple calls at once for improved performance.
This method is specifically for batch processing tasks with disabled concurrency control.
- Parameters:
calls (list[PreSerializedCall[Params, Result]]) – The calls to be routed.
- Returns:
The list of routed invocations.
- Return type:
list[DistributedInvocation[Params, Result]]
- Raises:
TaskParallelProcessingError – If concurrency control is enabled for any of the calls.
- abstractmethod register_runner_heartbeats(runner_ids: list[str], can_run_atomic_service: bool = False) → None
Register or update heartbeat timestamps for one or more runners.
This unified method handles three cases:
- Parent/atomic service runners registering themselves (can_run_atomic_service=True)
- Worker runners registering themselves (can_run_atomic_service=False, the default)
- Parent runners reporting their children’s heartbeats (can_run_atomic_service=False)
For runners that already exist, it only updates the heartbeat timestamp. For new runners, it creates a new record with the given atomic service eligibility.
- Parameters:
runner_ids (list[str]) – The IDs of the runners whose heartbeats are registered.
can_run_atomic_service (bool) – The atomic service eligibility recorded for runners that do not exist yet.
- abstractmethod _get_active_runners(timeout_seconds: float, can_run_atomic_service: bool | None) → list[pynenc.orchestrator.atomic_service.ActiveRunnerInfo]
Retrieve runners that are considered active based on heartbeat activity.
A runner is considered “active” if it has sent a heartbeat within the timeout period. This is used for atomic service scheduling to determine which runners are eligible to participate in time slot distribution.
- Parameters:
timeout_seconds (float) – Heartbeat timeout in seconds.
can_run_atomic_service (bool | None) – Optional filter on atomic service eligibility.
- Returns:
List of active runners ordered by creation time (oldest first).
- Return type:
list[ActiveRunnerInfo]
- get_active_runners(can_run_atomic_service: bool | None = None) → list[pynenc.orchestrator.atomic_service.ActiveRunnerInfo]
Retrieve runners that are considered active based on heartbeat activity.
A runner is considered “active” if it has sent a heartbeat within the timeout period. This is used for atomic service scheduling to determine which runners are eligible to participate in time slot distribution.
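Heartbeat registration and the active-runner query fit together as in the sketch below. `RunnerRecord` stands in for ActiveRunnerInfo and `HeartbeatRegistry` is hypothetical; the explicit `now` argument is added only so the example is deterministic, whereas a real implementation would read the clock itself.

```python
from __future__ import annotations

import time
from dataclasses import dataclass


@dataclass
class RunnerRecord:
    """Hypothetical stand-in for ActiveRunnerInfo."""

    runner_id: str
    created_at: float
    last_heartbeat: float
    can_run_atomic_service: bool


class HeartbeatRegistry:
    def __init__(self) -> None:
        self._runners: dict[str, RunnerRecord] = {}

    def register_runner_heartbeats(
        self, runner_ids: list[str], can_run_atomic_service: bool = False, now: float | None = None
    ) -> None:
        now = time.time() if now is None else now
        for runner_id in runner_ids:
            record = self._runners.get(runner_id)
            if record is None:  # new runner: its eligibility is stored once
                self._runners[runner_id] = RunnerRecord(runner_id, now, now, can_run_atomic_service)
            else:  # known runner: only refresh the heartbeat
                record.last_heartbeat = now

    def get_active_runners(
        self, timeout_seconds: float, can_run_atomic_service: bool | None = None, now: float | None = None
    ) -> list[RunnerRecord]:
        now = time.time() if now is None else now
        active = [
            r
            for r in self._runners.values()
            if now - r.last_heartbeat <= timeout_seconds
            and (can_run_atomic_service is None or r.can_run_atomic_service == can_run_atomic_service)
        ]
        return sorted(active, key=lambda r: r.created_at)  # oldest first
```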
- abstractmethod get_pending_invocations_for_recovery() → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieve invocation IDs stuck in PENDING status beyond the allowed time.
- Returns:
Iterator of invocation IDs that need recovery.
- Return type:
Iterator[InvocationId]
- abstractmethod _get_running_invocations_for_recovery(timeout_seconds: float) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieve invocation IDs in RUNNING status owned by inactive runners.
An inactive runner is one that hasn’t sent a heartbeat within the configured timeout period. Invocations owned by such runners are considered stuck and need recovery.
- Parameters:
timeout_seconds (float) – Heartbeat timeout in seconds.
- Returns:
Iterator of invocation IDs that need recovery.
- Return type:
Iterator[InvocationId]
- get_running_invocations_for_recovery() → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieve invocation IDs in RUNNING status owned by inactive runners.
An inactive runner is one that hasn’t sent a heartbeat within the configured timeout period. Invocations owned by such runners are considered stuck and need recovery.
- Returns:
Iterator of invocation IDs that need recovery.
- Return type:
Iterator[InvocationId]
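The recovery check combines invocation ownership with runner heartbeats. In this sketch the dictionaries, the string status `"RUNNING"` and the function name are hypothetical, and an owner with no recorded heartbeat (or no owner at all) is assumed to count as inactive.

```python
from __future__ import annotations

import time
from collections.abc import Iterator


def running_invocations_for_recovery(
    invocations: dict[str, tuple[str, str | None]],  # id -> (status, owner runner_id)
    last_heartbeat: dict[str, float],  # runner_id -> last heartbeat time
    timeout_seconds: float,
    now: float | None = None,
) -> Iterator[str]:
    now = time.time() if now is None else now
    for inv_id, (status, owner) in invocations.items():
        if status != "RUNNING":
            continue
        seen = last_heartbeat.get(owner) if owner is not None else None
        # Unknown owner or stale heartbeat: the invocation is considered stuck.
        if seen is None or now - seen > timeout_seconds:
            yield inv_id
```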
- should_run_atomic_service(runner_ctx: pynenc.runner.runner_context.RunnerContext) → bool
Determine if the current runner should execute atomic global services.
This method has a side effect: it registers a heartbeat for the runner with can_run_atomic_service=True. This ensures that only runners actively checking for atomic service eligibility are considered for atomic service distribution.
It uses the runner count and timing to distribute service execution across runners, preventing simultaneous execution and race conditions.
- Parameters:
runner_ctx (RunnerContext) – The context of the current runner.
- Returns:
True if this runner should execute services now.
- Return type:
bool
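This page does not spell out how time slots are assigned, so the following is only an assumed round-robin scheme illustrating "runner count and timing": time is cut into fixed slots, and slot `n` belongs to the `n mod len(runners)`-th eligible runner in oldest-first order. The function name and `slot_seconds` are hypothetical.

```python
from __future__ import annotations

import time


def should_run_atomic_service(
    runner_id: str,
    active_runner_ids: list[str],  # eligible runners, oldest first
    slot_seconds: float,
    now: float | None = None,
) -> bool:
    if runner_id not in active_runner_ids:  # also covers an empty list
        return False
    now = time.time() if now is None else now
    slot = int(now // slot_seconds)
    return active_runner_ids[slot % len(active_runner_ids)] == runner_id
```

Because every runner reads the same oldest-first list, they agree on the owner of each slot without extra messaging. A runner joining or leaving shifts the assignment mid-slot, which is the kind of collision that record_atomic_service_execution helps diagnose.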
- abstractmethod record_atomic_service_execution(runner_id: str, start_time: datetime.datetime, end_time: datetime.datetime) → None
Record the latest atomic service execution window for a runner.
Replaces any previous execution record for this runner with the current one. Used for diagnostics and detecting potential collisions.
- Parameters:
runner_id (str) – The runner that executed the service
start_time (datetime) – When execution started (UTC timezone-aware)
end_time (datetime) – When execution ended (UTC timezone-aware)