"""Abstract orchestrator interfaces for pynenc (``pynenc.orchestrator.base_orchestrator``)."""

from abc import ABC, abstractmethod
from functools import cached_property
from typing import TYPE_CHECKING, Any, Iterator, Optional

from pynenc import context
from pynenc.conf.config_orchestrator import ConfigOrchestrator
from pynenc.conf.config_task import ConcurrencyControlType
from pynenc.exceptions import (
    InvocationConcurrencyWithDifferentArgumentsError,
    PendingInvocationLockError,
    TaskParallelProcessingError,
)
from pynenc.invocation.dist_invocation import DistributedInvocation, ReusedInvocation
from pynenc.invocation.status import InvocationStatus

if TYPE_CHECKING:
    from pynenc.app import Pynenc
    from pynenc.call import Call, PreSerializedCall
    from pynenc.task import Task
    from pynenc.types import Params, Result


class BaseCycleControl(ABC):
    """
    A component of the orchestrator to implement cycle control functionalities.

    This abstract base class defines the interface for cycle control in a distributed
    task system. It is intended to prevent the formation of call cycles between tasks.
    """

    @abstractmethod
    def add_call_and_check_cycles(
        self,
        caller_invocation: "DistributedInvocation[Params, Result]",
        callee_invocation: "DistributedInvocation[Params, Result]",
    ) -> None:
        """
        Adds a new call relationship between invocations and checks for potential cycles.

        :param DistributedInvocation[Params, Result] caller_invocation:
            The invocation calling another task.
        :param DistributedInvocation[Params, Result] callee_invocation:
            The invocation being called.
        :raises CycleDetectedError: If adding the call creates a cycle.
        """
        # TODO async store of call dependencies in state backend

    @abstractmethod
    def clean_up_invocation_cycles(self, invocation: "DistributedInvocation") -> None:
        """
        Cleans up any cycle-related data once an invocation is finished.

        :param DistributedInvocation invocation: The invocation that has finished.
        """
class BaseBlockingControl(ABC):
    """
    A component of the orchestrator to implement blocking control functionalities.

    This abstract base class defines the interface for managing blocking behavior
    in distributed task executions.
    """

    @abstractmethod
    def release_waiters(self, waited: "DistributedInvocation") -> None:
        """
        Releases any invocations that are waiting on the specified invocation.

        :param DistributedInvocation waited:
            The invocation that has finished and can release its waiters.
        """

    @abstractmethod
    def waiting_for_results(
        self,
        caller_invocation: "DistributedInvocation[Params, Result]",
        result_invocations: list["DistributedInvocation[Params, Result]"],
    ) -> None:
        """
        Notifies the system that an invocation is waiting for the results of other invocations.

        :param DistributedInvocation[Params, Result] caller_invocation: The invocation that is waiting.
        :param list[DistributedInvocation[Params, Result]] result_invocations:
            The invocations being waited on.
        """

    @abstractmethod
    def get_blocking_invocations(
        self, max_num_invocations: int
    ) -> Iterator["DistributedInvocation"]:
        """
        Retrieves invocations that are blocking others but are not blocked themselves.

        :param int max_num_invocations: The maximum number of blocking invocations to retrieve.
        :return: An iterator over unblocked, blocking invocations, **ordered by age (oldest first)**.
        :rtype: Iterator[DistributedInvocation]
        """
