from abc import ABC, abstractmethod
from collections.abc import Iterator
from datetime import datetime
from functools import cached_property
from time import time
from typing import TYPE_CHECKING, Any
from pynenc import context
from pynenc.conf.config_orchestrator import ConfigOrchestrator
from pynenc.conf.config_task import ConcurrencyControlType
from pynenc.exceptions import (
InvocationConcurrencyWithDifferentArgumentsError,
TaskParallelProcessingError,
InvocationStatusError,
)
from pynenc.invocation.dist_invocation import DistributedInvocation, ReusedInvocation
from pynenc.invocation.status import InvocationStatus, InvocationStatusRecord
from pynenc.orchestrator.atomic_service import (
AtomicServiceExecutionStatus,
AtomicServiceRun,
decide_atomic_service_claim,
)
if TYPE_CHECKING:
from pynenc.app import Pynenc
from pynenc.call import Call, PreSerializedCall, CallId
from pynenc.identifiers.invocation_id import InvocationId
from pynenc.orchestrator.atomic_service import (
ActiveRunnerInfo,
AtomicServiceExecution,
)
from pynenc.runner.runner_context import RunnerContext
from pynenc.task import Task, TaskId
from pynenc.types import Params, Result
[docs]
class BaseBlockingControl(ABC):
    """
    A component of the orchestrator to implement blocking control functionalities.

    This abstract base class defines the interface for managing blocking behavior
    in distributed task executions. It declares three abstract methods and holds
    no state of its own.
    """

    @abstractmethod
    def release_waiters(self, waited_invocation_id: "InvocationId") -> None:
        """
        Releases any invocations that are waiting on the specified invocation.

        :param InvocationId waited_invocation_id: The ID of the invocation that has
            finished and can release its waiters.
        """

    @abstractmethod
    def waiting_for_results(
        self,
        caller_invocation_id: "InvocationId",
        result_invocation_ids: list["InvocationId"],
    ) -> None:
        """
        Notifies the system that an invocation is waiting for the results of other invocations.

        :param InvocationId caller_invocation_id: The ID of the invocation that is waiting.
        :param list[InvocationId] result_invocation_ids: The IDs of the invocations being waited on.
        """

    @abstractmethod
    def get_blocking_invocations(
        self, max_num_invocations: int
    ) -> Iterator["InvocationId"]:
        """
        Retrieves invocation IDs that are blocking others but are not blocked themselves.

        :param int max_num_invocations: The maximum number of blocking invocation IDs to retrieve.
        :return: An iterator over unblocked, blocking invocation IDs,
            **ordered by age (oldest first)**.
        :rtype: Iterator[InvocationId]
        """
[docs]
class BaseOrchestrator(ABC):
    """
    Abstract base class defining the orchestrator's interface in a distributed task system.

    The orchestrator is responsible for managing task invocations, including tracking their status,
    handling retries, and implementing blocking control.

    .. note:: **Coupling with StateBackend**

        The orchestrator relies on ``app.state_backend`` for persisting invocation data,
        results, exceptions, and history. Methods such as ``set_invocation_status``,
        ``set_invocation_result``, and ``route_call`` delegate storage to the state backend.
        This coupling is intentional: the orchestrator owns lifecycle transitions while the
        state backend owns persistence. Future refactors may introduce an explicit interface
        to formalize this contract.

    :param Pynenc app: The Pynenc application instance.
    """
def __init__(self, app: "Pynenc") -> None:
    """
    Binds the orchestrator to the application it belongs to.

    :param Pynenc app: The Pynenc application instance, stored as ``self.app``.
    """
    self.app = app
@cached_property
def conf(self) -> ConfigOrchestrator:
    """
    Orchestrator configuration built from the application's config sources.

    The value is computed on first access and cached on the instance afterwards.

    :return: The orchestrator configuration.
    :rtype: ConfigOrchestrator
    """
    owner_app = self.app
    return ConfigOrchestrator(
        config_values=owner_app.config_values,
        config_filepath=owner_app.config_filepath,
    )
[docs]
@abstractmethod
def _register_new_invocations(
    self,
    invocations: list["DistributedInvocation[Params, Result]"],
    runner_id: str | None = None,
) -> InvocationStatusRecord:
    """
    Register new invocations with the registered status if they don't exist yet.

    :param list[DistributedInvocation[Params, Result]] invocations: The invocations to be registered.
    :param str | None runner_id: The owner ID for ownership validation.
    :return: The status record of the registered invocations.
    :rtype: InvocationStatusRecord
    """
[docs]
@abstractmethod
def get_existing_invocations(
    self,
    task: "Task[Params, Result]",
    key_serialized_arguments: dict[str, str] | None = None,
    statuses: "list[InvocationStatus] | None" = None,
) -> Iterator["InvocationId"]:
    """
    Retrieves existing invocation IDs based on task, arguments, and status.

    :param Task[Params, Result] task: The task for which to retrieve invocations.
    :param dict[str, str] | None key_serialized_arguments: Serialized arguments to filter invocations.
    :param list[InvocationStatus] | None statuses: The statuses to filter invocations.
    :return: An iterator over the matching invocation IDs.
    :rtype: Iterator[InvocationId]
    """
[docs]
@abstractmethod
def get_task_invocation_ids(self, task_id: "TaskId") -> Iterator["InvocationId"]:
    """
    Retrieves all invocation IDs associated with a specific task ID.

    :param TaskId task_id: The task ID to filter invocations.
    :return: An iterator over the invocation IDs for the specified task.
    :rtype: Iterator[InvocationId]
    """
