pynenc.orchestrator.base_orchestrator
Module Contents
Classes
| BaseBlockingControl | A component of the orchestrator to implement blocking control functionalities. |
| BaseOrchestrator | Abstract base class defining the orchestrator’s interface in a distributed task system. |
API
- class pynenc.orchestrator.base_orchestrator.BaseBlockingControl
Bases: abc.ABC
A component of the orchestrator to implement blocking control functionalities.
This abstract base class defines the interface for managing blocking behavior in distributed task executions.
- abstractmethod release_waiters(waited_invocation_id: pynenc.identifiers.invocation_id.InvocationId) → None
Releases any invocations that are waiting on the specified invocation.
- Parameters:
waited_invocation_id (InvocationId) – The ID of the invocation that has finished and can release its waiters.
- abstractmethod waiting_for_results(caller_invocation_id: pynenc.identifiers.invocation_id.InvocationId, result_invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId]) → None
Notifies the system that an invocation is waiting for the results of other invocations.
- Parameters:
caller_invocation_id (InvocationId) – The ID of the invocation that is waiting.
result_invocation_ids (list[InvocationId]) – The IDs of the invocations being waited on.
- abstractmethod get_blocking_invocations(max_num_invocations: int) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieves invocation IDs that are blocking others but are not blocked themselves.
- Parameters:
max_num_invocations (int) – The maximum number of blocking invocation IDs to retrieve.
- Returns:
An iterator over unblocked, blocking invocation IDs, ordered by age (oldest first).
- Return type:
Iterator[InvocationId]
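To make the three operations concrete, here is a minimal in-memory sketch of the BaseBlockingControl contract. It is a hypothetical illustration and not pynenc's implementation. Invocation IDs are plain strings instead of InvocationId, the class name InMemoryBlockingControl is invented, and "age" is approximated by the order in which an invocation was first waited on.

```python
from collections.abc import Iterator


class InMemoryBlockingControl:
    """Hypothetical in-memory illustration of the BaseBlockingControl contract."""

    def __init__(self) -> None:
        # waiter ID -> IDs it is still waiting on
        self._waiting_on: dict[str, set[str]] = {}
        # waited ID -> IDs waiting on it; dict insertion order doubles as age (oldest first)
        self._waited_by: dict[str, set[str]] = {}

    def waiting_for_results(self, caller_id: str, result_ids: list[str]) -> None:
        for result_id in result_ids:
            self._waiting_on.setdefault(caller_id, set()).add(result_id)
            self._waited_by.setdefault(result_id, set()).add(caller_id)

    def release_waiters(self, waited_id: str) -> None:
        # The waited invocation finished, so it no longer blocks its waiters.
        for waiter in self._waited_by.pop(waited_id, set()):
            pending = self._waiting_on.get(waiter, set())
            pending.discard(waited_id)
            if not pending:
                self._waiting_on.pop(waiter, None)

    def get_blocking_invocations(self, max_num_invocations: int) -> Iterator[str]:
        # Blocking: someone waits on it. Unblocked: it waits on nothing itself.
        found = 0
        for waited_id in list(self._waited_by):  # snapshot, safe if the dict changes
            if found >= max_num_invocations:
                return
            if not self._waiting_on.get(waited_id):
                found += 1
                yield waited_id
```

In this sketch, a parent waiting on two children, one of which waits on a grandchild, yields the child and the grandchild that can make progress. Once the grandchild is released, the other child becomes runnable too.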
- class pynenc.orchestrator.base_orchestrator.BaseOrchestrator(app: pynenc.app.Pynenc)
Bases: abc.ABC
Abstract base class defining the orchestrator’s interface in a distributed task system.
The orchestrator is responsible for managing task invocations, including tracking their status, handling retries, and implementing blocking control.
Note: Coupling with StateBackend
The orchestrator relies on app.state_backend for persisting invocation data, results, exceptions, and history. Methods such as set_invocation_status, set_invocation_result, and route_call delegate storage to the state backend. This coupling is intentional: the orchestrator owns lifecycle transitions while the state backend owns persistence. Future refactors may introduce an explicit interface to formalize this contract.
- Parameters:
app (Pynenc) – The Pynenc application instance.
Initialization
- abstractmethod _register_new_invocations(invocations: list[DistributedInvocation[Params, Result]], runner_id: str | None = None) → pynenc.invocation.status.InvocationStatusRecord
Registers new invocations with the Registered status if they do not exist yet.
- Parameters:
invocations (list[DistributedInvocation[Params, Result]]) – The invocations to be registered.
runner_id (str | None) – The owner ID for ownership validation
- Returns:
The status record of the registered invocation.
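- abstractmethod get_existing_invocations(task: Task[Params, Result], key_serialized_arguments: dict[str, str] | None = None, statuses: list[InvocationStatus] | None = None) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieves existing invocation IDs based on task, arguments, and status.
- Parameters:
task (Task[Params, Result]) – The task whose invocations are retrieved.
key_serialized_arguments (dict[str, str] | None) – Optional serialized arguments to filter by.
statuses (list[InvocationStatus] | None) – Optional statuses to filter by.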
- Returns:
An iterator over the matching invocation IDs.
- Return type:
Iterator[InvocationId]
- abstractmethod get_task_invocation_ids(task_id: pynenc.task.TaskId) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieves all invocation IDs associated with a specific task ID.
- Parameters:
task_id (TaskId) – The task ID to filter invocations.
- Returns:
An iterator over the invocation IDs for the specified task.
- abstractmethod get_invocation_ids_paginated(task_id: TaskId | None = None, statuses: list[InvocationStatus] | None = None, limit: int = 100, offset: int = 0) → list[pynenc.identifiers.invocation_id.InvocationId]
Retrieves invocation IDs with pagination support.
This method provides efficient pagination for large datasets by using LIMIT/OFFSET semantics. Results are ordered by registration time (newest first).
- Parameters:
task_id (TaskId | None) – Optional task ID to filter by.
statuses (list[InvocationStatus] | None) – Optional statuses to filter by.
limit (int) – Maximum number of results to return.
offset (int) – Number of results to skip.
- Returns:
List of matching invocation IDs.
- abstractmethod count_invocations(task_id: TaskId | None = None, statuses: list[InvocationStatus] | None = None) → int
Counts invocations matching the given filters.
- Parameters:
task_id (TaskId | None) – Optional task ID to filter by.
statuses (list[InvocationStatus] | None) – Optional statuses to filter by.
- Returns:
The total count of matching invocations.
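The LIMIT/OFFSET contract of get_invocation_ids_paginated, together with count_invocations for computing how many pages exist, can be sketched over an in-memory list. The Row type and the plain-string statuses are hypothetical stand-ins for the backend's stored records and InvocationStatus. A SQL backend would express the same thing with ORDER BY ... DESC LIMIT ... OFFSET ....

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class Row:
    """Hypothetical stand-in for a stored invocation record."""
    invocation_id: str
    task_id: str
    status: str
    registered_at: datetime


def _matches(row: Row, task_id: str | None, statuses: list[str] | None) -> bool:
    # Both filters are optional; None means "do not filter on this field".
    return (task_id is None or row.task_id == task_id) and (
        statuses is None or row.status in statuses
    )


def get_invocation_ids_paginated(
    rows: list[Row],
    task_id: str | None = None,
    statuses: list[str] | None = None,
    limit: int = 100,
    offset: int = 0,
) -> list[str]:
    matching = [r for r in rows if _matches(r, task_id, statuses)]
    # Newest registration first, then apply OFFSET and LIMIT.
    matching.sort(key=lambda r: r.registered_at, reverse=True)
    return [r.invocation_id for r in matching[offset : offset + limit]]


def count_invocations(
    rows: list[Row], task_id: str | None = None, statuses: list[str] | None = None
) -> int:
    # Same filters as the paginated query, without LIMIT/OFFSET.
    return sum(1 for r in rows if _matches(r, task_id, statuses))
```

Because both functions share the same filter, dividing the count by limit (rounded up) gives the number of pages a caller needs.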
- abstractmethod get_call_invocation_ids(call_id: pynenc.call.CallId) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieves all invocation IDs associated with a specific call ID.
- Parameters:
call_id (CallId) – The call ID to filter invocations.
- Returns:
An iterator over the invocation IDs for the specified call.
- abstractmethod _atomic_status_transition(invocation_id: pynenc.identifiers.invocation_id.InvocationId, status: pynenc.invocation.status.InvocationStatus, runner_id: str | None = None) → pynenc.invocation.status.InvocationStatusRecord
Atomically validates and transitions invocation status.
Backend implementations must:
1. Read the current status record.
2. Validate the transition using status_record_transition().
3. Atomically update only if validation passes.
4. Return the new status record.
All validation and state changes happen within a single atomic operation.
- Parameters:
invocation_id (InvocationId) – The ID of the invocation to update
status (InvocationStatus) – The target status
runner_id (str | None) – The owner ID for ownership validation
- Returns:
The new status record after successful transition
- Return type:
InvocationStatusRecord
- Raises:
InvocationStatusTransitionError – If transition is not allowed
InvocationStatusOwnershipError – If ownership rules are violated
KeyError – If invocation does not exist
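The four steps can be illustrated with a lock-guarded in-memory store. This is only a sketch. The transition table, the ownership rule, and the locally defined exception classes are hypothetical stand-ins; pynenc's real rules live in status_record_transition(). In this sketch, the runner that last claimed an invocation must perform its next transition, and final or retry statuses release ownership.

```python
from __future__ import annotations

import threading
from dataclasses import dataclass


class InvocationStatusTransitionError(Exception):
    """Local stand-in for the pynenc exception of the same name."""


class InvocationStatusOwnershipError(Exception):
    """Local stand-in for the pynenc exception of the same name."""


# Hypothetical transition table (current status -> allowed targets).
ALLOWED = {
    "REGISTERED": {"PENDING"},
    "PENDING": {"RUNNING"},
    "RUNNING": {"SUCCESS", "FAILED", "RETRY"},
    "RETRY": {"PENDING"},
}
RELEASES_OWNERSHIP = {"SUCCESS", "FAILED", "RETRY"}


@dataclass(frozen=True)
class StatusRecord:
    status: str
    runner_id: str | None


class InMemoryStatusStore:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._records: dict[str, StatusRecord] = {}

    def register(self, invocation_id: str) -> None:
        self._records[invocation_id] = StatusRecord("REGISTERED", None)

    def atomic_status_transition(
        self, invocation_id: str, status: str, runner_id: str | None = None
    ) -> StatusRecord:
        # Read, validate and write under one lock so no other writer interleaves.
        with self._lock:
            current = self._records[invocation_id]  # KeyError if unknown
            if status not in ALLOWED.get(current.status, set()):
                raise InvocationStatusTransitionError(f"{current.status} -> {status}")
            if current.runner_id is not None and current.runner_id != runner_id:
                raise InvocationStatusOwnershipError(invocation_id)
            owner = None if status in RELEASES_OWNERSHIP else runner_id
            new_record = StatusRecord(status, owner)
            self._records[invocation_id] = new_record
            return new_record
```

A failed validation raises before anything is written, so a rejected transition leaves the stored record unchanged.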
- abstractmethod index_arguments_for_concurrency_control(invocation: DistributedInvocation[Params, Result]) → None
Caches the required data to implement concurrency control.
- Parameters:
invocation (DistributedInvocation[Params, Result]) – The invocation to be cached.
- abstractmethod set_up_invocation_auto_purge(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → None
Sets up automatic purging for an invocation after a defined period.
Note
Set auto purge period with `app.conf.orchestrator_auto_final_invocation_purge_hours`
- Parameters:
invocation_id (InvocationId) – The invocation to set up for auto purge.
- abstractmethod auto_purge() → None
Automatically purges all invocations in a final state that are older than a defined time period.
Note
Set auto purge period with `app.conf.orchestrator_auto_final_invocation_purge_hours`
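One way to picture the pair set_up_invocation_auto_purge / auto_purge is a timestamp index. The first call records when an invocation became eligible for purging, and the second removes every entry older than the configured number of hours. This sketch rests on several assumptions. The class name AutoPurgeIndex is hypothetical, and purge_hours stands in for app.conf.orchestrator_auto_final_invocation_purge_hours. Unlike the documented method, this auto_purge returns the purged IDs so the effect is visible. A real backend would also delete the invocation data itself.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


class AutoPurgeIndex:
    """Hypothetical index of invocations scheduled for automatic purging."""

    def __init__(self, purge_hours: float) -> None:
        self.purge_hours = purge_hours
        self._marked_at: dict[str, datetime] = {}

    def set_up_invocation_auto_purge(self, invocation_id: str, now: datetime | None = None) -> None:
        # Remember when the invocation became eligible for purging.
        self._marked_at[invocation_id] = now if now is not None else datetime.now(timezone.utc)

    def auto_purge(self, now: datetime | None = None) -> list[str]:
        # Remove (and return) every entry older than the purge period.
        now = now if now is not None else datetime.now(timezone.utc)
        cutoff = now - timedelta(hours=self.purge_hours)
        expired = [inv_id for inv_id, ts in self._marked_at.items() if ts < cutoff]
        for inv_id in expired:
            del self._marked_at[inv_id]
        return expired
```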
- get_invocation_status(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → pynenc.invocation.status.InvocationStatus
Retrieves the status of a specific invocation id.
- Parameters:
invocation_id (InvocationId) – The id of the invocation whose status is to be retrieved.
- Returns:
The current status of the invocation.
- Return type:
InvocationStatus
- Raises:
KeyError – If the invocation ID does not exist.
- abstractmethod get_invocation_status_record(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → pynenc.invocation.status.InvocationStatusRecord
Retrieves the status record of a specific invocation id.
- Parameters:
invocation_id (InvocationId) – The id of the invocation whose status is to be retrieved.
- Returns:
The current status record of the invocation.
- Return type:
InvocationStatusRecord
- Raises:
KeyError – If the invocation ID does not exist.
- abstractmethod increment_invocation_retries(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → None
Increments the retry count of a specific invocation.
- Parameters:
invocation_id (InvocationId) – The id of the invocation for which to increment retries.
- abstractmethod get_invocation_retries(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → int
Retrieves the number of retries for a specific invocation.
- Parameters:
invocation_id (InvocationId) – The id of the invocation whose retry count is to be retrieved.
- Returns:
The number of retries for the invocation.
- Return type:
int
- abstractmethod filter_by_status(invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], status_filter: frozenset[pynenc.invocation.status.InvocationStatus]) → list[pynenc.identifiers.invocation_id.InvocationId]
Filters a list of invocation ids by their status in an optimized way.
This method allows efficient batch filtering of invocations by status, reducing the number of individual status checks needed.
- Parameters:
invocation_ids (list[InvocationId]) – The invocation ids to filter.
status_filter (frozenset[InvocationStatus]) – The statuses to filter by.
- Returns:
List of invocation ids matching the status filter
- Return type:
list[InvocationId]
- filter_final(invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId]) → list[pynenc.identifiers.invocation_id.InvocationId]
Returns invocation ids that have reached a final status.
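filter_final is presumably a thin wrapper over filter_by_status with the set of final statuses. The sketch below shows that relationship using a plain dict as a stand-in for the backend's status storage. The names in FINAL_STATUSES are illustrative, not pynenc's exact list. The benefit of the batch method is that a backend can resolve all IDs in one round trip (one query or one multi-key read) instead of calling get_invocation_status once per ID.

```python
from __future__ import annotations

# Illustrative subset of final statuses; not pynenc's exact list.
FINAL_STATUSES = frozenset({"SUCCESS", "FAILED"})


def filter_by_status(
    status_store: dict[str, str],
    invocation_ids: list[str],
    status_filter: frozenset[str],
) -> list[str]:
    # One pass over the batch, preserving input order; unknown IDs are dropped.
    return [inv_id for inv_id in invocation_ids if status_store.get(inv_id) in status_filter]


def filter_final(status_store: dict[str, str], invocation_ids: list[str]) -> list[str]:
    return filter_by_status(status_store, invocation_ids, FINAL_STATUSES)
```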
- abstractmethod purge() → None
Purges all the orchestrator data for the current self.app.app_id.
Important
This should only be used for testing purposes.
- abstract property blocking_control: pynenc.orchestrator.base_orchestrator.BaseBlockingControl
Property to access the blocking control component of the orchestrator.
- Returns:
The blocking control component.
- Return type:
BaseBlockingControl
- release_waiters(waited_id: pynenc.identifiers.invocation_id.InvocationId) → None
Releases other invocations that are waiting on the completion of the specified invocation.
- Parameters:
waited_id (InvocationId) – The ID of the invocation that has completed.
- waiting_for_results(caller_invocation_id: InvocationId | None, result_invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId]) → None
Notifies the system that an invocation is waiting for the results of other invocations.
- Parameters:
caller_invocation_id (InvocationId | None) – The ID of the waiting invocation.
result_invocation_ids (list[InvocationId]) – The IDs of the invocations being waited on.
- get_blocking_invocations(max_num_invocation_ids: int) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieves invocation IDs that are blocking others but are not blocked themselves.
- Parameters:
max_num_invocation_ids (int) – The maximum number of blocking invocation IDs to retrieve.
- Returns:
An iterator over unblocked, blocking invocation IDs.
- Return type:
Iterator[InvocationId]
Note
The order of the returned invocation IDs is oldest first.
- set_invocation_status(invocation_id: pynenc.identifiers.invocation_id.InvocationId, status: pynenc.invocation.status.InvocationStatus, runner_ctx: pynenc.runner.runner_context.RunnerContext) → None
Sets the status of a specific invocation.
- Parameters:
invocation_id (InvocationId) – The ID of the invocation to update.
status (InvocationStatus) – The new status to set for the invocation.
runner_ctx (RunnerContext) – The context of the runner performing the update.
- Raises:
InvocationStatusTransitionError – If transition is not allowed
InvocationStatusOwnershipError – If ownership rules are violated
InvocationStatusRaceConditionError – If a race condition is detected during status update
- set_invocation_result(invocation: pynenc.invocation.dist_invocation.DistributedInvocation, result: Any, runner_ctx: pynenc.runner.runner_context.RunnerContext) → None
Sets the result for a completed invocation.
- Parameters:
invocation (DistributedInvocation) – The invocation that has completed.
result (Any) – The result of the invocation’s execution.
runner_ctx (RunnerContext) – The context of the runner reporting the result.
- set_invocation_exception(invocation: pynenc.invocation.dist_invocation.DistributedInvocation, exception: Exception, runner_ctx: pynenc.runner.runner_context.RunnerContext) → None
Sets an exception for an invocation that finished with an error.
- Parameters:
invocation (DistributedInvocation) – The invocation that encountered an exception.
exception (Exception) – The exception that occurred during the invocation’s execution.
runner_ctx (RunnerContext) – The context of the runner reporting the exception.
- set_invocation_retry(invocation_id: pynenc.identifiers.invocation_id.InvocationId, exception: Exception, runner_ctx: pynenc.runner.runner_context.RunnerContext) → None
Sets an invocation for retry in case of a retriable exception.
- Parameters:
invocation_id (InvocationId) – The ID of the invocation to be retried.
exception (Exception) – The exception that triggered the retry.
runner_ctx (RunnerContext) – The context of the runner requesting the retry.
- is_candidate_to_run_by_concurrency_control(invocation: DistributedInvocation[Params, Result]) → bool
Checks whether an invocation can be a candidate to run based on the task’s concurrency control configuration.
- Parameters:
invocation (DistributedInvocation) – The invocation to check.
- Returns:
True if the invocation can be a running candidate, False otherwise.
- is_authorize_to_run_by_concurrency_control(invocation: DistributedInvocation[Params, Result]) → bool
Checks whether an invocation is authorized to run based on the task’s concurrency control configuration.
- Parameters:
invocation (DistributedInvocation) – The invocation to check for authorization.
- Returns:
True if the invocation is authorized to run, False otherwise.
- _is_authorize_by_concurrency_control(invocation: DistributedInvocation[Params, Result], statuses: list[pynenc.invocation.status.InvocationStatus]) → bool
Checks whether an invocation is authorized to run based on the task’s concurrency control configuration.
Important
The authorization is determined by the task’s running_concurrency setting:
- If ConcurrencyControlType.DISABLED, the invocation is always authorized to run.
- If ConcurrencyControlType.TASK, it checks whether there are any other running invocations of the same task. If there are, the invocation is not authorized to run.
- If ConcurrencyControlType.ARGUMENTS, it checks for any running invocation of the same task with the same arguments. If there are, the invocation is not authorized to run.
- If ConcurrencyControlType.KEYS, it checks for any running invocation with the same key arguments. If any are found, the invocation is not authorized to run.
Note
The function call.serialized_args_for_concurrency_control is used to determine the arguments that are relevant for checking existing running invocations based on the task’s running_concurrency option.
- Parameters:
invocation (DistributedInvocation) – The invocation to check for authorization.
statuses (list[InvocationStatus]) – The statuses to check for existing invocations.
- Returns:
True if the invocation is authorized, False otherwise.
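The four cases reduce to one comparison. Each invocation's arguments are projected onto the ones that matter for the configured control type, which is the role the note gives to call.serialized_args_for_concurrency_control. The invocation is then rejected if an invocation of the same task in one of the given statuses has the same projection. This sketch is hypothetical. ConcurrencyControlType is redefined locally, arguments are plain dicts of already-serialized strings, and the KEYS check is assumed to be scoped to the same task.

```python
from __future__ import annotations

from enum import Enum


class ConcurrencyControlType(Enum):
    """Local stand-in mirroring the four options listed above."""
    DISABLED = "disabled"
    TASK = "task"
    ARGUMENTS = "arguments"
    KEYS = "keys"


def relevant_args(
    args: dict[str, str], control: ConcurrencyControlType, key_args: tuple[str, ...]
) -> dict[str, str] | None:
    """Arguments compared for this control type; None means nothing is compared."""
    if control is ConcurrencyControlType.TASK:
        return {}  # every invocation of the same task conflicts
    if control is ConcurrencyControlType.ARGUMENTS:
        return dict(args)
    if control is ConcurrencyControlType.KEYS:
        return {k: args[k] for k in key_args}
    return None  # DISABLED


def is_authorized(
    task_id: str,
    args: dict[str, str],
    control: ConcurrencyControlType,
    key_args: tuple[str, ...],
    existing: list[tuple[str, dict[str, str]]],
) -> bool:
    """existing: (task_id, args) pairs of invocations in the statuses being checked."""
    wanted = relevant_args(args, control, key_args)
    if wanted is None:
        return True
    return not any(
        other_task == task_id and relevant_args(other_args, control, key_args) == wanted
        for other_task, other_args in existing
    )
```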
- get_blocking_invocations_to_run(max_num_invocations: int, blocking_invocation_ids: set[pynenc.identifiers.invocation_id.InvocationId], runner_ctx: pynenc.runner.runner_context.RunnerContext) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieves invocation IDs that are blocking others but are not themselves blocked, up to a maximum number.
- Parameters:
max_num_invocations (int) – The maximum number of invocation IDs to retrieve.
blocking_invocation_ids (set[InvocationId]) – The set of invocation IDs identified as blocking.
runner_ctx (RunnerContext) – The context of the runner requesting invocations.
- Returns:
An iterator over the blocking invocation IDs.
- Return type:
Iterator[InvocationId]
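- get_additional_invocations_to_run(missing_invocations: int, blocking_invocation_ids: set[pynenc.identifiers.invocation_id.InvocationId], invocations_to_reroute: set[pynenc.identifiers.invocation_id.InvocationId], runner_ctx: pynenc.runner.runner_context.RunnerContext) → collections.abc.Iterator[pynenc.invocation.dist_invocation.DistributedInvocation]
Retrieves additional invocations to run, considering those not blocked or already identified as blocking.
- Parameters:
missing_invocations (int) – The number of additional invocations to retrieve.
blocking_invocation_ids (set[InvocationId]) – Invocation IDs already identified as blocking.
invocations_to_reroute (set[InvocationId]) – Invocation IDs to be rerouted.
runner_ctx (RunnerContext) – The context of the runner requesting invocations.
- Returns:
An iterator over the additional invocations to run.
- Return type:
Iterator[DistributedInvocation]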
- reroute_invocations(invocations_to_reroute: set[pynenc.identifiers.invocation_id.InvocationId], runner_ctx: pynenc.runner.runner_context.RunnerContext) → None
Reroutes the specified invocations, typically when they are not authorized to run.
- Parameters:
invocations_to_reroute (set[InvocationId]) – The invocations to be rerouted.
runner_ctx (RunnerContext) – The context of the runner performing the reroute.
- get_invocations_to_run(max_num_invocations: int, runner_ctx: pynenc.runner.runner_context.RunnerContext) → collections.abc.Iterator[pynenc.invocation.dist_invocation.DistributedInvocation]
Retrieves a set of invocations to run, considering blocking and concurrency control.
- Parameters:
max_num_invocations (int) – The maximum number of invocations to retrieve.
runner_ctx (RunnerContext) – The context of the runner requesting invocations.
- Returns:
An iterator over invocations that are ready to run.
- Return type:
Iterator[DistributedInvocation]
- register_new_invocations(invocations: list[DistributedInvocation[Params, Result]]) → None
Registers new invocations in the state backend and the orchestrator.
- Parameters:
invocations (list[DistributedInvocation[Params, Result]]) – The invocations to register.
- _route_new_call_invocation(call: Call[Params, Result], runner_id: str | None = None) → DistributedInvocation[Params, Result]
Routes a new call invocation within the distributed task system.
This method creates and routes a new DistributedInvocation for the provided call. It is primarily used when the task does not have single invocation options set.
- Parameters:
call (Call[Params, Result]) – The task call to be routed.
runner_id (str | None) – The owner ID for ownership validation.
- Returns:
The newly created DistributedInvocation for the call.
- Return type:
DistributedInvocation[Params, Result]
- route_call(call: pynenc.call.Call) → pynenc.invocation.dist_invocation.DistributedInvocation
Routes a task call in the distributed task system, considering single invocation options.
Important
The behavior depends on the task’s registration_concurrency option. An invocation is Registered when it has been created but is not yet Running; reusing registered invocations prevents over-queueing:
- If ConcurrencyControlType.DISABLED, it always creates a new invocation.
- If ConcurrencyControlType.TASK, it checks for any existing invocation of the same task, regardless of the arguments. If one is found, it is reused; otherwise a new invocation is created.
- If ConcurrencyControlType.ARGUMENTS, it checks for any existing invocation with the same arguments. If one is found, it is reused; otherwise a new invocation is created.
- If ConcurrencyControlType.KEYS, it checks for any existing invocation with the same key arguments. If one is found and the non-key arguments are the same, it is reused. If the non-key arguments differ and on_diff_non_key_args_raise is True, it raises an error; otherwise it is reused with the new non-key arguments. If no invocation is found, a new invocation is created.
Note
The function call.serialized_args_for_concurrency_control is used to get the arguments that are used to check for existing invocations based on the task’s registration_concurrency option.
- Parameters:
call (Call) – The task call to be routed.
- Returns:
A DistributedInvocation object, which may be either new or reused.
- Return type:
DistributedInvocation
- Raises:
InvocationConcurrencyWithDifferentArgumentsError – If an invocation with different arguments exists and the task’s configuration specifies to raise an error in such cases.
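The registration-time rules can be sketched as a lookup over the invocations that are still Registered. The sketch reuses an invocation when the control type allows it and creates a new one otherwise. Everything here is hypothetical. Invocations are plain dicts, control types are plain strings matching the ConcurrencyControlType names, new IDs are generated locally, and the exception is a local stand-in for InvocationConcurrencyWithDifferentArgumentsError.

```python
from __future__ import annotations


class InvocationConcurrencyWithDifferentArgumentsError(Exception):
    """Local stand-in for the pynenc exception of the same name."""


def _same_keys(a: dict[str, str], b: dict[str, str], keys: tuple[str, ...]) -> bool:
    return all(a.get(k) == b.get(k) for k in keys)


def route_call(
    registered: list[dict],
    task_id: str,
    args: dict[str, str],
    control: str,
    key_args: tuple[str, ...] = (),
    on_diff_non_key_args_raise: bool = False,
) -> dict:
    """Return a reused or newly created invocation; `registered` holds Registered invocations."""
    if control != "DISABLED":
        for inv in registered:
            if inv["task_id"] != task_id:
                continue
            if control == "TASK" or (control == "ARGUMENTS" and inv["args"] == args):
                return inv
            if control == "KEYS" and _same_keys(inv["args"], args, key_args):
                if inv["args"] != args:
                    if on_diff_non_key_args_raise:
                        raise InvocationConcurrencyWithDifferentArgumentsError(inv["id"])
                    inv["args"] = dict(args)  # reuse with the new non-key arguments
                return inv
    # Nothing reusable (or DISABLED): register a new invocation.
    new_inv = {"id": f"inv-{len(registered) + 1}", "task_id": task_id, "args": dict(args)}
    registered.append(new_inv)
    return new_inv
```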
- route_calls(calls: list[PreSerializedCall[Params, Result]]) → list[DistributedInvocation[Params, Result]]
Routes multiple calls at once for improved performance.
This method is specifically for batch processing tasks with disabled concurrency control.
- Parameters:
calls (list[PreSerializedCall[Params, Result]]) – The calls to be routed.
- Returns:
The list of routed invocations.
- Return type:
list[DistributedInvocation[Params, Result]]
- Raises:
TaskParallelProcessingError – If concurrency control is enabled for any of the calls.
- abstractmethod register_runner_heartbeats(runner_ids: list[str], can_run_atomic_service: bool = False) → None
Register or update heartbeat timestamps for one or more runners.
This unified method handles three cases:
- Parent/atomic service runners registering themselves (can_run_atomic_service=True)
- Worker runners registering themselves (can_run_atomic_service=False, the default)
- Parent runners reporting their children’s heartbeats (can_run_atomic_service=False)
For runners that already exist, only the heartbeat timestamp is updated. For new runners, a new record is created with the given atomic service eligibility.
- Parameters:
runner_ids (list[str]) – The IDs of the runners whose heartbeats are recorded.
can_run_atomic_service (bool) – Whether newly created runner records are eligible to run atomic services.
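The upsert semantics can be sketched with a dict keyed by runner ID: a runner's record is created on its first heartbeat, and afterwards only the timestamp is refreshed. RunnerRecord and HeartbeatStore are hypothetical names. In this sketch, a parent reporting a child's heartbeat with the default False never changes an existing record's atomic-service eligibility, because only the timestamp is touched.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class RunnerRecord:
    runner_id: str
    creation_time: datetime
    last_heartbeat: datetime
    can_run_atomic_service: bool


class HeartbeatStore:
    """Hypothetical in-memory store illustrating register_runner_heartbeats."""

    def __init__(self) -> None:
        self.runners: dict[str, RunnerRecord] = {}

    def register_runner_heartbeats(
        self,
        runner_ids: list[str],
        can_run_atomic_service: bool = False,
        now: datetime | None = None,
    ) -> None:
        now = now if now is not None else datetime.now(timezone.utc)
        for runner_id in runner_ids:
            record = self.runners.get(runner_id)
            if record is None:
                # New runner: creation time and eligibility are fixed here.
                self.runners[runner_id] = RunnerRecord(runner_id, now, now, can_run_atomic_service)
            else:
                # Existing runner: only the heartbeat timestamp moves.
                record.last_heartbeat = now
```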
- abstractmethod _get_active_runners(timeout_seconds: float, can_run_atomic_service: bool | None) → list[pynenc.orchestrator.atomic_service.ActiveRunnerInfo]
Retrieve runners that are considered active based on heartbeat activity.
A runner is considered “active” if it has sent a heartbeat within the timeout period. Implementations must return runners ordered by creation_time ascending, then runner_id ascending. Atomic-service slots are calculated from that exact order, so every runner must observe the same order for the same backend snapshot. Runners registered after a snapshot are naturally considered only in later queries, and later creation times place them after existing runners.
- Parameters:
timeout_seconds (float) – Heartbeat timeout in seconds.
can_run_atomic_service (bool | None) – Optional filter on atomic-service eligibility.
- Returns:
List of active runners ordered by creation time, then runner ID.
- Return type:
list[ActiveRunnerInfo]
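Here is a sketch of the ordering contract. Runners are filtered by heartbeat age (and optionally by eligibility), then sorted by the tuple (creation_time, runner_id). Sorting on the full tuple makes the result independent of the order in which the backend happens to return rows, so every runner derives the same slots. ActiveRunner is a hypothetical stand-in for ActiveRunnerInfo. Treating can_run_atomic_service=None as "no filter" is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class ActiveRunner:
    """Hypothetical stand-in for ActiveRunnerInfo."""
    runner_id: str
    creation_time: datetime
    last_heartbeat: datetime
    can_run_atomic_service: bool


def get_active_runners(
    runners: list[ActiveRunner],
    timeout_seconds: float,
    can_run_atomic_service: bool | None = None,
    now: datetime | None = None,
) -> list[ActiveRunner]:
    now = now if now is not None else datetime.now(timezone.utc)
    cutoff = now - timedelta(seconds=timeout_seconds)
    active = [
        r
        for r in runners
        if r.last_heartbeat >= cutoff
        and (can_run_atomic_service is None or r.can_run_atomic_service == can_run_atomic_service)
    ]
    # Deterministic order: creation time ascending, then runner ID ascending.
    return sorted(active, key=lambda r: (r.creation_time, r.runner_id))
```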
- get_active_runners(can_run_atomic_service: bool | None = None) → list[pynenc.orchestrator.atomic_service.ActiveRunnerInfo]
Retrieve runners that are considered active based on heartbeat activity.
A runner is considered “active” if it has sent a heartbeat within the timeout period. Runners are returned in the deterministic order required by _get_active_runners: creation time ascending, then runner ID ascending.
- abstractmethod get_pending_invocations_for_recovery() → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieve invocation IDs stuck in PENDING status beyond the allowed time.
- Returns:
Iterator of invocation IDs that need recovery.
- Return type:
Iterator[InvocationId]
- abstractmethod _get_running_invocations_for_recovery(timeout_seconds: float) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieve invocation IDs in RUNNING status owned by inactive runners.
An inactive runner is one that hasn’t sent a heartbeat within the configured timeout period. Invocations owned by such runners are considered stuck and need recovery.
- Parameters:
timeout_seconds (float) – Heartbeat timeout in seconds
- Returns:
Iterator of invocation IDs that need recovery.
- Return type:
Iterator[InvocationId]
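Detecting stuck RUNNING invocations amounts to joining each invocation's owner with that owner's last heartbeat. In this hypothetical sketch, running_owners maps each RUNNING invocation ID to its owning runner ID, and last_heartbeats maps runner IDs to their latest heartbeat. A runner with no heartbeat record at all is assumed to be inactive.

```python
from __future__ import annotations

from collections.abc import Iterator
from datetime import datetime, timedelta


def running_invocations_for_recovery(
    running_owners: dict[str, str],
    last_heartbeats: dict[str, datetime],
    timeout_seconds: float,
    now: datetime,
) -> Iterator[str]:
    cutoff = now - timedelta(seconds=timeout_seconds)
    for invocation_id, runner_id in running_owners.items():
        heartbeat = last_heartbeats.get(runner_id)
        # Missing or stale heartbeat: the owner is considered inactive.
        if heartbeat is None or heartbeat < cutoff:
            yield invocation_id
```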
- get_running_invocations_for_recovery() → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieve invocation IDs in RUNNING status owned by inactive runners.
An inactive runner is one that hasn’t sent a heartbeat within the configured timeout period. Invocations owned by such runners are considered stuck and need recovery.
- Returns:
Iterator of invocation IDs that need recovery.
- Return type:
Iterator[InvocationId]
- try_claim_atomic_service_run(runner_ctx: pynenc.runner.runner_context.RunnerContext) → pynenc.orchestrator.atomic_service.AtomicServiceRun | None
Return a claimed run, or None if this runner does not own the slot.
decide_atomic_service_claim selects the runner that owns the current slot. The backend claim write is still responsible for cluster-wide mutual exclusion, because runners can observe slightly different membership snapshots while workers are starting or stopping.
- abstractmethod record_atomic_service_execution_start(atomic_service_run: pynenc.orchestrator.atomic_service.AtomicServiceRun, started_at: datetime.datetime | None, status: pynenc.orchestrator.atomic_service.AtomicServiceExecutionStatus = AtomicServiceExecutionStatus.RUNNING, reason: str = '') → bool
Insert a new atomic-service execution record.
Implementations MUST store status (defaulting to RUNNING) and the free-form reason so that subsequent backend queries can decide whether the slot is still being held.
For the default RUNNING status, this method is the storage-level atomic gate: at most one active execution may be recorded. If another execution is already active, implementations MUST NOT leave this run in RUNNING and SHOULD record a zero-width BLOCKED diagnostic row. The return value tells callers whether the run may execute work.
- Parameters:
atomic_service_run (AtomicServiceRun) – The claimed run identity.
started_at (datetime | None) – When the runner began its claim attempt (UTC, timezone-aware). Runtime callers may pass None so the backend stamps the row at the storage admission point. For BLOCKED records this is also the end_time, because no work was performed.
status (AtomicServiceExecutionStatus) – Initial status of the record: RUNNING for successful claims; BLOCKED for recorded skip attempts.
reason (str) – Free-form diagnostic detail, surfaced in Pynmon.
- Returns:
True when the execution is allowed to run work.
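Here is a minimal sketch of the storage-level gate, assuming a single lock stands in for the backend's atomic write. A RUNNING insert succeeds only if no other row is RUNNING. Otherwise a zero-width BLOCKED row is written and False is returned. ExecutionRow, ExecutionLog, the plain-string statuses, and the "slot held" reason text are hypothetical stand-ins for AtomicServiceExecution, AtomicServiceExecutionStatus, and whatever a real backend records.

```python
from __future__ import annotations

import threading
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class ExecutionRow:
    run_id: str
    status: str
    start_time: datetime
    end_time: datetime | None
    reason: str = ""


class ExecutionLog:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.rows: list[ExecutionRow] = []

    def record_start(
        self,
        run_id: str,
        started_at: datetime | None = None,
        status: str = "RUNNING",
        reason: str = "",
    ) -> bool:
        with self._lock:  # the check and the insert form one atomic step
            # If no timestamp is given, the "backend" stamps the row at admission.
            stamp = started_at if started_at is not None else datetime.now(timezone.utc)
            if status == "RUNNING" and any(r.status == "RUNNING" for r in self.rows):
                # Slot already held: never leave this run RUNNING; record a zero-width BLOCKED row.
                self.rows.append(ExecutionRow(run_id, "BLOCKED", stamp, stamp, reason or "slot held"))
                return False
            end_time = stamp if status == "BLOCKED" else None
            self.rows.append(ExecutionRow(run_id, status, stamp, end_time, reason))
            return status == "RUNNING"
```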
- abstractmethod finalize_atomic_service_execution(atomic_service_run: pynenc.orchestrator.atomic_service.AtomicServiceRun, end_time: datetime.datetime, status: pynenc.orchestrator.atomic_service.AtomicServiceExecutionStatus, reason: str = '') → None
Transition a previously-inserted record to a terminal status.
Idempotent: this method may be called more than once for the same atomic_service_run (for example, the runner’s normal COMPLETED finalisation racing the reaper’s ABANDONED cleanup). The first terminal finalisation wins.
If no prior RUNNING record exists for this run (the start write failed), implementations MUST insert one with start_time == end_time so the row is still visible.
- Parameters:
atomic_service_run (AtomicServiceRun) – The claimed run identity.
end_time (datetime) – When execution ended (UTC).
status (AtomicServiceExecutionStatus) – Terminal status (COMPLETED/ABANDONED/BLOCKED).
reason (str) – Diagnostic detail; empty for clean completion.
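The idempotency rules can be sketched as a small state check. A RUNNING row is moved to the terminal status. An already-terminal row is left untouched, so the first finalisation wins. A missing row is inserted with start_time == end_time. The dict of ExecutionRow objects keyed by run ID is a hypothetical stand-in for the backend table, and statuses are plain strings.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime

TERMINAL_STATUSES = frozenset({"COMPLETED", "ABANDONED", "BLOCKED"})


@dataclass
class ExecutionRow:
    status: str
    start_time: datetime
    end_time: datetime | None
    reason: str = ""


def finalize_atomic_service_execution(
    rows: dict[str, ExecutionRow],
    run_id: str,
    end_time: datetime,
    status: str,
    reason: str = "",
) -> None:
    if status not in TERMINAL_STATUSES:
        raise ValueError(f"not a terminal status: {status}")
    row = rows.get(run_id)
    if row is None:
        # The start write failed: insert a zero-width row so the run stays visible.
        rows[run_id] = ExecutionRow(status, end_time, end_time, reason)
    elif row.status == "RUNNING":
        row.status, row.end_time, row.reason = status, end_time, reason
    # Otherwise the row is already terminal and the first finalisation wins.
```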
- abstractmethod get_active_atomic_service_executions() → list[pynenc.orchestrator.atomic_service.AtomicServiceExecution]
Return every execution currently in RUNNING status.
Used by the claim flow to enforce mutual exclusion and by Pynmon to surface in-flight runs. Ordered most-recently-started first.
- abstractmethod get_atomic_service_executions_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, limit: int = 1000, *, runner_id: str | None = None, min_duration_seconds: float = 0.0) → list[pynenc.orchestrator.atomic_service.AtomicServiceExecution]
Retrieve atomic-service execution windows that overlap a time range.