Source code for pynenc.orchestrator.base_orchestrator

from abc import ABC, abstractmethod
from collections.abc import Iterator
from datetime import datetime
from functools import cached_property
from time import time
from typing import TYPE_CHECKING, Any

from pynenc import context
from pynenc.conf.config_orchestrator import ConfigOrchestrator
from pynenc.conf.config_task import ConcurrencyControlType
from pynenc.exceptions import (
    InvocationConcurrencyWithDifferentArgumentsError,
    TaskParallelProcessingError,
    InvocationStatusError,
)
from pynenc.invocation.dist_invocation import DistributedInvocation, ReusedInvocation
from pynenc.invocation.status import InvocationStatus, InvocationStatusRecord
from pynenc.orchestrator.atomic_service import (
    AtomicServiceExecutionStatus,
    AtomicServiceRun,
    decide_atomic_service_claim,
)

if TYPE_CHECKING:
    from pynenc.app import Pynenc
    from pynenc.call import Call, PreSerializedCall, CallId
    from pynenc.identifiers.invocation_id import InvocationId
    from pynenc.orchestrator.atomic_service import (
        ActiveRunnerInfo,
        AtomicServiceExecution,
    )
    from pynenc.runner.runner_context import RunnerContext
    from pynenc.task import Task, TaskId
    from pynenc.types import Params, Result