[docs]
@abstractmethod
def get_invocation_ids_paginated(
    self,
    task_id: "TaskId | None" = None,
    statuses: "list[InvocationStatus] | None" = None,
    limit: int = 100,
    offset: int = 0,
) -> list["InvocationId"]:
    """
    Retrieves invocation IDs with pagination support.

    This method provides efficient pagination for large datasets by using
    LIMIT/OFFSET semantics. Results are ordered by registration time (newest first).

    :param TaskId | None task_id: Optional task ID to filter by.
    :param list[InvocationStatus] | None statuses: Optional statuses to filter by.
    :param int limit: Maximum number of results to return.
    :param int offset: Number of results to skip.
    :return: List of matching invocation IDs.
    :rtype: list[InvocationId]
    """
[docs]
@abstractmethod
def count_invocations(
    self,
    task_id: "TaskId | None" = None,
    statuses: "list[InvocationStatus] | None" = None,
) -> int:
    """
    Counts invocations matching the given filters.

    :param TaskId | None task_id: Optional task ID to filter by.
    :param list[InvocationStatus] | None statuses: Optional statuses to filter by.
    :return: The total count of matching invocations.
    :rtype: int
    """
[docs]
@abstractmethod
def get_call_invocation_ids(self, call_id: "CallId") -> Iterator["InvocationId"]:
    """
    Retrieves all invocation IDs associated with a specific call ID.

    :param CallId call_id: The call ID to filter invocations.
    :return: An iterator over the invocation IDs for the specified call.
    :rtype: Iterator[InvocationId]
    """
[docs]
@abstractmethod
def _atomic_status_transition(
    self,
    invocation_id: "InvocationId",
    status: InvocationStatus,
    runner_id: str | None = None,
) -> InvocationStatusRecord:
    """
    Atomically validates and transitions invocation status.

    Backend implementations must:

    1. Read current status record
    2. Validate transition using status_record_transition()
    3. Atomically update only if validation passes
    4. Return the new status record

    All validation and state changes happen within a single atomic operation.

    :param InvocationId invocation_id: The ID of the invocation to update
    :param InvocationStatus status: The target status
    :param str | None runner_id: The owner ID for ownership validation
    :return: The new status record after successful transition
    :rtype: InvocationStatusRecord
    :raises InvocationStatusTransitionError: If transition is not allowed
    :raises InvocationStatusOwnershipError: If ownership rules are violated
    :raises KeyError: If invocation does not exist
    """
    # Example implementation pattern (varies by backend).
    # The snippets below are illustrative pseudo-code, not runnable code:
    #
    # def _atomic_status_transition(self, invocation_id, status, runner_id):
    #     # PostgreSQL with transaction:
    #     with transaction:
    #         current = get_invocation_status_record(invocation_id)
    #         new_record = status_record_transition(current, status, runner_id)
    #         UPDATE ... WHERE invocation_id = ? AND status = current.status
    #         if not updated: raise race condition
    #         return new_record
    #
    #     # Redis with Lua script:
    #     lua_script = """
    #         local current = get_status(invocation_id)
    #         -- validate in Lua or return data for Python validation
    #         if valid then set_status(new_status) end
    #     """
    #     return execute_lua(lua_script)
    #
    #     # MongoDB findAndModify:
    #     current = find_one(invocation_id)
    #     new_record = status_record_transition(current, status, runner_id)
    #     result = find_and_modify(
    #         query={id: invocation_id, status: current.status},
    #         update={status: new_record}
    #     )
    #     if not result: raise race condition
    #     return new_record
[docs]
@abstractmethod
def index_arguments_for_concurrency_control(
    self,
    invocation: "DistributedInvocation[Params, Result]",
) -> None:
    """
    Caches the required data to implement concurrency control.

    :param DistributedInvocation[Params, Result] invocation: The invocation to be cached.
    """
[docs]
@abstractmethod
def set_up_invocation_auto_purge(self, invocation_id: "InvocationId") -> None:
    """
    Sets up automatic purging for an invocation after a defined period.

    ```{note}
    Set auto purge period with `app.conf.orchestrator_auto_final_invocation_purge_hours`
    ```

    :param InvocationId invocation_id: The invocation to set up for auto purge.
    """
[docs]
@abstractmethod
def auto_purge(self) -> None:
    """
    Automatically purges all invocations in a final state that are older than a defined time period.

    ```{note}
    Set auto purge period with `app.conf.orchestrator_auto_final_invocation_purge_hours`
    ```
    """
[docs]
def get_invocation_status(
    self, invocation_id: "InvocationId"
) -> "InvocationStatus":
    """
    Returns the current status of an invocation.

    This is a convenience wrapper that reads the ``status`` attribute of the
    record returned by ``get_invocation_status_record``.

    :param InvocationId invocation_id: The id of the invocation to look up.
    :return: The current status of the invocation.
    :rtype: InvocationStatus
    :raises KeyError: If the invocation ID does not exist.
    """
    return self.get_invocation_status_record(invocation_id).status
[docs]
@abstractmethod
def get_invocation_status_record(
    self, invocation_id: "InvocationId"
) -> "InvocationStatusRecord":
    """
    Retrieves the status record of a specific invocation id.

    :param InvocationId invocation_id: The id of the invocation whose status record is to be retrieved.
    :return: The current status record of the invocation.
    :rtype: InvocationStatusRecord
    :raises KeyError: If the invocation ID does not exist.
    """
[docs]
@abstractmethod
def increment_invocation_retries(self, invocation_id: "InvocationId") -> None:
    """
    Increments the retry count of a specific invocation.

    :param InvocationId invocation_id: The id of the invocation for which to increment retries.
    """
[docs]
@abstractmethod
def get_invocation_retries(self, invocation_id: "InvocationId") -> int:
    """
    Retrieves the number of retries for a specific invocation.

    :param InvocationId invocation_id: The id of the invocation whose retry count is to be retrieved.
    :return: The number of retries for the invocation.
    :rtype: int
    """