class BaseOrchestrator(ABC):
    """
    Abstract base class defining the orchestrator's interface in a distributed task system.

    The orchestrator is responsible for managing task invocations, including tracking
    their status, handling retries, and implementing cycle and blocking controls.

    :param Pynenc app: The Pynenc application instance.
    """

    def __init__(self, app: "Pynenc") -> None:
        self.app = app

    @cached_property
    def conf(self) -> ConfigOrchestrator:
        """
        Orchestrator configuration, built on first access from the app's config values and file.

        :return: The orchestrator configuration.
        :rtype: ConfigOrchestrator
        """
        return ConfigOrchestrator(
            config_values=self.app.config_values,
            config_filepath=self.app.config_filepath,
        )

    @abstractmethod
    def get_existing_invocations(
        self,
        task: "Task[Params, Result]",
        key_serialized_arguments: Optional[dict[str, str]] = None,
        statuses: Optional[list["InvocationStatus"]] = None,
    ) -> Iterator["DistributedInvocation"]:
        """
        Retrieves existing invocations based on task, arguments, and status.

        :param Task[Params, Result] task: The task for which to retrieve invocations.
        :param Optional[dict[str, str]] key_serialized_arguments:
            Serialized arguments to filter invocations.
        :param Optional[list[InvocationStatus]] statuses: The statuses to filter invocations by.
        :return: An iterator over the matching invocations.
        :rtype: Iterator[DistributedInvocation]
        """

    @abstractmethod
    def get_invocation(self, invocation_id: str) -> Optional["DistributedInvocation"]:
        """
        Retrieves a specific invocation by its ID.

        This gives direct access to an invocation without filtering through all
        invocations, which can be much more efficient when the ID is known.

        :param str invocation_id: The ID of the invocation to retrieve.
        :return: The invocation if found, None otherwise.
        :rtype: Optional[DistributedInvocation]
        """

    @abstractmethod
    def _set_invocation_status(
        self,
        invocation: "DistributedInvocation[Params, Result]",
        status: "InvocationStatus",
    ) -> None:
        """
        Sets the status of a specific invocation (storage only, no side effects).

        :param DistributedInvocation[Params, Result] invocation: The invocation to update.
        :param InvocationStatus status: The new status to set for the invocation.
        """

    @abstractmethod
    def _set_invocations_status(
        self, invocations: list[DistributedInvocation], status: InvocationStatus
    ) -> None:
        """
        Sets the status of multiple invocations at once.

        Implementations should apply the change as a batch where the backend allows it,
        rather than updating each invocation individually.

        :param list[DistributedInvocation] invocations: The invocations to update.
        :param InvocationStatus status: The status to set.
        """

    @abstractmethod
    def _set_invocation_pending_status(
        self,
        invocation: "DistributedInvocation[Params, Result]",
    ) -> None:
        """
        Sets the status of an invocation to pending.

        ```{note}
        Pending can only be set by the orchestrator
        ```

        :param DistributedInvocation[Params, Result] invocation: The invocation to update.
        """

    def _set_pending(
        self,
        invocation: "DistributedInvocation[Params, Result]",
    ) -> None:
        """
        Marks an invocation as pending and records it in the state backend history.

        ```{note}
        Pending can only be set by the orchestrator
        ```

        :param DistributedInvocation[Params, Result] invocation: The invocation to mark as pending.
        """
        self._set_invocation_pending_status(invocation)
        self.app.state_backend.add_history(invocation, status=InvocationStatus.PENDING)

    @abstractmethod
    def set_up_invocation_auto_purge(
        self,
        invocation: "DistributedInvocation[Params, Result]",
    ) -> None:
        """
        Sets up automatic purging for an invocation after a defined period.

        ```{note}
        Set auto purge period with `app.conf.orchestrator_auto_final_invocation_purge_hours`
        ```

        :param DistributedInvocation[Params, Result] invocation: The invocation to set up for auto purge.
        """

    @abstractmethod
    def auto_purge(self) -> None:
        """
        Automatically purges all invocations in a final state that are older than a defined period.

        ```{note}
        Set auto purge period with `app.conf.orchestrator_auto_final_invocation_purge_hours`
        ```
        """

    @abstractmethod
    def get_invocation_status(
        self, invocation: "DistributedInvocation[Params, Result]"
    ) -> "InvocationStatus":
        """
        Retrieves the status of a specific invocation.

        :param DistributedInvocation[Params, Result] invocation: The invocation whose status is requested.
        :return: The current status of the invocation.
        :rtype: InvocationStatus
        """
        # TODO: if the invocation does not exist, try the state backend (and cache it)
        # before raising an exception.

    @abstractmethod
    def increment_invocation_retries(
        self, invocation: "DistributedInvocation[Params, Result]"
    ) -> None:
        """
        Increments the retry count of a specific invocation.

        :param DistributedInvocation[Params, Result] invocation: The invocation whose retries to increment.
        """

    @abstractmethod
    def get_invocation_retries(
        self, invocation: "DistributedInvocation[Params, Result]"
    ) -> int:
        """
        Retrieves the number of retries for a specific invocation.

        :param DistributedInvocation[Params, Result] invocation: The invocation whose retry count is requested.
        :return: The number of retries for the invocation.
        :rtype: int
        """

    @abstractmethod
    def filter_by_status(
        self,
        invocations: list["DistributedInvocation"],
        status_filter: set["InvocationStatus"] | None = None,
    ) -> list["DistributedInvocation"]:
        """
        Filters a list of invocations by their status in an optimized way.

        Allows efficient batch filtering, reducing the number of individual status checks.

        :param list[DistributedInvocation] invocations: The invocations to filter.
        :param set[InvocationStatus] | None status_filter:
            The statuses to keep. If None, defaults to final statuses.
        :return: List of invocations matching the status filter.
        :rtype: list[DistributedInvocation]
        """

    def filter_final(
        self, invocations: list["DistributedInvocation"]
    ) -> list["DistributedInvocation"]:
        """
        Returns the invocations that have reached a final status.

        Convenience wrapper around `filter_by_status` with all final statuses.

        :param list[DistributedInvocation] invocations: The invocations to check.
        :return: List of invocations that have reached a final status.
        :rtype: list[DistributedInvocation]
        """
        return self.filter_by_status(invocations, InvocationStatus.get_final_statuses())

    @abstractmethod
    def purge(self) -> None:
        """
        Purges all the orchestrator data for the current self.app.app_id.

        ```{important}
        This should only be used for testing purposes.
        ```
        """

    #############################################
    # cycle sub functionalities

    @property
    @abstractmethod
    def cycle_control(self) -> BaseCycleControl:
        """
        Property to access the cycle control component of the orchestrator.

        :return: The cycle control component.
        :rtype: BaseCycleControl
        """

    def add_call_and_check_cycles(
        self,
        caller_invocation: "DistributedInvocation[Params, Result]",
        callee_invocation: "DistributedInvocation[Params, Result]",
    ) -> None:
        """
        Adds a call relationship between two invocations and checks for potential cycles.

        Does nothing when `conf.cycle_control` is disabled.

        :param DistributedInvocation[Params, Result] caller_invocation: The calling invocation.
        :param DistributedInvocation[Params, Result] callee_invocation: The called invocation.
        """
        if self.conf.cycle_control:
            self.cycle_control.add_call_and_check_cycles(
                caller_invocation, callee_invocation
            )
        # TODO async store of call dependencies in state backend

    def clean_up_invocation_cycles(self, invocation: "DistributedInvocation") -> None:
        """
        Cleans up data related to invocation cycles when an invocation finishes.

        Does nothing when `conf.cycle_control` is disabled.

        :param DistributedInvocation invocation: The invocation that has finished.
        """
        if self.conf.cycle_control:
            self.cycle_control.clean_up_invocation_cycles(invocation)

    #############################################
    # blocking sub functionalities

    @property
    @abstractmethod
    def blocking_control(self) -> BaseBlockingControl:
        """
        Property to access the blocking control component of the orchestrator.

        :return: The blocking control component.
        :rtype: BaseBlockingControl
        """

    def release_waiters(self, waited: "DistributedInvocation") -> None:
        """
        Releases invocations that are waiting on the completion of the given invocation.

        Does nothing when `conf.blocking_control` is disabled.

        :param DistributedInvocation waited: The invocation that has completed.
        """
        # Use this orchestrator's own config (as the cycle-control helpers do)
        # rather than going through `self.app.orchestrator`.
        if self.conf.blocking_control:
            self.blocking_control.release_waiters(waited)

    def waiting_for_results(
        self,
        caller_invocation: Optional["DistributedInvocation[Params, Result]"],
        result_invocations: list["DistributedInvocation[Params, Result]"],
    ) -> None:
        """
        Notifies the system that an invocation is waiting on the results of other invocations.

        Logs a warning and returns when `result_invocations` is empty. Otherwise the
        blocking control is only notified when it is enabled and a caller is given.

        :param Optional[DistributedInvocation[Params, Result]] caller_invocation: The waiting invocation.
        :param list[DistributedInvocation[Params, Result]] result_invocations: The invocations being waited on.
        """
        if not result_invocations:
            self.app.logger.warning(
                f"Unnecessary call to waiting_for_results, {caller_invocation=} is not waiting for any results"
            )
            return
        if self.conf.blocking_control and caller_invocation:
            self.blocking_control.waiting_for_results(
                caller_invocation, result_invocations
            )

    def get_blocking_invocations(
        self, max_num_invocations: int
    ) -> Iterator["DistributedInvocation"]:
        """
        Retrieves invocations that are blocking others but not blocked themselves.

        Yields nothing when `conf.blocking_control` is disabled.

        :param int max_num_invocations: The maximum number of blocking invocations to retrieve.
        :return: An iterator over unblocked, blocking invocations.
        :rtype: Iterator[DistributedInvocation]

        ```{note}
        The order of the returned invocations is **oldest first**.
        ```
        """
        if self.conf.blocking_control:
            yield from self.blocking_control.get_blocking_invocations(
                max_num_invocations
            )

    #############################################

    def set_invocation_status(
        self,
        invocation: "DistributedInvocation[Params, Result]",
        status: "InvocationStatus",
    ) -> None:
        """
        Sets the status of a specific invocation and runs the associated side effects.

        - PENDING is delegated to `_set_invocation_pending_status`.
        - Any other status is stored with `_set_invocation_status`. For a final status,
          waiters are released, cycle data is cleaned up and auto purge is set up first.
        - In every case a history entry is added to the state backend and the trigger
          is notified.

        :param DistributedInvocation[Params, Result] invocation: The invocation to update.
        :param InvocationStatus status: The new status to set for the invocation.
        """
        if status == InvocationStatus.PENDING:
            self._set_invocation_pending_status(invocation)
        else:
            # TODO async store of status in state backend
            if status.is_final():
                self.release_waiters(invocation)
                self.clean_up_invocation_cycles(invocation)
                self.set_up_invocation_auto_purge(invocation)
            # TODO! on previous fail, this should still change status to running
            self._set_invocation_status(invocation, status)
        # NOTE(review): history and trigger reporting run for every status, including
        # PENDING (which `_set_pending` also records) — confirm this is intended.
        self.app.state_backend.add_history(invocation, status)
        self.app.trigger.report_tasks_status([invocation], status)

    def set_invocations_status(
        self,
        invocations: list["DistributedInvocation[Params, Result]"],
        status: "InvocationStatus",
    ) -> None:
        """
        Sets the status for a list of invocations and reports it to the trigger.

        ```{note}
        Unlike `set_invocation_status`, this does not add state backend history and
        does not run the final-status hooks (release waiters, cycle clean up, auto purge).
        ```

        :param list[DistributedInvocation[Params, Result]] invocations: The invocations to update.
        :param InvocationStatus status: The new status to set for the invocations.
        """
        self._set_invocations_status(invocations, status)
        self.app.trigger.report_tasks_status(invocations, status)

    def set_invocation_run(
        self,
        caller: Optional["DistributedInvocation[Params, Result]"],
        callee: "DistributedInvocation[Params, Result]",
    ) -> None:
        """
        Marks an invocation as running, checking for call cycles first if a caller is given.

        :param Optional[DistributedInvocation[Params, Result]] caller: The calling invocation, if any.
        :param DistributedInvocation[Params, Result] callee: The invocation being marked as running.
        """
        if caller:
            self.add_call_and_check_cycles(caller, callee)
        # TODO! on previous fail, this should still change status
        self.set_invocation_status(callee, InvocationStatus.RUNNING)

    def set_invocation_result(
        self, invocation: "DistributedInvocation", result: Any
    ) -> None:
        """
        Stores the result of a completed invocation and marks it as SUCCESS.

        :param DistributedInvocation invocation: The invocation that has completed.
        :param Any result: The result of the invocation's execution.
        """
        self.app.state_backend.set_result(invocation, result)
        self.set_invocation_status(invocation, InvocationStatus.SUCCESS)
        self.app.trigger.report_invocation_result(invocation, result)

    def set_invocation_exception(
        self, invocation: "DistributedInvocation", exception: Exception
    ) -> None:
        """
        Stores the exception of an invocation that finished with an error and marks it as FAILED.

        :param DistributedInvocation invocation: The invocation that encountered an exception.
        :param Exception exception: The exception raised during the invocation's execution.
        """
        self.app.state_backend.set_exception(invocation, exception)
        # TODO! on previous fail, this should still change status
        # eg. on case of interruption from a kubernetes pod (SIGTERM, SIGKILL)
        # it should try to finish all the calls in this function
        self.set_invocation_status(invocation, InvocationStatus.FAILED)
        self.app.trigger.report_invocation_failure(invocation, exception)

    def set_invocation_retry(
        self, invocation: "DistributedInvocation", exception: Exception
    ) -> None:
        """
        Marks an invocation for retry, increments its retry count and routes it again.

        :param DistributedInvocation invocation: The invocation to be retried.
        :param Exception exception: The exception that triggered the retry.
        """
        # TODO! on previous fail, this should still change status
        # eg. on case of interruption from a kubernetes pod (SIGTERM, SIGKILL)
        # it should try to finish all the calls in this function
        self.set_invocation_status(invocation, InvocationStatus.RETRY)
        self.increment_invocation_retries(invocation)
        self.app.broker.route_invocation(invocation)

    def is_candidate_to_run_by_concurrency_control(
        self, invocation: "DistributedInvocation"
    ) -> bool:
        """
        Checks if an invocation can be a candidate to run under the task's concurrency control.

        Conflicting invocations are those in PENDING or RUNNING status.

        :param DistributedInvocation invocation: The invocation to check.
        :return: True if the invocation may become a running candidate, False otherwise.
        """
        return self._is_authorize_by_concurrency_control(
            invocation, [InvocationStatus.PENDING, InvocationStatus.RUNNING]
        )

    def is_authorize_to_run_by_concurrency_control(
        self, invocation: "DistributedInvocation"
    ) -> bool:
        """
        Checks if an invocation is authorized to run under the task's concurrency control.

        Only invocations in RUNNING status are considered conflicting.

        :param DistributedInvocation invocation: The invocation to check.
        :return: True if the invocation is authorized to run, False otherwise.
        """
        return self._is_authorize_by_concurrency_control(
            invocation, [InvocationStatus.RUNNING]
        )

    def _is_authorize_by_concurrency_control(
        self, invocation: "DistributedInvocation", statuses: list["InvocationStatus"]
    ) -> bool:
        """
        Checks if an invocation is authorized based on the task's concurrency control configuration.

        ```{important}
        The authorization is determined by the task's running_concurrency setting:
        - If ConcurrencyControlType.DISABLED, the invocation is always authorized to run.
        - If ConcurrencyControlType.TASK, it checks if there are any other running invocations
          of the same task. If there are, the invocation is not authorized to run.
        - If ConcurrencyControlType.ARGUMENTS, it checks for any running invocation of the same
          task with the same arguments. If there are, the invocation is not authorized to run.
        - If ConcurrencyControlType.KEYS, it checks for any running invocation with the same
          (key) arguments. If any are found, the invocation is not authorized to run.
        ```

        ```{note}
        call.serialized_args_for_concurrency_check determines which arguments are used to
        look up existing invocations, based on the task's running_concurrency option.
        ```

        :param DistributedInvocation invocation: The invocation to check.
        :param list[InvocationStatus] statuses: The statuses that count as conflicting.
        :return: True if the invocation is authorized, False otherwise.
        """
        if invocation.task.conf.running_concurrency == ConcurrencyControlType.DISABLED:
            return True
        running_invocation = next(
            self.get_existing_invocations(
                task=invocation.call.task,
                key_serialized_arguments=invocation.call.serialized_args_for_concurrency_check,
                statuses=statuses,
            ),
            None,
        )
        if running_invocation is None:
            return True
        invocation.task.logger.debug(
            f"{invocation=} cannot run because {running_invocation} is already in {statuses} status"
        )
        return False

    def get_blocking_invocations_to_run(
        self, max_num_invocations: int, blocking_invocation_ids: set[str]
    ) -> Iterator["DistributedInvocation"]:
        """
        Yields blocking (but not blocked) invocations that were successfully set to pending.

        :param int max_num_invocations: The maximum number of blocking invocations to retrieve.
        :param set[str] blocking_invocation_ids: Filled in place with the IDs of every blocking
            invocation that passed concurrency control, including those whose pending lock
            failed, so later steps can skip them.
        :return: An iterator over the blocking invocations now marked as pending.
        :rtype: Iterator[DistributedInvocation]
        """
        for blocking_invocation in self.get_blocking_invocations(max_num_invocations):
            if not self.is_candidate_to_run_by_concurrency_control(blocking_invocation):
                continue
            blocking_invocation_ids.add(blocking_invocation.invocation_id)
            try:
                self._set_pending(blocking_invocation)
            except PendingInvocationLockError:
                continue
            yield blocking_invocation

    def get_additional_invocations_to_run(
        self,
        missing_invocations: int,
        blocking_invocation_ids: set[str],
        invocations_to_reroute: set["DistributedInvocation"],
    ) -> Iterator["DistributedInvocation"]:
        """
        Pulls invocations from the broker until `missing_invocations` are set to pending.

        Invocations retrieved from the broker are handled as follows:
        - IDs in `blocking_invocation_ids` are skipped.
        - Invocations whose status is not available for run are skipped.
        - Invocations rejected by concurrency control go to `invocations_to_reroute`.
        - Invocations whose pending lock fails are skipped.
        Skipped invocations are not re-queued here. Stops early when the broker returns nothing.

        :param int missing_invocations: The number of additional invocations needed.
        :param set[str] blocking_invocation_ids: IDs of invocations already handled as blocking.
        :param set[DistributedInvocation] invocations_to_reroute: Collects invocations to reroute.
        :return: An iterator over the additional invocations to run.
        :rtype: Iterator[DistributedInvocation]
        """
        while missing_invocations > 0:
            invocation = self.app.broker.retrieve_invocation()
            if not invocation:
                break  # nothing left in the broker
            if invocation.invocation_id in blocking_invocation_ids:
                continue
            if not self.get_invocation_status(invocation).is_available_for_run():
                continue
            if not self.is_candidate_to_run_by_concurrency_control(invocation):
                invocations_to_reroute.add(invocation)
                continue
            try:
                self._set_pending(invocation)
            except PendingInvocationLockError:
                # presumably another runner claimed it first; leave it to them
                continue
            missing_invocations -= 1
            yield invocation

    def reroute_invocations(
        self, invocations_to_reroute: set["DistributedInvocation"]
    ) -> None:
        """
        Marks each invocation as REROUTED and routes it back through the broker.

        :param set[DistributedInvocation] invocations_to_reroute: The invocations to be rerouted.
        """
        for invocation in invocations_to_reroute:
            invocation.task.logger.debug(
                f"Rerouting invocation {invocation.invocation_id} because it was not authorized to run"
            )
            self.set_invocation_status(invocation, InvocationStatus.REROUTED)
            self.app.broker.route_invocation(invocation)

    def get_invocations_to_run(
        self, max_num_invocations: int
    ) -> Iterator["DistributedInvocation"]:
        """
        Retrieves up to `max_num_invocations` invocations ready to run.

        Blocking invocations are served first, then the remaining slots are filled from
        the broker. Invocations rejected by concurrency control are rerouted when the
        generator finishes, and also if it is closed early or an error propagates, so
        they are not lost from the broker.

        :param int max_num_invocations: The maximum number of invocations to retrieve.
        :return: An iterator over invocations that are ready to run.
        :rtype: Iterator[DistributedInvocation]
        """
        blocking_invocation_ids: set[str] = set()
        invocations_to_reroute: set[DistributedInvocation] = set()
        try:
            # Count what was actually yielded: `blocking_invocation_ids` also holds
            # invocations whose pending lock failed, which did not fill a slot.
            num_blocking_yielded = 0
            for invocation in self.get_blocking_invocations_to_run(
                max_num_invocations, blocking_invocation_ids
            ):
                num_blocking_yielded += 1
                yield invocation
            yield from self.get_additional_invocations_to_run(
                max_num_invocations - num_blocking_yielded,
                blocking_invocation_ids,
                invocations_to_reroute,
            )
        finally:
            self.reroute_invocations(invocations_to_reroute)

    def _route_new_call_invocation(
        self, call: "Call[Params, Result]"
    ) -> "DistributedInvocation[Params, Result]":
        """
        Creates a new `DistributedInvocation` for the call, registers it and routes it.

        The parent invocation is taken from the current distributed invocation context.

        :param Call[Params, Result] call: The task call to be routed.
        :return: The newly created `DistributedInvocation` for the call.
        :rtype: DistributedInvocation[Params, Result]
        """
        parent_invocation = context.get_dist_invocation_context(self.app.app_id)
        new_invocation = DistributedInvocation(call, parent_invocation)
        self.app.logger.info(f"routing {new_invocation=}")
        self.set_invocation_status(new_invocation, InvocationStatus.REGISTERED)
        self.app.broker.route_invocation(new_invocation)
        self.app.logger.info(
            f"routed task {call.task.task_id} with invocation {new_invocation.invocation_id}"
        )
        return new_invocation

    def route_call(self, call: "Call") -> "DistributedInvocation[Params, Result]":
        """
        Routes a task call, honouring the task's registration_concurrency option.

        ```{important}
        Note the different behavior depending on the task's registration_concurrency option.
        - If ConcurrencyControlType.DISABLED, it always creates a new invocation.
        - If ConcurrencyControlType.TASK, it checks for any existing invocation of the same task
          regardless of the arguments. If any is found, it reuses it, otherwise it creates a new one.
        - If ConcurrencyControlType.ARGUMENTS, it checks for any existing invocation with the same
          arguments. If any is found, it reuses it, otherwise it creates a new one.
        - If ConcurrencyControlType.KEYS, it checks for any existing invocation with the same key
          arguments. If one is found and the non-key arguments match, it is reused. If the non-key
          arguments differ and on_diff_non_key_args_raise is True, an error is raised; otherwise
          it is reused with the new non-key arguments. If none is found, a new one is created.
        ```

        ```{note}
        call.serialized_args_for_concurrency_check selects the arguments used to look up
        existing invocations, based on the task's registration_concurrency option.
        ```

        :param Call call: The task call to be routed.
        :return: A `DistributedInvocation`, either new or reused.
        :rtype: DistributedInvocation[Params, Result]
        :raises InvocationConcurrencyWithDifferentArgumentsError:
            If an invocation with different arguments exists and the task's configuration
            specifies to raise an error in such cases.
        """
        if call.task.conf.registration_concurrency == ConcurrencyControlType.DISABLED:
            call.task.logger.debug(f"New invocation for call {call}")
            return self._route_new_call_invocation(call)

        # Registration concurrency control: look for a matching REGISTERED invocation.
        invocation = next(
            self.get_existing_invocations(
                task=call.task,
                key_serialized_arguments=call.serialized_args_for_concurrency_check,
                statuses=[InvocationStatus.REGISTERED],
            ),
            None,
        )
        if not invocation:
            call.task.logger.debug(
                f"No matching registered invocation found for {call.task} "
                f"and key args {call.serialized_args_for_concurrency_check}"
            )
            return self._route_new_call_invocation(call)
        if invocation.serialized_arguments == call.serialized_arguments:
            call.task.logger.debug(
                f"Reusing invocation {invocation.invocation_id} for call {call}"
            )
            return ReusedInvocation.from_existing(invocation)
        if call.task.conf.on_diff_non_key_args_raise:
            raise InvocationConcurrencyWithDifferentArgumentsError.from_call_mismatch(
                existing_invocation=invocation, new_call=call
            )
        return ReusedInvocation.from_existing(invocation, call.arguments)

    def route_calls(
        self, calls: list["PreSerializedCall[Params, Result]"]
    ) -> list["DistributedInvocation[Params, Result]"]:
        """
        Routes multiple calls at once for improved performance.

        Only supported when registration concurrency control is disabled for every call.

        :param list[PreSerializedCall[Params, Result]] calls: The calls to be routed.
        :return: The list of routed invocations (empty if `calls` is empty).
        :rtype: list[DistributedInvocation[Params, Result]]
        :raises TaskParallelProcessingError:
            If registration concurrency control is enabled for any of the calls.
        """
        if not calls:
            self.app.logger.debug("No calls to route")
            return []
        # Validate every call, not only the first: a batch may mix tasks.
        for call in calls:
            if (
                call.task.conf.registration_concurrency
                != ConcurrencyControlType.DISABLED
            ):
                raise TaskParallelProcessingError(
                    call.task.task_id,
                    "Batch processing is only supported with ConcurrencyControlType.DISABLED",
                )
        parent_invocation = context.get_dist_invocation_context(self.app.app_id)
        invocations: list[DistributedInvocation[Params, Result]] = [
            DistributedInvocation(call, parent_invocation=parent_invocation)  # type: ignore
            for call in calls
        ]
        # NOTE(review): unlike `_route_new_call_invocation`, this uses the storage-only
        # `_set_invocations_status`, so no history entry is added and the trigger is not
        # notified for batch-routed invocations — confirm this is intentional.
        self._set_invocations_status(invocations, InvocationStatus.REGISTERED)
        self.app.broker.route_invocations(invocations)
        return invocations