[docs] class BaseBlockingControl(ABC): """ A component of the orchestrator to implement blocking control functionalities. This abstract base class defines the interface for managing blocking behavior in distributed task executions. """
[docs] @abstractmethod def release_waiters(self, waited_invocation_id: "InvocationId") -> None: """ Releases any invocations that are waiting on the specified invocation. :param InvocationId waited_invocation_id: The ID of the invocation that has finished and can release its waiters. """
[docs] @abstractmethod def waiting_for_results( self, caller_invocation_id: "InvocationId", result_invocation_ids: list["InvocationId"], ) -> None: """ Notifies the system that an invocation is waiting for the results of other invocations. :param InvocationId caller_invocation_id: The ID of the invocation that is waiting. :param list[InvocationId] result_invocation_ids: The IDs of the invocations being waited on. """
[docs] @abstractmethod def get_blocking_invocations( self, max_num_invocations: int ) -> Iterator["InvocationId"]: """ Retrieves invocation IDs that are blocking others but are not blocked themselves. :param int max_num_invocations: The maximum number of blocking invocation IDs to retrieve. :return: An iterator over unblocked, blocking invocation IDs, **ordered by age (oldest first)**. :rtype: Iterator[InvocationId] """
[docs] class BaseOrchestrator(ABC): """ Abstract base class defining the orchestrator's interface in a distributed task system. The orchestrator is responsible for managing task invocations, including tracking their status, handling retries, and implementing blocking control. .. note:: **Coupling with StateBackend** The orchestrator relies on ``app.state_backend`` for persisting invocation data, results, exceptions, and history. Methods such as ``set_invocation_status``, ``set_invocation_result``, and ``route_call`` delegate storage to the state backend. This coupling is intentional: the orchestrator owns lifecycle transitions while the state backend owns persistence. Future refactors may introduce an explicit interface to formalize this contract. :param Pynenc app: The Pynenc application instance. """ def __init__(self, app: "Pynenc") -> None: self.app = app @cached_property def conf(self) -> ConfigOrchestrator: return ConfigOrchestrator( config_values=self.app.config_values, config_filepath=self.app.config_filepath, )
[docs] @abstractmethod def _register_new_invocations( self, invocations: list["DistributedInvocation[Params, Result]"], runner_id: str | None = None, ) -> InvocationStatusRecord: """ Register new invocations with status Register if they don't exist yet. :param list[DistributedInvocation[Params, Result]] invocations: The invocations to be registered. :param str | None runner_id: The owner ID for ownership validation :return: The status record of the registered invocation. """
[docs] @abstractmethod def get_existing_invocations( self, task: "Task[Params, Result]", key_serialized_arguments: dict[str, str] | None = None, statuses: "list[InvocationStatus] | None" = None, ) -> Iterator["InvocationId"]: """ Retrieves existing invocation IDs based on task, arguments, and status. :param Task[Params, Result] task: The task for which to retrieve invocations. :param dict[str, str] | None key_serialized_arguments: Serialized arguments to filter invocations. :param list[InvocationStatus] | None statuses: The statuses to filter invocations. :return: An iterator over the matching invocation IDs. :rtype: Iterator[InvocationId] """
[docs] @abstractmethod def get_task_invocation_ids(self, task_id: "TaskId") -> Iterator["InvocationId"]: """ Retrieves all invocation IDs associated with a specific task ID. :param TaskId task_id: The task ID to filter invocations. :return: List of invocation IDs for the specified task. """
[docs] @abstractmethod def get_invocation_ids_paginated( self, task_id: "TaskId | None" = None, statuses: "list[InvocationStatus] | None" = None, limit: int = 100, offset: int = 0, ) -> list["InvocationId"]: """ Retrieves invocation IDs with pagination support. This method provides efficient pagination for large datasets by using LIMIT/OFFSET semantics. Results are ordered by registration time (newest first). :param TaskId | None task_id: Optional task ID to filter by. :param list[InvocationStatus] | None statuses: Optional statuses to filter by. :param int limit: Maximum number of results to return. :param int offset: Number of results to skip. :return: List of matching invocation IDs. """
[docs] @abstractmethod def count_invocations( self, task_id: "TaskId | None" = None, statuses: "list[InvocationStatus] | None" = None, ) -> int: """ Counts invocations matching the given filters. :param str | None task_id: Optional task ID to filter by. :param list[InvocationStatus] | None statuses: Optional statuses to filter by. :return: The total count of matching invocations. """
[docs] @abstractmethod def get_call_invocation_ids(self, call_id: "CallId") -> Iterator["InvocationId"]: """ Retrieves all invocation IDs associated with a specific call ID. :param CallId call_id: The call ID to filter invocations. :return: List of invocation IDs for the specified call. """
[docs] @abstractmethod def _atomic_status_transition( self, invocation_id: "InvocationId", status: InvocationStatus, runner_id: str | None = None, ) -> InvocationStatusRecord: """ Atomically validates and transitions invocation status. Backend implementations must: 1. Read current status record 2. Validate transition using status_record_transition() 3. Atomically update only if validation passes 4. Return the new status record All validation and state changes happen within a single atomic operation. :param InvocationId invocation_id: The ID of the invocation to update :param InvocationStatus status: The target status :param str | None runner_id: The owner ID for ownership validation :return: The new status record after successful transition :rtype: InvocationStatusRecord :raises InvocationStatusTransitionError: If transition is not allowed :raises InvocationStatusOwnershipError: If ownership rules are violated :raises KeyError: If invocation does not exist """
# Example implementation pattern (varies by backend): # # def _atomic_status_transition(self, invocation_id, status, runner_id): # # PostgreSQL with transaction: # with transaction: # current = get_invocation_status_record(invocation_id) # new_record = status_record_transition(current, status, runner_id) # UPDATE ... WHERE invocation_id = ? AND status = current.status # if not updated: raise race condition # return new_record # # # Redis with Lua script: # lua_script = """ # local current = get_status(invocation_id) # -- validate in Lua or return data for Python validation # if valid then set_status(new_status) end # """ # return execute_lua(lua_script) # # # MongoDB findAndModify: # current = find_one(invocation_id) # new_record = status_record_transition(current, status, runner_id) # result = find_and_modify( # query={id: invocation_id, status: current.status}, # update={status: new_record} # ) # if not result: raise race condition # return new_record
[docs] @abstractmethod def index_arguments_for_concurrency_control( self, invocation: "DistributedInvocation[Params, Result]", ) -> None: """ Caches the required data to implement concurrency control. :param DistributedInvocation[Params, Result] invocation: The invocation to be cached. """
[docs] @abstractmethod def set_up_invocation_auto_purge(self, invocation_id: "InvocationId") -> None: """ Sets up automatic purging for an invocation after a defined period. ```{note} Set auto purge period with `app.conf.orchestrator_auto_final_invocation_purge_hours` ``` :param InvocationId invocation_id: The invocation to set up for auto purge. """
[docs] @abstractmethod def auto_purge(self) -> None: """ Automatically purges all invocations in a final state that are older than a defined time period. ```{note} Set auto purge period with `app.conf.orchestrator_auto_final_invocation_purge_hours` ``` """
[docs] def get_invocation_status( self, invocation_id: "InvocationId" ) -> "InvocationStatus": """ Retrieves the status of a specific invocation id. :param InvocationId invocation_id: The id of the invocation whose status is to be retrieved. :return: The current status of the invocation. :rtype: InvocationStatus :raises KeyError: If the invocation ID does not exist. """ status_record = self.get_invocation_status_record(invocation_id) return status_record.status
[docs] @abstractmethod def get_invocation_status_record( self, invocation_id: "InvocationId" ) -> "InvocationStatusRecord": """ Retrieves the status record of a specific invocation id. :param InvocationId invocation_id: The id of the invocation whose status is to be retrieved. :return: The current status record of the invocation. :rtype: InvocationStatusRecord :raises KeyError: If the invocation ID does not exist. """
[docs] @abstractmethod def increment_invocation_retries(self, invocation_id: "InvocationId") -> None: """ Increments the retry count of a specific invocation. :param InvocationId invocation_id: The id of the invocation for which to increment retries. """
[docs] @abstractmethod def get_invocation_retries(self, invocation_id: "InvocationId") -> int: """ Retrieves the number of retries for a specific invocation. :param InvocationId invocation_id: The id of the invocation whose retry count is to be retrieved. :return: The number of retries for the invocation. :rtype: int """
[docs] @abstractmethod def filter_by_status( self, invocation_ids: list["InvocationId"], status_filter: frozenset["InvocationStatus"], ) -> list["InvocationId"]: """ Filters a list of invocation ids by their status in an optimized way. This method allows efficient batch filtering of invocations by status, reducing the number of individual status checks needed. :param list[InvocationId] invocations: The invocation ids to filter :param list[InvocationStatus] | None status_filter: The statuses to filter by. If None, defaults to final statuses. :return: List of invocation ids matching the status filter :rtype: list[InvocationId] """ pass
[docs] def filter_final( self, invocation_ids: list["InvocationId"] ) -> list["InvocationId"]: """ Returns invocation ids that have reached a final status. :param list[InvocationId] invocations: The invocation ids to check :return: List of invocation ids that have reached a final status :rtype: list[InvocationId] """ return self.filter_by_status( invocation_ids, InvocationStatus.get_final_statuses() )
[docs] @abstractmethod def purge(self) -> None: """ Purges all the orchestrator data for the current self.app.app_id. ```{important} This should only be used for testing purposes. ``` """
############################################# # blocking sub functionalities @property @abstractmethod def blocking_control(self) -> BaseBlockingControl: """ Property to access the blocking control component of the orchestrator. :return: The blocking control component. :rtype: BaseBlockingControl """
[docs] def release_waiters(self, waited_id: "InvocationId") -> None: """ Releases other invocations that are waiting on the completion of the specified invocation. :param InvocationId waited_id: The ID of the invocation that has completed. """ if self.app.orchestrator.conf.blocking_control: self.blocking_control.release_waiters(waited_id)
[docs] def waiting_for_results( self, caller_invocation_id: "InvocationId | None", result_invocation_ids: list["InvocationId"], ) -> None: """ Notifies the system that an invocation is waiting for the results of other invocations. :param InvocationId | None caller_invocation_id: The ID of the waiting invocation. :param list[InvocationId] result_invocation_ids: The IDs of the invocations being waited on. """ if not result_invocation_ids: self.app.logger.warning( f"Unnecessary call to waiting_for_results, invocation:{caller_invocation_id} is not waiting for any results" ) return if self.app.orchestrator.conf.blocking_control and caller_invocation_id: self.blocking_control.waiting_for_results( caller_invocation_id, result_invocation_ids )
[docs] def get_blocking_invocations( self, max_num_invocation_ids: int ) -> Iterator["InvocationId"]: """ Retrieves invocation IDs that are blocking others but are not blocked themselves. :param int max_num_invocation_ids: The maximum number of blocking invocation IDs to retrieve. :return: An iterator over unblocked, blocking invocation IDs. :rtype: Iterator[InvocationId] ```{note} The order of the returned invocation IDs is **oldest first**. ``` """ if self.app.orchestrator.conf.blocking_control: yield from self.blocking_control.get_blocking_invocations( max_num_invocation_ids )
#############################################
[docs] def set_invocation_status( self, invocation_id: "InvocationId", status: "InvocationStatus", runner_ctx: "RunnerContext", ) -> None: """ Sets the status of a specific invocation. :param InvocationId invocation_id: The ID of the invocation to update. :param InvocationStatus status: The new status to set for the invocation. :raises InvocationStatusTransitionError: If transition is not allowed :raises InvocationStatusOwnershipError: If ownership rules are violated :raises InvocationStatusRaceConditionError: If a race condition is detected during status update """ new_status_record = self._atomic_status_transition( invocation_id, status, runner_ctx.runner_id ) if status.is_final(): self.release_waiters(invocation_id) self.set_up_invocation_auto_purge(invocation_id) self.app.state_backend.add_history(invocation_id, new_status_record, runner_ctx) self.app.trigger.report_tasks_status([invocation_id], status) self.app.logger.info(f"invocation:{invocation_id} status:{status.value}")
[docs] def set_invocation_result( self, invocation: "DistributedInvocation", result: Any, runner_ctx: "RunnerContext", ) -> None: """ Sets the result for a completed invocation. :param DistributedInvocation invocation: The invocation that has completed. :param Any result: The result of the invocation's execution. """ self.app.state_backend.set_result(invocation.invocation_id, result) self.app.orchestrator.set_invocation_status( invocation.invocation_id, InvocationStatus.SUCCESS, runner_ctx ) self.app.trigger.report_invocation_result(invocation, result)
[docs] def set_invocation_exception( self, invocation: "DistributedInvocation", exception: Exception, runner_ctx: "RunnerContext", ) -> None: """ Sets an exception for an invocation that finished with an error. :param DistributedInvocation invocation: The invocation that encountered an exception. :param Exception exception: The exception that occurred during the invocation's execution. """ self.app.state_backend.set_exception(invocation.invocation_id, exception) # TODO! on previous fail, this should still change status # eg. on case of interruption from a kubernetes pod (SIGTERM, SIGKILL) # it should try to finish all the calls in this function self.app.orchestrator.set_invocation_status( invocation.invocation_id, InvocationStatus.FAILED, runner_ctx ) self.app.trigger.report_invocation_failure(invocation, exception)
[docs] def set_invocation_retry( self, invocation_id: "InvocationId", exception: Exception, runner_ctx: "RunnerContext", ) -> None: """ Sets an invocation for retry in case of a retriable exception. :param InvocationId invocation_id: The ID of the invocation to be retried. :param Exception exception: The exception that triggered the retry. """ # TODO! on previous fail, this should still change status # eg. on case of interruption from a kubernetes pod (SIGTERM, SIGKILL) # it should try to finish all the calls in this function # TODO store Retry exception on Retry status self.app.orchestrator.set_invocation_status( invocation_id, InvocationStatus.RETRY, runner_ctx ) self.app.orchestrator.increment_invocation_retries(invocation_id) self.app.broker.route_invocation(invocation_id)
[docs] def is_candidate_to_run_by_concurrency_control( self, invocation: "DistributedInvocation[Params, Result]" ) -> bool: """ Checks if an invocation can be candidate to run based on the task's concurrency control configuration. :param DistributedInvocation invocation: The invocation to check for authorization. :return: True if the invocation is authorized to be a running candidate, False otherwise. """ return self._is_authorize_by_concurrency_control( invocation, [InvocationStatus.PENDING, InvocationStatus.RUNNING] )
[docs] def is_authorize_to_run_by_concurrency_control( self, invocation: "DistributedInvocation[Params, Result]" ) -> bool: """ Checks if an invocation can be candidate to run based on the task's concurrency control configuration. :param DistributedInvocation invocation: The invocation to check for authorization. :return: True if the invocation is authorized to be a running candidate, False otherwise. """ return self._is_authorize_by_concurrency_control( invocation, [InvocationStatus.RUNNING] )
[docs] def _is_authorize_by_concurrency_control( self, invocation: "DistributedInvocation[Params, Result]", statuses: list["InvocationStatus"], ) -> bool: """ Checks if an invocation is authorized to run based on the task's concurrency control configuration. ```{important} The authorization is determined by the task's running_concurrency setting: - If ConcurrencyControlType.DISABLED, the invocation is always authorized to run. - If ConcurrencyControlType.TASK, it checks if there are any other running invocations of the same task. If there are, the invocation is not authorized to run. - If ConcurrencyControlType.ARGUMENTS, it checks for any running invocation of the same task with the same arguments. If there are, the invocation is not authorized to run. - If ConcurrencyControlType.KEYS, it checks for any running invocation with the same (key) arguments. If any are found, the invocation is not authorized to run. ``` ```{note} The function call.serialized_args_for_concurrency_control is used to determine the arguments that are relevant for checking existing running invocations based on the task's running_concurrency option. ``` :param DistributedInvocation invocation: The invocation to check for authorization. :param list[InvocationStatus] statuses: The statuses to check for existing invocations. :return: True if the invocation is authorized, False otherwise. """ if invocation.task.conf.running_concurrency == ConcurrencyControlType.DISABLED: return True running_invocation = next( self.get_existing_invocations( task=invocation.call.task, key_serialized_arguments=invocation.call.serialized_args_for_concurrency_control( invocation.task.conf.running_concurrency ), statuses=statuses, ), None, ) if not running_invocation: return True invocation.task.logger.info( f"invocation:{invocation.invocation_id} deferred by concurrency_control: " f"invocation:{running_invocation} already in status:{statuses} " f"(concurrency_control={invocation.task.conf.running_concurrency.value})" ) return False
[docs] def get_blocking_invocations_to_run( self, max_num_invocations: int, blocking_invocation_ids: set["InvocationId"], runner_ctx: "RunnerContext", ) -> Iterator["InvocationId"]: """ Retrieves invocation IDs that are blocking others but are not themselves blocked, up to a maximum number. :param int max_num_invocations: The maximum number of blocking invocations to retrieve. :param set["InvocationId"] blocking_invocation_ids: A set of invocation IDs that are already identified as blocking. :return: An iterator over the blocking invocation IDs. :rtype: Iterator["InvocationId"] """ for blocking_invocation_id in self.get_blocking_invocations( max_num_invocations ): current_status = self.get_invocation_status(blocking_invocation_id) # Skip silently if the invocation is already past the point where PENDING makes sense # (e.g. already RUNNING, SUCCESS, FAILED) — another worker already claimed it if not current_status.can_transition_to(InvocationStatus.PENDING): continue blocking_invocation = self.app.state_backend.get_invocation( blocking_invocation_id ) if not self.is_candidate_to_run_by_concurrency_control(blocking_invocation): continue try: self.set_invocation_status( blocking_invocation_id, InvocationStatus.PENDING, runner_ctx ) blocking_invocation_ids.add(blocking_invocation_id) yield blocking_invocation_id except InvocationStatusError as ex: # Race condition: another worker claimed it between our status check and transition self.app.logger.debug( f"Could not set blocking invocation:{blocking_invocation_id} to status:pending: {ex}" ) continue
[docs] def get_additional_invocations_to_run( self, missing_invocations: int, blocking_invocation_ids: set["InvocationId"], invocations_to_reroute: set["InvocationId"], runner_ctx: "RunnerContext", ) -> Iterator["DistributedInvocation"]: """ Retrieves additional invocations to run, considering those not blocked or already identified as blocking. :param int missing_invocations: The number of additional invocations needed. :param set["InvocationId"] blocking_invocation_ids: IDs of invocations already identified as blocking. :param set["InvocationId"] invocations_to_reroute: A set to collect invocations that need rerouting. :return: An iterator over the additional invocations to run. :rtype: Iterator["DistributedInvocation"] """ while missing_invocations > 0: if invocation_id := self.app.broker.retrieve_invocation(): if invocation_id not in blocking_invocation_ids: invocation_status = self.get_invocation_status(invocation_id) if invocation_status.is_available_for_run(): invocation = self.app.state_backend.get_invocation( invocation_id ) if not self.is_candidate_to_run_by_concurrency_control( invocation ): if invocation.task.conf.reroute_on_concurrency_control: self.set_invocation_status( invocation_id, InvocationStatus.CONCURRENCY_CONTROLLED, runner_ctx, ) invocations_to_reroute.add(invocation_id) else: self.set_invocation_status( invocation_id, InvocationStatus.CONCURRENCY_CONTROLLED_FINAL, runner_ctx, ) continue try: self.set_invocation_status( invocation_id, InvocationStatus.PENDING, runner_ctx, ) missing_invocations -= 1 yield invocation except InvocationStatusError as ex: self.app.logger.warning( f"Could not set invocation:{invocation_id} to status:pending: {ex}" ) continue else: break
[docs] def reroute_invocations( self, invocations_to_reroute: set["InvocationId"], runner_ctx: "RunnerContext", ) -> None: """ Reroutes the specified invocations , typically when they are not authorized to run. :param set["InvocationId"] invocations_to_reroute: The invocations to be rerouted. """ for invocation_id in invocations_to_reroute: self.set_invocation_status( invocation_id, InvocationStatus.REROUTED, runner_ctx ) self.app.broker.route_invocation(invocation_id)
[docs] def get_invocations_to_run( self, max_num_invocations: int, runner_ctx: "RunnerContext" ) -> Iterator["DistributedInvocation"]: """ Retrieves a set of invocations to run, considering blocking and concurrency control. :param int max_num_invocations: The maximum number of invocations to retrieve. :return: An iterator over invocations that are ready to run. :rtype: Iterator["DistributedInvocation"] """ blocking_invocation_ids: set[InvocationId] = set() # Get blocking invocations as IDs but still need to yield actual invocations for blocking_invocation_id in self.get_blocking_invocations_to_run( max_num_invocations, blocking_invocation_ids, runner_ctx ): if invocation := self.app.state_backend.get_invocation( blocking_invocation_id ): yield invocation invocations_to_reroute: set[InvocationId] = set() missing_invocations = max_num_invocations - len(blocking_invocation_ids) yield from self.get_additional_invocations_to_run( missing_invocations, blocking_invocation_ids, invocations_to_reroute, runner_ctx, ) self.reroute_invocations(invocations_to_reroute, runner_ctx)
[docs] def register_new_invocations( self, invocations: list["DistributedInvocation[Params, Result]"] ) -> None: """ Registers a new invocation in the state backend and the orchestrator. :param DistributedInvocation[Params, Result] invocation: The invocation to register. :param str | None runner_id: The owner ID for ownership validation """ runner_ctx = context.get_or_create_runner_context(self.app.app_id) runner_id = runner_ctx.runner_id self.app.state_backend.upsert_invocations(invocations) # This should add the status registered to the backend status_record = self._register_new_invocations(invocations, runner_id) inv_ids = [invocation.invocation_id for invocation in invocations] self.app.state_backend.add_histories(invocations, status_record, runner_ctx) self.app.trigger.report_tasks_status(inv_ids, status_record.status) self.app.broker.route_invocations(inv_ids) for invocation in invocations: task_key = invocation.call.task.task_id.key inv_id = invocation.invocation_id self.app.logger.info( f"NEW task:{task_key} invocation:{inv_id} REGISTERED and ROUTED" )
[docs] def _route_new_call_invocation( self, call: "Call[Params, Result]", runner_id: str | None = None ) -> "DistributedInvocation[Params, Result]": """ Routes a new call invocation within the distributed task system. This method creates and routes a new `DistributedInvocation` for the provided call. It is primarily used when the task does not have single invocation options set. :param Call[Params, Result] call: The task call to be routed. :return: The newly created `DistributedInvocation` for the call. :rtype: DistributedInvocation[Params, Result] """ new_invocation = DistributedInvocation.from_parent( call, parent_invocation=context.get_dist_invocation_context(self.app.app_id), parent_event_id=context.get_trigger_event_context(self.app.app_id), ) self.register_new_invocations([new_invocation]) if ( call.task.conf.registration_concurrency != ConcurrencyControlType.DISABLED or call.task.conf.running_concurrency != ConcurrencyControlType.DISABLED ): self.index_arguments_for_concurrency_control(new_invocation) self.app.logger.info(f"invocation:{new_invocation.invocation_id} ROUTED") return new_invocation
[docs] def route_call(self, call: "Call") -> "DistributedInvocation": """ Routes a task call in the distributed task system, considering single invocation options. This method handles the routing of a task call. ```{important} Note the different behavior depending on the task's registration_concurrency option. A task is Registered when it is created but not yet Running, preventing over-queueing: - If ConcurrencyControlType.DISABLED, It always creates a new invocation. - If ConcurrencyControlType.TASK, It checks for any existing invocation of the same task regardless the arguments. If any is found, it reuses it, otherwise it creates a new invocation. - If ConcurrencyControlType.ARGUMENTS, It checks for any existing invocation with the same arguments. If any is found, it reuses it, otherwise it creates a new invocation. - If ConcurrencyControlType.KEYS, It checks for any existing invocation with the same key arguments. If any is found and the non-key arguments are the same, it always reuses it, IF the non-key arguments are differents and on_diff_non_key_args_raise is set to True, it raises an error, otherwise it reuses it with the new non-key arguments. If no invocation is found, it creates a new invocation. ``` ```{note} The function call.serialized_args_for_concurrency_control is used to get the arguments that are used to check for existing invocations based on the task's registration_concurrency option. ``` :param Call call: The task call to be routed. :return: A `DistributedInvocation` object, which could be either a new or reused invocation. :rtype: DistributedInvocation :raises InvocationConcurrencyWithDifferentArgumentsError: If an invocation with different arguments exists and the task's configuration specifies to raise an error in such cases. """ if call.task.conf.registration_concurrency == ConcurrencyControlType.DISABLED: call.task.logger.debug(f"New invocation for call {call}") return self._route_new_call_invocation(call) # Handleling registred task concurrency control invocation_id = next( self.get_existing_invocations( task=call.task, # we filter here by TASK, all Arguments, or defined keys based in the config # TODO Make it explicit key_serialized_arguments=call.serialized_args_for_concurrency_control( call.task.conf.registration_concurrency ), statuses=[InvocationStatus.REGISTERED], ), None, ) if not invocation_id: call.task.logger.debug( f"No matching registered invocation found for {call.task} " f"and key args {call.serialized_args_for_concurrency_control(call.task.conf.registration_concurrency)}" ) return self._route_new_call_invocation(call) invocation = self.app.state_backend.get_invocation(invocation_id) if not invocation: call.task.logger.warning( f"invocation:{invocation_id} not found in state backend for call {call}, routing new invocation" ) return self._route_new_call_invocation(call) if invocation.call.call_id == call.call_id: call.task.logger.debug( f"Reusing invocation:{invocation.invocation_id} for call {call}" ) return ReusedInvocation(invocation) if call.task.conf.on_diff_non_key_args_raise: raise InvocationConcurrencyWithDifferentArgumentsError.from_call_mismatch( existing_invocation=invocation, new_call=call ) # TODO: review this code, we are reusing an invocation with different non-key arguments, should we still reuse it? return ReusedInvocation(invocation, call.arguments)
[docs] def route_calls( self, calls: list["PreSerializedCall[Params, Result]"] ) -> list["DistributedInvocation[Params, Result]"]: """ Routes multiple calls at once for improved performance. This method is specifically for batch processing tasks with disabled concurrency control. :param list[PreSerializedCall[Params, Result]] calls: The calls to be routed. :return: The list of routed invocations. :rtype: list[DistributedInvocation[Params, Result]] :raises TaskParallelProcessingError: If concurrency control is enabled for any of the calls. """ if not calls: self.app.logger.debug("No calls to route") return [] if ( calls[0].task.conf.registration_concurrency != ConcurrencyControlType.DISABLED ): raise TaskParallelProcessingError( calls[0].task.task_id, "Batch processing is only supported with ConcurrencyControlType.DISABLED", ) parent_invocation = context.get_dist_invocation_context(self.app.app_id) invocations: list[DistributedInvocation[Params, Result]] = [ DistributedInvocation.from_parent(call, parent_invocation) for call in calls ] self.register_new_invocations(invocations) return invocations
[docs] @abstractmethod def register_runner_heartbeats( self, runner_ids: list[str], can_run_atomic_service: bool = False, ) -> None: """ Register or update heartbeat timestamps for one or more runners. This unified method handles both: - Parent/atomic service runners registering themselves (can_run_atomic_service=True) - Worker runners registering themselves (can_run_atomic_service=False, default) - Parent runners reporting their children's heartbeats (can_run_atomic_service=False) For runners that already exist, only updates the heartbeat timestamp. For new runners, creates a new record with the given atomic service eligibility. :param list[str] runner_ids: List of runner_ids to register/update heartbeats for. :param bool can_run_atomic_service: Whether these runners are eligible to run atomic services. """
[docs] @abstractmethod def _get_active_runners( self, timeout_seconds: float, can_run_atomic_service: bool | None ) -> list["ActiveRunnerInfo"]: """ Retrieve runners that are considered active based on heartbeat activity. A runner is considered "active" if it has sent a heartbeat within the timeout period. Implementations must return runners ordered by ``creation_time`` ascending, then ``runner_id`` ascending. Atomic-service slots are calculated from that exact order, so every runner must observe the same order for the same backend snapshot. Runners registered after a snapshot are naturally considered only in later queries, and later creation times place them after existing runners. :param float timeout_seconds: Heartbeat timeout in seconds (typically from runner_considered_dead_after_minutes config) :param bool | None can_run_atomic_service: If specified, filters runners based on their eligibility to run atomic services :return: List of active runners ordered by creation time, then runner ID :rtype: list["ActiveRunnerInfo"] """
[docs] def get_active_runners( self, can_run_atomic_service: bool | None = None ) -> list["ActiveRunnerInfo"]: """ Retrieve runners that are considered active based on heartbeat activity. A runner is considered "active" if it has sent a heartbeat within the timeout period. Runners are returned in the deterministic order required by ``_get_active_runners``: creation time ascending, then runner ID ascending. :param bool | None can_run_atomic_service: If specified, filters runners based on their eligibility to run atomic services :return: List of active runners ordered by creation time, then runner ID :rtype: list["ActiveRunnerInfo"] """ timeout_seconds = self.app.conf.runner_considered_dead_after_minutes * 60 return self._get_active_runners(timeout_seconds, can_run_atomic_service)
[docs] @abstractmethod def get_pending_invocations_for_recovery(self) -> Iterator["InvocationId"]: """ Retrieve invocation IDs stuck in PENDING status beyond the allowed time. :return: Iterator of invocation IDs that need recovery. :rtype: Iterator["InvocationId"] """
[docs] @abstractmethod def _get_running_invocations_for_recovery( self, timeout_seconds: float ) -> Iterator["InvocationId"]: """ Retrieve invocation IDs in RUNNING status owned by inactive runners. An inactive runner is one that hasn't sent a heartbeat within the configured timeout period. Invocations owned by such runners are considered stuck and need recovery. :param float timeout_seconds: Heartbeat timeout in seconds :return: Iterator of invocation IDs that need recovery. :rtype: Iterator["InvocationId"] """
[docs] def get_running_invocations_for_recovery(self) -> Iterator["InvocationId"]: """ Retrieve invocation IDs in RUNNING status owned by inactive runners. An inactive runner is one that hasn't sent a heartbeat within the configured timeout period. Invocations owned by such runners are considered stuck and need recovery. :param float timeout_seconds: Heartbeat timeout in seconds :return: Iterator of invocation IDs that need recovery. :rtype: Iterator["InvocationId"] """ timeout_seconds = self.app.conf.runner_considered_dead_after_minutes * 60 return self._get_running_invocations_for_recovery(timeout_seconds)
[docs] def try_claim_atomic_service_run( self, runner_ctx: "RunnerContext", ) -> AtomicServiceRun | None: """Return a claimed run, or ``None`` if this runner does not own the slot. ``decide_atomic_service_claim`` selects the runner that owns the current slot. The backend claim write is still responsible for cluster-wide mutual exclusion because runners can observe slightly different membership snapshots while workers are starting/stopping. """ conf = self.app.conf self.register_runner_heartbeats( [runner_ctx.runner_id], can_run_atomic_service=True ) now = time() claim = decide_atomic_service_claim( runner_id=runner_ctx.runner_id, active_runners=self.get_active_runners(can_run_atomic_service=True), current_time=now, service_interval_minutes=conf.atomic_service_interval_minutes, spread_margin_minutes=conf.atomic_service_spread_margin_minutes, membership_stabilization_seconds=( float(conf.atomic_service_membership_stabilization_minutes) * 60.0 ), max_start_slot_fraction=float(conf.atomic_service_max_start_slot_fraction), ) if (run := claim.atomic_service_run) is None: return None if not self.record_atomic_service_execution_start(run, None): return None return run
[docs] @abstractmethod def record_atomic_service_execution_start( self, atomic_service_run: AtomicServiceRun, started_at: datetime | None, status: AtomicServiceExecutionStatus = AtomicServiceExecutionStatus.RUNNING, reason: str = "", ) -> bool: """Insert a new atomic-service execution record. Implementations MUST store ``status`` (defaulting to ``RUNNING``) and the free-form ``reason`` so that subsequent backend queries can decide whether the slot is still being held. For the default ``RUNNING`` status this method is the storage-level atomic gate: at most one active execution may be recorded. If another execution is already active, implementations MUST NOT leave this run in ``RUNNING`` and SHOULD record a zero-width ``BLOCKED`` diagnostic row. The return value tells callers whether the run may execute work. :param AtomicServiceRun atomic_service_run: The claimed run identity. :param datetime | None started_at: When the runner began its claim attempt (UTC timezone-aware). Runtime callers may pass ``None`` so the backend stamps the row at the storage admission point. For ``BLOCKED`` records this is also the ``end_time`` because no work was performed. :param AtomicServiceExecutionStatus status: Initial status of the record. ``RUNNING`` for successful claims; ``BLOCKED`` for recorded skip attempts. :param str reason: Free-form diagnostic detail, surfaced in Pynmon. :return: ``True`` when the execution is allowed to run work. """
[docs] @abstractmethod def finalize_atomic_service_execution( self, atomic_service_run: AtomicServiceRun, end_time: datetime, status: AtomicServiceExecutionStatus, reason: str = "", ) -> None: """Transition a previously-inserted record to a terminal status. Idempotent: backends may be called more than once for the same ``atomic_service_run`` (for example, the runner's normal ``COMPLETED`` finalisation racing the reaper's ``ABANDONED`` cleanup). The first terminal finalisation wins. If no prior ``RUNNING`` record exists for this run (the start write failed), implementations MUST insert one with ``start_time == end_time`` so the row is still visible. :param AtomicServiceRun atomic_service_run: The claimed run identity. :param datetime end_time: When execution ended (UTC). :param AtomicServiceExecutionStatus status: Terminal status (``COMPLETED`` / ``ABANDONED`` / ``BLOCKED``). :param str reason: Diagnostic detail; empty for clean completion. """
[docs] @abstractmethod def get_active_atomic_service_executions( self, ) -> list["AtomicServiceExecution"]: """Return every execution currently in ``RUNNING`` status. Used by the claim flow to enforce mutual exclusion and by Pynmon to surface in-flight runs. Ordered most-recently-started first. """
[docs] @abstractmethod def get_atomic_service_executions_in_timerange( self, start_time: datetime, end_time: datetime, limit: int = 1000, *, runner_id: str | None = None, min_duration_seconds: float = 0.0, ) -> list["AtomicServiceExecution"]: """Retrieve atomic-service execution windows that overlap a time range."""
[docs] @abstractmethod def purge_atomic_service_executions(self) -> int: """Drop old / excess atomic-service execution records. :return: Number of records purged. """