[docs]
@abstractmethod
def filter_by_status(
    self,
    invocation_ids: list["InvocationId"],
    status_filter: frozenset["InvocationStatus"],
) -> list["InvocationId"]:
    """
    Filters a list of invocation ids by their status in an optimized way.

    This method allows efficient batch filtering of invocations by status,
    reducing the number of individual status checks needed.

    :param list[InvocationId] invocation_ids: The invocation ids to filter.
    :param frozenset[InvocationStatus] status_filter: The statuses to filter by.
    :return: List of invocation ids matching the status filter
    :rtype: list[InvocationId]
    """
    pass
[docs]
def filter_final(
    self, invocation_ids: list["InvocationId"]
) -> list["InvocationId"]:
    """
    Keeps only the invocation ids whose status is final.

    Delegates to ``filter_by_status`` using the set returned by
    ``InvocationStatus.get_final_statuses()``.

    :param list[InvocationId] invocation_ids: The invocation ids to check.
    :return: List of invocation ids that have reached a final status.
    :rtype: list[InvocationId]
    """
    final_statuses = InvocationStatus.get_final_statuses()
    return self.filter_by_status(invocation_ids, final_statuses)
[docs]
@abstractmethod
def purge(self) -> None:
    """
    Purges all the orchestrator data for the current self.app.app_id.

    ```{important}
    This should only be used for testing purposes.
    ```
    """
#############################################
# blocking sub functionalities
@property
@abstractmethod
def blocking_control(self) -> BaseBlockingControl:
    """
    Property to access the blocking control component of the orchestrator.

    :return: The blocking control component.
    :rtype: BaseBlockingControl
    """
[docs]
def release_waiters(self, waited_id: "InvocationId") -> None:
    """
    Releases invocations that were waiting for the given invocation to complete.

    Does nothing when blocking control is disabled in the orchestrator
    configuration; otherwise forwards the call to ``self.blocking_control``.

    :param InvocationId waited_id: The ID of the invocation that has completed.
    """
    if not self.app.orchestrator.conf.blocking_control:
        return
    self.blocking_control.release_waiters(waited_id)
[docs]
def waiting_for_results(
    self,
    caller_invocation_id: "InvocationId | None",
    result_invocation_ids: list["InvocationId"],
) -> None:
    """
    Records that an invocation is waiting for the results of other invocations.

    With an empty ``result_invocation_ids`` a warning is logged and nothing is
    recorded. Otherwise the wait is forwarded to ``self.blocking_control`` only
    when blocking control is enabled and a caller invocation id was provided.

    :param InvocationId | None caller_invocation_id: The ID of the waiting invocation.
    :param list[InvocationId] result_invocation_ids: The IDs of the invocations being waited on.
    """
    if result_invocation_ids:
        if self.app.orchestrator.conf.blocking_control and caller_invocation_id:
            self.blocking_control.waiting_for_results(
                caller_invocation_id, result_invocation_ids
            )
        return
    # Nothing to wait on: report the pointless call and stop.
    self.app.logger.warning(
        f"Unnecessary call to waiting_for_results, invocation:{caller_invocation_id} is not waiting for any results"
    )
[docs]
def get_blocking_invocations(
    self, max_num_invocation_ids: int
) -> Iterator["InvocationId"]:
    """
    Yields invocation IDs that are blocking others but are not blocked themselves.

    Yields nothing when blocking control is disabled in the orchestrator
    configuration; otherwise delegates to ``self.blocking_control``.

    :param int max_num_invocation_ids: The maximum number of blocking invocation IDs to retrieve.
    :return: An iterator over unblocked, blocking invocation IDs.
    :rtype: Iterator[InvocationId]

    ```{note}
    The order of the returned invocation IDs is **oldest first**.
    ```
    """
    if not self.app.orchestrator.conf.blocking_control:
        return
    yield from self.blocking_control.get_blocking_invocations(max_num_invocation_ids)
#############################################
[docs]
def set_invocation_status(
    self,
    invocation_id: "InvocationId",
    status: "InvocationStatus",
    runner_ctx: "RunnerContext",
) -> None:
    """
    Transitions an invocation to a new status and propagates the change.

    The transition is performed through ``_atomic_status_transition`` using the
    runner id taken from ``runner_ctx``. For final statuses, waiters are released
    and auto purge is scheduled. The new status record is then appended to the
    state backend history, reported to the trigger component, and logged.

    :param InvocationId invocation_id: The ID of the invocation to update.
    :param InvocationStatus status: The new status to set for the invocation.
    :param RunnerContext runner_ctx: Context of the runner performing the change.
    :raises InvocationStatusTransitionError: If transition is not allowed
    :raises InvocationStatusOwnershipError: If ownership rules are violated
    :raises InvocationStatusRaceConditionError: If a race condition is detected during status update
    """
    owner_id = runner_ctx.runner_id
    transitioned_record = self._atomic_status_transition(
        invocation_id, status, owner_id
    )

    if status.is_final():
        self.release_waiters(invocation_id)
        self.set_up_invocation_auto_purge(invocation_id)

    self.app.state_backend.add_history(
        invocation_id, transitioned_record, runner_ctx
    )
    self.app.trigger.report_tasks_status([invocation_id], status)
    self.app.logger.info(f"invocation:{invocation_id} status:{status.value}")
[docs]
def set_invocation_result(
    self,
    invocation: "DistributedInvocation",
    result: Any,
    runner_ctx: "RunnerContext",
) -> None:
    """
    Stores the result of an invocation and marks it as successful.

    The result is written to the state backend first, then the status is moved to
    ``InvocationStatus.SUCCESS``, and finally the trigger component is notified.

    :param DistributedInvocation invocation: The invocation that has completed.
    :param Any result: The result of the invocation's execution.
    :param RunnerContext runner_ctx: Context of the runner reporting the result.
    """
    finished_id = invocation.invocation_id
    self.app.state_backend.set_result(finished_id, result)
    self.app.orchestrator.set_invocation_status(
        finished_id, InvocationStatus.SUCCESS, runner_ctx
    )
    self.app.trigger.report_invocation_result(invocation, result)
[docs]
def set_invocation_exception(
    self,
    invocation: "DistributedInvocation",
    exception: Exception,
    runner_ctx: "RunnerContext",
) -> None:
    """
    Stores the exception of a failed invocation and marks it as failed.

    The exception is written to the state backend first, then the status is moved
    to ``InvocationStatus.FAILED``, and finally the trigger component is notified.

    :param DistributedInvocation invocation: The invocation that encountered an exception.
    :param Exception exception: The exception that occurred during the invocation's execution.
    :param RunnerContext runner_ctx: Context of the runner reporting the failure.
    """
    failed_id = invocation.invocation_id
    self.app.state_backend.set_exception(failed_id, exception)
    # TODO! on previous fail, this should still change status
    # eg. on case of interruption from a kubernetes pod (SIGTERM, SIGKILL)
    # it should try to finish all the calls in this function
    self.app.orchestrator.set_invocation_status(
        failed_id, InvocationStatus.FAILED, runner_ctx
    )
    self.app.trigger.report_invocation_failure(invocation, exception)
[docs]
def set_invocation_retry(
    self,
    invocation_id: "InvocationId",
    exception: Exception,
    runner_ctx: "RunnerContext",
) -> None:
    """
    Schedules an invocation for retry after a retriable exception.

    The status is moved to ``InvocationStatus.RETRY``, the retry counter is
    incremented, and the invocation is routed through the broker again.

    :param InvocationId invocation_id: The ID of the invocation to be retried.
    :param Exception exception: The exception that triggered the retry
        (not stored by this method at the moment, see the TODO below).
    :param RunnerContext runner_ctx: Context of the runner requesting the retry.
    """
    # TODO! on previous fail, this should still change status
    # eg. on case of interruption from a kubernetes pod (SIGTERM, SIGKILL)
    # it should try to finish all the calls in this function
    # TODO store Retry exception on Retry status
    retry_status = InvocationStatus.RETRY
    self.app.orchestrator.set_invocation_status(
        invocation_id, retry_status, runner_ctx
    )
    self.app.orchestrator.increment_invocation_retries(invocation_id)
    self.app.broker.route_invocation(invocation_id)
[docs]
def is_candidate_to_run_by_concurrency_control(
    self, invocation: "DistributedInvocation[Params, Result]"
) -> bool:
    """
    Checks whether an invocation may become a running candidate under concurrency control.

    Existing invocations in status PENDING or RUNNING are considered as conflicts.

    :param DistributedInvocation invocation: The invocation to check for authorization.
    :return: True if the invocation is authorized to be a running candidate, False otherwise.
    """
    conflicting_statuses = [InvocationStatus.PENDING, InvocationStatus.RUNNING]
    return self._is_authorize_by_concurrency_control(
        invocation, conflicting_statuses
    )
[docs]
def is_authorize_to_run_by_concurrency_control(
    self, invocation: "DistributedInvocation[Params, Result]"
) -> bool:
    """
    Checks whether an invocation is authorized to run under concurrency control.

    Only existing invocations in status RUNNING are considered as conflicts.

    :param DistributedInvocation invocation: The invocation to check for authorization.
    :return: True if the invocation is authorized to run, False otherwise.
    """
    conflicting_statuses = [InvocationStatus.RUNNING]
    return self._is_authorize_by_concurrency_control(
        invocation, conflicting_statuses
    )
[docs]
def _is_authorize_by_concurrency_control(
    self,
    invocation: "DistributedInvocation[Params, Result]",
    statuses: list["InvocationStatus"],
) -> bool:
    """
    Decides whether an invocation is authorized according to the task's running_concurrency option.

    ```{important}
    The authorization is determined by the task's running_concurrency setting:
    - If ConcurrencyControlType.DISABLED, the invocation is always authorized to run.
    - If ConcurrencyControlType.TASK, it checks if there are any other running invocations of the same task.
        If there are, the invocation is not authorized to run.
    - If ConcurrencyControlType.ARGUMENTS, it checks for any running invocation of the same task with the same arguments.
        If there are, the invocation is not authorized to run.
    - If ConcurrencyControlType.KEYS, it checks for any running invocation with the same (key) arguments.
        If any are found, the invocation is not authorized to run.
    ```

    ```{note}
    The function call.serialized_args_for_concurrency_control is used to determine the arguments
    that are relevant for checking existing running invocations based on the task's running_concurrency option.
    ```

    :param DistributedInvocation invocation: The invocation to check for authorization.
    :param list[InvocationStatus] statuses: The statuses to check for existing invocations.
    :return: True if the invocation is authorized, False otherwise.
    """
    if invocation.task.conf.running_concurrency == ConcurrencyControlType.DISABLED:
        return True

    matching_ids = self.get_existing_invocations(
        task=invocation.call.task,
        key_serialized_arguments=invocation.call.serialized_args_for_concurrency_control(
            invocation.task.conf.running_concurrency
        ),
        statuses=statuses,
    )
    # Only the first match matters: a single conflicting invocation defers this one.
    conflicting_id = next(matching_ids, None)
    if conflicting_id:
        invocation.task.logger.info(
            f"invocation:{invocation.invocation_id} deferred by concurrency_control: "
            f"invocation:{conflicting_id} already in status:{statuses} "
            f"(concurrency_control={invocation.task.conf.running_concurrency.value})"
        )
        return False
    return True
[docs]
def get_blocking_invocations_to_run(
    self,
    max_num_invocations: int,
    blocking_invocation_ids: set["InvocationId"],
    runner_ctx: "RunnerContext",
) -> Iterator["InvocationId"]:
    """
    Claims blocking invocations for execution and yields their IDs.

    For each candidate returned by ``get_blocking_invocations`` the method checks that
    the current status can transition to PENDING, that the invocation still exists in
    the state backend, and that concurrency control allows it. It then moves the
    invocation to PENDING, adds its ID to ``blocking_invocation_ids`` and yields it.

    :param int max_num_invocations: The maximum number of blocking invocations to retrieve.
    :param set["InvocationId"] blocking_invocation_ids: A set that is filled (in place) with
        the IDs claimed by this call.
    :param RunnerContext runner_ctx: Context of the runner claiming the invocations.
    :return: An iterator over the blocking invocation IDs.
    :rtype: Iterator["InvocationId"]
    """
    for blocking_invocation_id in self.get_blocking_invocations(max_num_invocations):
        current_status = self.get_invocation_status(blocking_invocation_id)
        # Skip silently if the invocation is already past the point where PENDING makes sense
        # (e.g. already RUNNING, SUCCESS, FAILED) — another worker already claimed it
        if not current_status.can_transition_to(InvocationStatus.PENDING):
            continue

        blocking_invocation = self.app.state_backend.get_invocation(
            blocking_invocation_id
        )
        # Other callers in this class treat a falsy result as "not found in the state
        # backend"; without this guard the concurrency check below would crash.
        if not blocking_invocation:
            self.app.logger.warning(
                f"blocking invocation:{blocking_invocation_id} not found in state backend, skipping"
            )
            continue
        if not self.is_candidate_to_run_by_concurrency_control(blocking_invocation):
            continue

        try:
            self.set_invocation_status(
                blocking_invocation_id, InvocationStatus.PENDING, runner_ctx
            )
        except InvocationStatusError as ex:
            # Race condition: another worker claimed it between our status check and transition
            self.app.logger.debug(
                f"Could not set blocking invocation:{blocking_invocation_id} to status:pending: {ex}"
            )
            continue

        blocking_invocation_ids.add(blocking_invocation_id)
        yield blocking_invocation_id
[docs]
def get_additional_invocations_to_run(
    self,
    missing_invocations: int,
    blocking_invocation_ids: set["InvocationId"],
    invocations_to_reroute: set["InvocationId"],
    runner_ctx: "RunnerContext",
) -> Iterator["DistributedInvocation"]:
    """
    Pulls invocations from the broker until enough have been claimed or the broker is empty.

    Invocation IDs already present in ``blocking_invocation_ids`` and invocations whose
    status is not available for run are skipped. Invocations rejected by concurrency
    control are moved to CONCURRENCY_CONTROLLED (and collected for rerouting) or to
    CONCURRENCY_CONTROLLED_FINAL, depending on the task's ``reroute_on_concurrency_control``
    option. Accepted invocations are moved to PENDING and yielded.

    :param int missing_invocations: The number of additional invocations needed.
    :param set["InvocationId"] blocking_invocation_ids: IDs of invocations already identified as blocking.
    :param set["InvocationId"] invocations_to_reroute: A set to collect invocations that need rerouting.
    :param RunnerContext runner_ctx: Context of the runner claiming the invocations.
    :return: An iterator over the additional invocations to run.
    :rtype: Iterator["DistributedInvocation"]
    """
    while missing_invocations > 0:
        invocation_id = self.app.broker.retrieve_invocation()
        if not invocation_id:
            # Broker has nothing left to hand out.
            break
        if invocation_id in blocking_invocation_ids:
            continue
        if not self.get_invocation_status(invocation_id).is_available_for_run():
            continue

        invocation = self.app.state_backend.get_invocation(invocation_id)
        # Other callers in this class treat a falsy result as "not found in the state
        # backend"; without this guard the attribute accesses below would crash.
        if not invocation:
            self.app.logger.warning(
                f"invocation:{invocation_id} not found in state backend, skipping"
            )
            continue

        if not self.is_candidate_to_run_by_concurrency_control(invocation):
            if invocation.task.conf.reroute_on_concurrency_control:
                self.set_invocation_status(
                    invocation_id,
                    InvocationStatus.CONCURRENCY_CONTROLLED,
                    runner_ctx,
                )
                invocations_to_reroute.add(invocation_id)
            else:
                self.set_invocation_status(
                    invocation_id,
                    InvocationStatus.CONCURRENCY_CONTROLLED_FINAL,
                    runner_ctx,
                )
            continue

        try:
            self.set_invocation_status(
                invocation_id,
                InvocationStatus.PENDING,
                runner_ctx,
            )
        except InvocationStatusError as ex:
            self.app.logger.warning(
                f"Could not set invocation:{invocation_id} to status:pending: {ex}"
            )
            continue

        missing_invocations -= 1
        yield invocation
[docs]
def reroute_invocations(
    self,
    invocations_to_reroute: set["InvocationId"],
    runner_ctx: "RunnerContext",
) -> None:
    """
    Sends the given invocations back through the broker, typically when they are not authorized to run.

    Each invocation is first moved to ``InvocationStatus.REROUTED`` and then
    routed again via ``self.app.broker``.

    :param set["InvocationId"] invocations_to_reroute: The invocations to be rerouted.
    :param RunnerContext runner_ctx: Context of the runner performing the reroute.
    """
    for deferred_id in invocations_to_reroute:
        self.set_invocation_status(deferred_id, InvocationStatus.REROUTED, runner_ctx)
        self.app.broker.route_invocation(deferred_id)
[docs]
def get_invocations_to_run(
    self, max_num_invocations: int, runner_ctx: "RunnerContext"
) -> Iterator["DistributedInvocation"]:
    """
    Yields invocations that are ready to run, considering blocking and concurrency control.

    Blocking invocations are served first; the remaining capacity is filled with
    invocations pulled from the broker. Invocations collected for rerouting are
    rerouted once the generator has been fully consumed.

    :param int max_num_invocations: The maximum number of invocations to retrieve.
    :param RunnerContext runner_ctx: Context of the runner requesting work.
    :return: An iterator over invocations that are ready to run.
    :rtype: Iterator["DistributedInvocation"]
    """
    claimed_blocking_ids: set[InvocationId] = set()
    # The helper yields IDs only, so each invocation is loaded from the state backend here.
    claimed_ids_iter = self.get_blocking_invocations_to_run(
        max_num_invocations, claimed_blocking_ids, runner_ctx
    )
    for claimed_id in claimed_ids_iter:
        loaded = self.app.state_backend.get_invocation(claimed_id)
        if loaded:
            yield loaded

    deferred_ids: set[InvocationId] = set()
    remaining_slots = max_num_invocations - len(claimed_blocking_ids)
    yield from self.get_additional_invocations_to_run(
        remaining_slots,
        claimed_blocking_ids,
        deferred_ids,
        runner_ctx,
    )
    self.reroute_invocations(deferred_ids, runner_ctx)
[docs]
def register_new_invocations(
    self, invocations: list["DistributedInvocation[Params, Result]"]
) -> None:
    """
    Registers new invocations in the state backend and the orchestrator, then routes them.

    The runner context is obtained (or created) for the current app. The invocations
    are upserted in the state backend, registered through ``_register_new_invocations``,
    added to the history, reported to the trigger component and routed through the broker.
    One log line is emitted per invocation.

    :param list[DistributedInvocation[Params, Result]] invocations: The invocations to register.
    """
    runner_ctx = context.get_or_create_runner_context(self.app.app_id)
    owner_id = runner_ctx.runner_id

    self.app.state_backend.upsert_invocations(invocations)
    # This should add the status registered to the backend
    registered_record = self._register_new_invocations(invocations, owner_id)

    registered_ids = [inv.invocation_id for inv in invocations]
    self.app.state_backend.add_histories(invocations, registered_record, runner_ctx)
    self.app.trigger.report_tasks_status(registered_ids, registered_record.status)
    self.app.broker.route_invocations(registered_ids)

    for inv in invocations:
        self.app.logger.info(
            f"NEW task:{inv.call.task.task_id.key} invocation:{inv.invocation_id} REGISTERED and ROUTED"
        )
[docs]
def _route_new_call_invocation(
    self, call: "Call[Params, Result]", runner_id: str | None = None
) -> "DistributedInvocation[Params, Result]":
    """
    Creates, registers and routes a brand new invocation for a call.

    The new ``DistributedInvocation`` is linked to the parent invocation and trigger
    event found in the current context. Its arguments are indexed for concurrency
    control when either the registration or the running concurrency option of the
    task is enabled.

    :param Call[Params, Result] call: The task call to be routed.
    :param str | None runner_id: Not used by this method.
    :return: The newly created `DistributedInvocation` for the call.
    :rtype: DistributedInvocation[Params, Result]
    """
    created = DistributedInvocation.from_parent(
        call,
        parent_invocation=context.get_dist_invocation_context(self.app.app_id),
        parent_event_id=context.get_trigger_event_context(self.app.app_id),
    )
    self.register_new_invocations([created])

    disabled = ConcurrencyControlType.DISABLED
    if (
        call.task.conf.registration_concurrency != disabled
        or call.task.conf.running_concurrency != disabled
    ):
        self.index_arguments_for_concurrency_control(created)

    self.app.logger.info(f"invocation:{created.invocation_id} ROUTED")
    return created
[docs]
def route_call(self, call: "Call") -> "DistributedInvocation":
    """
    Routes a task call in the distributed task system, considering single invocation options.

    This method handles the routing of a task call.

    ```{important}
    Note the different behavior depending on the task's registration_concurrency option.
    A task is Registered when it is created but not yet Running, preventing over-queueing:
    - If ConcurrencyControlType.DISABLED,
        It always creates a new invocation.
    - If ConcurrencyControlType.TASK,
        It checks for any existing invocation of the same task regardless the arguments.
        If any is found, it reuses it, otherwise it creates a new invocation.
    - If ConcurrencyControlType.ARGUMENTS,
        It checks for any existing invocation with the same arguments.
        If any is found, it reuses it, otherwise it creates a new invocation.
    - If ConcurrencyControlType.KEYS,
        It checks for any existing invocation with the same key arguments.
        If any is found and the non-key arguments are the same, it always reuses it,
        IF the non-key arguments are different and on_diff_non_key_args_raise is set to True,
        it raises an error, otherwise it reuses it with the new non-key arguments.
        If no invocation is found, it creates a new invocation.
    ```

    ```{note}
    The function call.serialized_args_for_concurrency_control is used to get the arguments
    that are used to check for existing invocations based on the task's registration_concurrency option.
    ```

    :param Call call: The task call to be routed.
    :return: A `DistributedInvocation` object, which could be either a new or reused invocation.
    :rtype: DistributedInvocation
    :raises InvocationConcurrencyWithDifferentArgumentsError: If an invocation with different arguments exists
        and the task's configuration specifies to raise an error in such cases.
    """
    registration_concurrency = call.task.conf.registration_concurrency
    if registration_concurrency == ConcurrencyControlType.DISABLED:
        call.task.logger.debug(f"New invocation for call {call}")
        return self._route_new_call_invocation(call)

    # Handling registered-task concurrency control.
    # The key arguments are serialized once and reused for both the lookup and the log.
    # we filter here by TASK, all Arguments, or defined keys based in the config
    # TODO Make it explicit
    key_arguments = call.serialized_args_for_concurrency_control(
        registration_concurrency
    )
    invocation_id = next(
        self.get_existing_invocations(
            task=call.task,
            key_serialized_arguments=key_arguments,
            statuses=[InvocationStatus.REGISTERED],
        ),
        None,
    )
    if not invocation_id:
        call.task.logger.debug(
            f"No matching registered invocation found for {call.task} "
            f"and key args {key_arguments}"
        )
        return self._route_new_call_invocation(call)

    invocation = self.app.state_backend.get_invocation(invocation_id)
    if not invocation:
        # The orchestrator knows the ID but the state backend has no record for it.
        call.task.logger.warning(
            f"invocation:{invocation_id} not found in state backend for call {call}, routing new invocation"
        )
        return self._route_new_call_invocation(call)

    if invocation.call.call_id == call.call_id:
        call.task.logger.debug(
            f"Reusing invocation:{invocation.invocation_id} for call {call}"
        )
        return ReusedInvocation(invocation)

    if call.task.conf.on_diff_non_key_args_raise:
        raise InvocationConcurrencyWithDifferentArgumentsError.from_call_mismatch(
            existing_invocation=invocation, new_call=call
        )
    # TODO: review this code, we are reusing an invocation with different non-key arguments, should we still reuse it?
    return ReusedInvocation(invocation, call.arguments)
[docs]
def route_calls(
    self, calls: list["PreSerializedCall[Params, Result]"]
) -> list["DistributedInvocation[Params, Result]"]:
    """
    Routes multiple calls at once for improved performance.

    This method is specifically for batch processing tasks with disabled concurrency control.
    Every call in the batch is validated, not only the first one, so a batch cannot bypass
    registration concurrency control.

    :param list[PreSerializedCall[Params, Result]] calls: The calls to be routed.
    :return: The list of routed invocations (empty when ``calls`` is empty).
    :rtype: list[DistributedInvocation[Params, Result]]
    :raises TaskParallelProcessingError: If concurrency control is enabled for any of the calls.
    """
    if not calls:
        self.app.logger.debug("No calls to route")
        return []

    for call in calls:
        if call.task.conf.registration_concurrency != ConcurrencyControlType.DISABLED:
            raise TaskParallelProcessingError(
                call.task.task_id,
                "Batch processing is only supported with ConcurrencyControlType.DISABLED",
            )

    # NOTE(review): unlike _route_new_call_invocation, no parent_event_id is passed and
    # arguments are not indexed when only running_concurrency is enabled — confirm intended.
    parent_invocation = context.get_dist_invocation_context(self.app.app_id)
    invocations: list[DistributedInvocation[Params, Result]] = [
        DistributedInvocation.from_parent(call, parent_invocation) for call in calls
    ]
    self.register_new_invocations(invocations)
    return invocations
[docs]
@abstractmethod
def register_runner_heartbeats(
    self,
    runner_ids: list[str],
    can_run_atomic_service: bool = False,
) -> None:
    """
    Register or update heartbeat timestamps for one or more runners.

    This unified method handles:

    - Parent/atomic service runners registering themselves (can_run_atomic_service=True)
    - Worker runners registering themselves (can_run_atomic_service=False, default)
    - Parent runners reporting their children's heartbeats (can_run_atomic_service=False)

    For runners that already exist, only updates the heartbeat timestamp.
    For new runners, creates a new record with the given atomic service eligibility.

    :param list[str] runner_ids: List of runner_ids to register/update heartbeats for.
    :param bool can_run_atomic_service: Whether these runners are eligible to run atomic services.
    """
[docs]
@abstractmethod
def _get_active_runners(
self, timeout_seconds: float, can_run_atomic_service: bool | None
) -> list["ActiveRunnerInfo"]:
"""
Retrieve runners that are considered active based on heartbeat activity.
A runner is considered "active" if it has sent a heartbeat within the timeout period.
Implementations must return runners ordered by ``creation_time`` ascending,
then ``runner_id`` ascending. Atomic-service slots are calculated from that
exact order, so every runner must observe the same order for the same backend
snapshot. Runners registered after a snapshot are naturally considered only
in later queries, and later creation times place them after existing runners.
:param float timeout_seconds: Heartbeat timeout in seconds (typically from runner_considered_dead_after_minutes config)
:param bool | None can_run_atomic_service: If specified, filters runners based on their eligibility to run atomic services
:return: List of active runners ordered by creation time, then runner ID
:rtype: list["ActiveRunnerInfo"]
"""
[docs]
def get_active_runners(
self, can_run_atomic_service: bool | None = None
) -> list["ActiveRunnerInfo"]:
"""
Retrieve runners that are considered active based on heartbeat activity.
A runner is considered "active" if it has sent a heartbeat within the timeout period.
Runners are returned in the deterministic order required by
``_get_active_runners``: creation time ascending, then runner ID ascending.
:param bool | None can_run_atomic_service: If specified, filters runners based on their eligibility to run atomic services
:return: List of active runners ordered by creation time, then runner ID
:rtype: list["ActiveRunnerInfo"]
"""
timeout_seconds = self.app.conf.runner_considered_dead_after_minutes * 60
return self._get_active_runners(timeout_seconds, can_run_atomic_service)
[docs]
@abstractmethod
def get_pending_invocations_for_recovery(self) -> Iterator["InvocationId"]:
    """
    Yield the IDs of invocations that stayed in PENDING status longer than allowed.

    :return: Iterator over the invocation IDs that need recovery.
    :rtype: Iterator["InvocationId"]
    """
[docs]
@abstractmethod
def _get_running_invocations_for_recovery(
    self, timeout_seconds: float
) -> Iterator["InvocationId"]:
    """
    Yield the IDs of RUNNING invocations whose owning runner is inactive.

    A runner counts as inactive when it has not sent a heartbeat within the
    configured timeout period. Invocations owned by such a runner are
    treated as stuck and need recovery.

    :param float timeout_seconds: Heartbeat timeout in seconds
    :return: Iterator over the invocation IDs that need recovery.
    :rtype: Iterator["InvocationId"]
    """
[docs]
def get_running_invocations_for_recovery(self) -> Iterator["InvocationId"]:
    """
    Yield the IDs of RUNNING invocations whose owning runner is inactive.

    The heartbeat timeout is taken from the app configuration value
    ``runner_considered_dead_after_minutes`` (converted to seconds) and the
    lookup is delegated to ``_get_running_invocations_for_recovery``. A runner
    that has not sent a heartbeat within that period is inactive, and the
    invocations it owns are treated as stuck and need recovery.

    :return: Iterator over the invocation IDs that need recovery.
    :rtype: Iterator["InvocationId"]
    """
    dead_after_minutes = self.app.conf.runner_considered_dead_after_minutes
    heartbeat_timeout = dead_after_minutes * 60  # minutes -> seconds
    return self._get_running_invocations_for_recovery(heartbeat_timeout)
[docs]
def try_claim_atomic_service_run(
    self,
    runner_ctx: "RunnerContext",
) -> AtomicServiceRun | None:
    """Return a claimed run, or ``None`` if this runner does not own the slot.

    The flow is: refresh this runner's heartbeat as atomic-service eligible,
    ask ``decide_atomic_service_claim`` whether this runner owns the current
    slot, and finally try to record the execution start in the backend.

    The slot decision alone is not sufficient for cluster-wide mutual
    exclusion, because runners can observe slightly different membership
    snapshots while workers are starting/stopping. The backend claim write
    is therefore the final gate: if it refuses the run, ``None`` is returned.

    :param RunnerContext runner_ctx: Context of the runner attempting the claim.
    :return: The claimed run when this runner may execute it, otherwise ``None``.
    """
    settings = self.app.conf
    # Heartbeat goes first; presumably so this runner is part of the
    # membership snapshot read just below.
    self.register_runner_heartbeats(
        [runner_ctx.runner_id], can_run_atomic_service=True
    )
    claim_time = time()
    this_runner_id = runner_ctx.runner_id
    eligible_runners = self.get_active_runners(can_run_atomic_service=True)
    interval_minutes = settings.atomic_service_interval_minutes
    spread_minutes = settings.atomic_service_spread_margin_minutes
    # The stabilization window is configured in minutes; the decision helper wants seconds.
    stabilization_seconds = (
        float(settings.atomic_service_membership_stabilization_minutes) * 60.0
    )
    start_slot_fraction = float(settings.atomic_service_max_start_slot_fraction)
    decision = decide_atomic_service_claim(
        runner_id=this_runner_id,
        active_runners=eligible_runners,
        current_time=claim_time,
        service_interval_minutes=interval_minutes,
        spread_margin_minutes=spread_minutes,
        membership_stabilization_seconds=stabilization_seconds,
        max_start_slot_fraction=start_slot_fraction,
    )
    claimed_run = decision.atomic_service_run
    if claimed_run is None:
        return None
    # ``None`` as start time lets the backend stamp the record itself.
    admitted = self.record_atomic_service_execution_start(claimed_run, None)
    return claimed_run if admitted else None
[docs]
@abstractmethod
def record_atomic_service_execution_start(
    self,
    atomic_service_run: AtomicServiceRun,
    started_at: datetime | None,
    status: AtomicServiceExecutionStatus = AtomicServiceExecutionStatus.RUNNING,
    reason: str = "",
) -> bool:
    """Insert a new atomic-service execution record.

    Implementations MUST persist both ``status`` (``RUNNING`` unless stated
    otherwise) and the free-form ``reason``, so that later backend queries
    can tell whether the slot is still being held.

    With the default ``RUNNING`` status this method acts as the
    storage-level atomic gate: at most one active execution may be recorded.
    When another execution is already active, implementations MUST NOT leave
    this run in ``RUNNING`` and SHOULD record a zero-width ``BLOCKED``
    diagnostic row instead. The return value tells the caller whether the
    run may execute work.

    :param AtomicServiceRun atomic_service_run: The claimed run identity.
    :param datetime | None started_at: When the runner began its claim
        attempt (UTC timezone-aware). Runtime callers may pass ``None`` so
        the backend stamps the row at the storage admission point. For
        ``BLOCKED`` records this is also the ``end_time`` because no work
        was performed.
    :param AtomicServiceExecutionStatus status: Initial status of the
        record. ``RUNNING`` for successful claims; ``BLOCKED`` for
        recorded skip attempts.
    :param str reason: Free-form diagnostic detail, surfaced in Pynmon.
    :return: ``True`` when the execution is allowed to run work.
    """
[docs]
@abstractmethod
def finalize_atomic_service_execution(
    self,
    atomic_service_run: AtomicServiceRun,
    end_time: datetime,
    status: AtomicServiceExecutionStatus,
    reason: str = "",
) -> None:
    """Move a previously inserted execution record to a terminal status.

    Must be idempotent: a backend may be asked to finalise the same
    ``atomic_service_run`` more than once (for example, the runner's normal
    ``COMPLETED`` finalisation racing the reaper's ``ABANDONED`` cleanup).
    The first terminal finalisation wins.

    When no prior ``RUNNING`` record exists for this run (the start write
    failed), implementations MUST insert one with
    ``start_time == end_time`` so the row is still visible.

    :param AtomicServiceRun atomic_service_run: The claimed run identity.
    :param datetime end_time: When execution ended (UTC).
    :param AtomicServiceExecutionStatus status: Terminal status
        (``COMPLETED`` / ``ABANDONED`` / ``BLOCKED``).
    :param str reason: Diagnostic detail; empty for clean completion.
    """
[docs]
@abstractmethod
def get_active_atomic_service_executions(
    self,
) -> list["AtomicServiceExecution"]:
    """Return all executions that are currently in ``RUNNING`` status.

    The claim flow relies on this to enforce mutual exclusion, and Pynmon
    uses it to display in-flight runs.

    :return: Running executions, most recently started first.
    """
[docs]
@abstractmethod
def get_atomic_service_executions_in_timerange(
self,
start_time: datetime,
end_time: datetime,
limit: int = 1000,
*,
runner_id: str | None = None,
min_duration_seconds: float = 0.0,
) -> list["AtomicServiceExecution"]:
"""Retrieve atomic-service execution windows that overlap a time range."""
[docs]
@abstractmethod
def purge_atomic_service_executions(self) -> int:
    """Remove old / excess atomic-service execution records.

    :return: How many records were purged.
    """