pynenc.orchestrator.base_orchestrator
Module Contents
Classes
BaseCycleControl | A component of the orchestrator to implement cycle control functionalities.
BaseBlockingControl | A component of the orchestrator to implement blocking control functionalities.
BaseOrchestrator | Abstract base class defining the orchestrator’s interface in a distributed task system.
API
- class pynenc.orchestrator.base_orchestrator.BaseCycleControl
Bases: abc.ABC
A component of the orchestrator to implement cycle control functionalities.
This abstract base class defines the interface for cycle control in a distributed task system. It is intended to prevent the formation of call cycles between tasks.
- abstract add_call_and_check_cycles(caller_invocation: DistributedInvocation[Params, Result], callee_invocation: DistributedInvocation[Params, Result]) → None
Adds a new call relationship between invocations and checks for potential cycles.
- Parameters:
caller_invocation (DistributedInvocation[Params, Result]) – The invocation calling another task.
callee_invocation (DistributedInvocation[Params, Result]) – The invocation being called.
- Raises:
CycleDetectedError – If adding the call creates a cycle.
- abstract clean_up_invocation_cycles(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) → None
Cleans up any cycle-related data when an invocation is finished.
- Parameters:
invocation (DistributedInvocation) – The invocation that has finished.
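One way to picture the cycle check is as a reachability test on a directed call graph: adding caller → callee closes a cycle exactly when the callee can already reach the caller. The sketch below is not pynenc's implementation. It assumes invocations are identified by plain string IDs, and both `InMemoryCycleControl` and the local `CycleDetectedError` are hypothetical stand-ins written for illustration.

```python
from collections import defaultdict


class CycleDetectedError(Exception):
    """Hypothetical stand-in for the library's CycleDetectedError."""


class InMemoryCycleControl:
    """Toy cycle control keyed by invocation ID strings (an assumption)."""

    def __init__(self) -> None:
        # caller ID -> set of callee IDs
        self.calls: dict[str, set[str]] = defaultdict(set)

    def _reaches(self, start: str, target: str) -> bool:
        """Return True if target is reachable from start via recorded calls."""
        stack, seen = [start], set()
        while stack:
            node = stack.pop()
            if node == target:
                return True
            if node in seen:
                continue
            seen.add(node)
            stack.extend(self.calls.get(node, ()))
        return False

    def add_call_and_check_cycles(self, caller_id: str, callee_id: str) -> None:
        # Adding caller -> callee closes a cycle if callee already reaches caller.
        if self._reaches(callee_id, caller_id):
            raise CycleDetectedError(
                f"{caller_id} -> {callee_id} would create a cycle"
            )
        self.calls[caller_id].add(callee_id)

    def clean_up_invocation_cycles(self, invocation_id: str) -> None:
        # Drop the finished invocation both as a caller and as a callee.
        self.calls.pop(invocation_id, None)
        for callees in self.calls.values():
            callees.discard(invocation_id)
```

With calls a → b and b → c recorded, adding c → a raises the error. After `clean_up_invocation_cycles("b")` the same call is accepted, because the path from a to c no longer exists.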
- class pynenc.orchestrator.base_orchestrator.BaseBlockingControl
Bases: abc.ABC
A component of the orchestrator to implement blocking control functionalities.
This abstract base class defines the interface for managing blocking behavior in distributed task executions.
- abstract release_waiters(waited: pynenc.invocation.dist_invocation.DistributedInvocation) → None
Releases any invocations that are waiting on the specified invocation.
- Parameters:
waited (DistributedInvocation) – The invocation that has finished and can release its waiters.
- abstract waiting_for_results(caller_invocation: DistributedInvocation[Params, Result], result_invocations: list[DistributedInvocation[Params, Result]]) → None
Notifies the system that an invocation is waiting for the results of other invocations.
- Parameters:
caller_invocation (DistributedInvocation[Params, Result]) – The invocation that is waiting.
result_invocations (list[DistributedInvocation[Params, Result]]) – The invocations being waited on.
- abstract get_blocking_invocations(max_num_invocations: int) → Iterator[pynenc.invocation.dist_invocation.DistributedInvocation]
Retrieves invocations that are blocking others but are not blocked themselves.
- Parameters:
max_num_invocations (int) – The maximum number of blocking invocations to retrieve.
- Returns:
An iterator over unblocked, blocking invocations, ordered by age (oldest first).
- Return type:
Iterator[DistributedInvocation]
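The three methods of the blocking control interface can be illustrated with a small in-memory model. This is a sketch under assumptions, not the library's code: invocations are plain string IDs, `InMemoryBlockingControl` is a hypothetical class, and "oldest first" is approximated by the order in which invocations were first waited on (dictionary insertion order).

```python
from collections.abc import Iterator


class InMemoryBlockingControl:
    """Toy blocking control keyed by invocation ID strings (an assumption)."""

    def __init__(self) -> None:
        # waited ID -> IDs of the invocations waiting on it
        self.waiters: dict[str, set[str]] = {}
        # waiter ID -> IDs it is still waiting for
        self.waiting_for: dict[str, set[str]] = {}

    def waiting_for_results(self, caller_id: str, result_ids: list[str]) -> None:
        for result_id in result_ids:
            self.waiters.setdefault(result_id, set()).add(caller_id)
            self.waiting_for.setdefault(caller_id, set()).add(result_id)

    def release_waiters(self, waited_id: str) -> None:
        for waiter_id in self.waiters.pop(waited_id, set()):
            pending = self.waiting_for.get(waiter_id, set())
            pending.discard(waited_id)
            if not pending:
                # The waiter no longer waits on anything, so it is unblocked.
                self.waiting_for.pop(waiter_id, None)

    def get_blocking_invocations(self, max_num_invocations: int) -> Iterator[str]:
        yielded = 0
        for waited_id in self.waiters:
            if yielded >= max_num_invocations:
                return
            # Blocking others (it has waiters) but not blocked itself.
            if waited_id not in self.waiting_for:
                yield waited_id
                yielded += 1
```

If a waits on b and b waits on c, only c is returned: b blocks a but is itself blocked. Once `release_waiters("c")` runs, b becomes the unblocked, blocking invocation.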
- class pynenc.orchestrator.base_orchestrator.BaseOrchestrator(app: pynenc.app.Pynenc)
Bases: abc.ABC
Abstract base class defining the orchestrator’s interface in a distributed task system.
The orchestrator is responsible for managing task invocations, including tracking their status, handling retries, and implementing cycle and blocking controls.
- Parameters:
app (Pynenc) – The Pynenc application instance.
Initialization
- abstract get_existing_invocations(task: Task[Params, Result], key_serialized_arguments: Optional[dict[str, str]] = None, statuses: Optional[list[pynenc.invocation.status.InvocationStatus]] = None) → Iterator[pynenc.invocation.dist_invocation.DistributedInvocation]
Retrieves existing invocations based on task, arguments, and status.
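- Parameters:
task (Task[Params, Result]) – The task whose invocations are retrieved.
key_serialized_arguments (Optional[dict[str, str]]) – Serialized key arguments to match, if any.
statuses (Optional[list[InvocationStatus]]) – The statuses to match, if any.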
- Returns:
An iterator over the matching invocations.
- Return type:
Iterator[DistributedInvocation]
- abstract _set_invocation_status(invocation: DistributedInvocation[Params, Result], status: pynenc.invocation.status.InvocationStatus) → None
Sets the status of a specific invocation.
- Parameters:
invocation (DistributedInvocation[Params, Result]) – The invocation to update.
status (InvocationStatus) – The new status to set for the invocation.
- abstract _set_invocation_pending_status(invocation: DistributedInvocation[Params, Result]) → None
Sets the status of an invocation to pending.
Note
Pending can only be set by the orchestrator.
- Parameters:
invocation (DistributedInvocation[Params, Result]) – The invocation to update.
- _set_pending(invocation: DistributedInvocation[Params, Result]) → None
Marks an invocation as pending and updates its history in the state backend.
Note
Pending can only be set by the orchestrator.
- Parameters:
invocation (DistributedInvocation[Params, Result]) – The invocation to mark as pending.
- abstract set_up_invocation_auto_purge(invocation: DistributedInvocation[Params, Result]) → None
Sets up automatic purging for an invocation after a defined period.
Note
Set auto purge period with `app.conf.orchestrator_auto_final_invocation_purge_hours`
- Parameters:
invocation (DistributedInvocation[Params, Result]) – The invocation to set up for auto purge.
- abstract auto_purge() → None
Automatically purges all invocations in a final state that are older than a defined time period.
Note
Set auto purge period with `app.conf.orchestrator_auto_final_invocation_purge_hours`
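The purge rule described above (final state, older than a configured number of hours) can be sketched as a simple cutoff filter. The snippet is illustrative only: the function, the in-memory store layout, and the status names in `FINAL_STATUSES` are assumptions, and `purge_hours` merely plays the role of `app.conf.orchestrator_auto_final_invocation_purge_hours`.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical names for final statuses; the real enum may differ.
FINAL_STATUSES = {"SUCCESS", "FAILED"}


def auto_purge(
    invocations: dict[str, tuple[str, datetime]],
    purge_hours: float,
    now: datetime,
) -> list[str]:
    """Remove final invocations older than purge_hours and return their IDs.

    invocations maps an invocation ID to (status, time it reached that status).
    """
    cutoff = now - timedelta(hours=purge_hours)
    purged = [
        inv_id
        for inv_id, (status, finished_at) in invocations.items()
        if status in FINAL_STATUSES and finished_at < cutoff
    ]
    for inv_id in purged:
        del invocations[inv_id]
    return purged


now = datetime(2024, 1, 2, tzinfo=timezone.utc)
store = {
    "old": ("SUCCESS", now - timedelta(hours=25)),
    "recent": ("SUCCESS", now - timedelta(hours=1)),
    "running": ("RUNNING", now - timedelta(hours=48)),
}
purged_ids = auto_purge(store, purge_hours=24, now=now)
```

Only the invocation that is both final and past the cutoff is removed; the old but still running invocation is kept.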
- abstract get_invocation_status(invocation: DistributedInvocation[Params, Result]) → pynenc.invocation.status.InvocationStatus
Retrieves the status of a specific invocation.
- Parameters:
invocation (DistributedInvocation[Params, Result]) – The invocation whose status is to be retrieved.
- Returns:
The current status of the invocation.
- Return type:
InvocationStatus
- abstract increment_invocation_retries(invocation: DistributedInvocation[Params, Result]) → None
Increments the retry count of a specific invocation.
- Parameters:
invocation (DistributedInvocation[Params, Result]) – The invocation for which to increment retries.
- abstract get_invocation_retries(invocation: DistributedInvocation[Params, Result]) → int
Retrieves the number of retries for a specific invocation.
- Parameters:
invocation (DistributedInvocation[Params, Result]) – The invocation whose retry count is to be retrieved.
- Returns:
The number of retries for the invocation.
- Return type:
int
- abstract purge() → None
Purges all the orchestrator data for the current self.app.app_id.
Important
This should only be used for testing purposes.
- abstract property cycle_control: pynenc.orchestrator.base_orchestrator.BaseCycleControl
Property to access the cycle control component of the orchestrator.
- Returns:
The cycle control component.
- Return type:
BaseCycleControl
- add_call_and_check_cycles(caller_invocation: DistributedInvocation[Params, Result], callee_invocation: DistributedInvocation[Params, Result]) → None
Adds a call relationship between two invocations and checks for potential cycles.
- Parameters:
caller_invocation (DistributedInvocation[Params, Result]) – The calling invocation.
callee_invocation (DistributedInvocation[Params, Result]) – The called invocation.
- clean_up_invocation_cycles(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) → None
Cleans up data related to invocation cycles when an invocation finishes.
- Parameters:
invocation (DistributedInvocation) – The invocation that has finished.
- abstract property blocking_control: pynenc.orchestrator.base_orchestrator.BaseBlockingControl
Property to access the blocking control component of the orchestrator.
- Returns:
The blocking control component.
- Return type:
BaseBlockingControl
- release_waiters(waited: pynenc.invocation.dist_invocation.DistributedInvocation) → None
Releases other invocations that are waiting on the completion of the specified invocation.
- Parameters:
waited (DistributedInvocation) – The invocation that has completed.
- waiting_for_results(caller_invocation: Optional[DistributedInvocation[Params, Result]], result_invocations: list[DistributedInvocation[Params, Result]]) → None
Notifies the system that an invocation is waiting on the results of other invocations.
- Parameters:
caller_invocation (Optional[DistributedInvocation[Params, Result]]) – The waiting invocation.
result_invocations (list[DistributedInvocation[Params, Result]]) – The invocations being waited on.
- get_blocking_invocations(max_num_invocations: int) → Iterator[pynenc.invocation.dist_invocation.DistributedInvocation]
Retrieves invocations that are blocking others but not blocked themselves.
- Parameters:
max_num_invocations (int) – The maximum number of blocking invocations to retrieve.
- Returns:
An iterator over unblocked, blocking invocations.
- Return type:
Iterator[DistributedInvocation]
Note
The order of the returned invocations is **oldest first**.
- set_invocation_status(invocation: DistributedInvocation[Params, Result], status: pynenc.invocation.status.InvocationStatus) → None
Sets the status of a specific invocation.
- Parameters:
invocation (DistributedInvocation[Params, Result]) – The invocation to update.
status (InvocationStatus) – The new status to set for the invocation.
- set_invocations_status(invocations: list[DistributedInvocation[Params, Result]], status: pynenc.invocation.status.InvocationStatus) → None
Sets the status for a list of invocations.
- Parameters:
invocations (list[DistributedInvocation[Params, Result]]) – The invocations to update.
status (InvocationStatus) – The new status to set for the invocations.
- set_invocation_run(caller: Optional[DistributedInvocation[Params, Result]], callee: DistributedInvocation[Params, Result]) → None
Marks an invocation as running and checks for call cycles if a caller is specified.
- Parameters:
caller (Optional[DistributedInvocation[Params, Result]]) – The calling invocation, if any.
callee (DistributedInvocation[Params, Result]) – The invocation that is being marked as running.
- set_invocation_result(invocation: pynenc.invocation.dist_invocation.DistributedInvocation, result: Any) → None
Sets the result for a completed invocation.
- Parameters:
invocation (DistributedInvocation) – The invocation that has completed.
result (Any) – The result of the invocation’s execution.
- set_invocation_exception(invocation: pynenc.invocation.dist_invocation.DistributedInvocation, exception: Exception) → None
Sets an exception for an invocation that finished with an error.
- Parameters:
invocation (DistributedInvocation) – The invocation that encountered an exception.
exception (Exception) – The exception that occurred during the invocation’s execution.
- set_invocation_retry(invocation: pynenc.invocation.dist_invocation.DistributedInvocation, exception: Exception) → None
Sets an invocation for retry in case of a retriable exception.
- Parameters:
invocation (DistributedInvocation) – The invocation to be retried.
exception (Exception) – The exception that triggered the retry.
- is_candidate_to_run_by_concurrency_control(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) → bool
Checks whether an invocation can be a candidate to run, based on the task’s concurrency control configuration.
- Parameters:
invocation (DistributedInvocation) – The invocation to check for authorization.
- Returns:
True if the invocation is authorized to be a running candidate, False otherwise.
- is_authorize_to_run_by_concurrency_control(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) → bool
Checks whether an invocation can be a candidate to run, based on the task’s concurrency control configuration.
- Parameters:
invocation (DistributedInvocation) – The invocation to check for authorization.
- Returns:
True if the invocation is authorized to be a running candidate, False otherwise.
- _is_authorize_by_concurrency_control(invocation: pynenc.invocation.dist_invocation.DistributedInvocation, statuses: list[pynenc.invocation.status.InvocationStatus]) → bool
Checks if an invocation is authorized to run based on the task’s concurrency control configuration.
Important
The authorization is determined by the task’s running_concurrency setting:
- If ConcurrencyControlType.DISABLED, the invocation is always authorized to run.
- If ConcurrencyControlType.TASK, it checks whether any other invocation of the same task is running. If so, the invocation is not authorized to run.
- If ConcurrencyControlType.ARGUMENTS, it checks for any running invocation of the same task with the same arguments. If one is found, the invocation is not authorized to run.
- If ConcurrencyControlType.KEYS, it checks for any running invocation with the same key arguments. If one is found, the invocation is not authorized to run.
Note
The function call.serialized_args_for_concurrency_check determines which arguments are relevant when checking for existing running invocations, based on the task’s running_concurrency option.
- Parameters:
invocation (DistributedInvocation) – The invocation to check for authorization.
statuses (list[InvocationStatus]) – The statuses to check for existing invocations.
- Returns:
True if the invocation is authorized, False otherwise.
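The four running_concurrency modes listed above can be condensed into a short decision function. This is a minimal sketch, assuming running invocations are available as `(task_id, args)` pairs. The local `ConcurrencyControlType` enum only mirrors the mode names from the text, and `is_authorized` and its parameters are hypothetical, not part of pynenc.

```python
from enum import Enum, auto


class ConcurrencyControlType(Enum):
    """Local stand-in mirroring the four modes named above."""

    DISABLED = auto()
    TASK = auto()
    ARGUMENTS = auto()
    KEYS = auto()


def is_authorized(
    mode: ConcurrencyControlType,
    task_id: str,
    args: dict[str, str],
    key_names: list[str],
    running: list[tuple[str, dict[str, str]]],
) -> bool:
    """Decide whether a new invocation may run next to the running ones."""
    if mode is ConcurrencyControlType.DISABLED:
        return True
    # Arguments of the running invocations that belong to the same task.
    same_task = [r_args for r_task, r_args in running if r_task == task_id]
    if mode is ConcurrencyControlType.TASK:
        return not same_task
    if mode is ConcurrencyControlType.ARGUMENTS:
        return args not in same_task
    # KEYS: only the key arguments take part in the comparison.
    key = {name: args[name] for name in key_names}
    return all(
        {name: r_args[name] for name in key_names} != key for r_args in same_task
    )


running = [("t", {"user": "1", "page": "a"})]
same_key_other_page = {"user": "1", "page": "b"}
```

With one running invocation for user 1, `same_key_other_page` is authorized under ARGUMENTS (the full arguments differ) but rejected under KEYS when `user` is the only key argument.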
- get_blocking_invocations_to_run(max_num_invocations: int, blocking_invocation_ids: set[str]) → Iterator[pynenc.invocation.dist_invocation.DistributedInvocation]
Retrieves invocations that are blocking others but are not themselves blocked, up to a maximum number.
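- Parameters:
max_num_invocations (int) – The maximum number of blocking invocations to retrieve.
blocking_invocation_ids (set[str]) – The set of IDs of invocations identified as blocking.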
- Returns:
An iterator over the blocking invocations.
- Return type:
Iterator[DistributedInvocation]
- get_additional_invocations_to_run(missing_invocations: int, blocking_invocation_ids: set[str], invocations_to_reroute: set[pynenc.invocation.dist_invocation.DistributedInvocation]) → Iterator[pynenc.invocation.dist_invocation.DistributedInvocation]
Retrieves additional invocations to run, considering those not blocked or already identified as blocking.
- Parameters:
missing_invocations (int) – The number of additional invocations needed.
blocking_invocation_ids (set[str]) – IDs of invocations already identified as blocking.
invocations_to_reroute (set[DistributedInvocation]) – A set to collect invocations that need rerouting.
- Returns:
An iterator over the additional invocations to run.
- Return type:
Iterator[DistributedInvocation]
- reroute_invocations(invocations_to_reroute: set[pynenc.invocation.dist_invocation.DistributedInvocation]) → None
Reroutes the specified invocations, typically when they are not authorized to run.
- Parameters:
invocations_to_reroute (set[DistributedInvocation]) – The invocations to be rerouted.
- get_invocations_to_run(max_num_invocations: int) → Iterator[pynenc.invocation.dist_invocation.DistributedInvocation]
Retrieves a set of invocations to run, considering blocking and concurrency control.
- Parameters:
max_num_invocations (int) – The maximum number of invocations to retrieve.
- Returns:
An iterator over invocations that are ready to run.
- Return type:
Iterator[DistributedInvocation]
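Judging only from the signatures of get_blocking_invocations_to_run, get_additional_invocations_to_run and reroute_invocations, the selection could plausibly proceed in three steps: yield blocking invocations first, fill the remaining slots with other authorized invocations, then reroute the ones that were not authorized. The sketch below follows that reading. The step order, the string IDs, and the `is_authorized` and `reroute` callables are all assumptions, not the library's confirmed behavior.

```python
from collections.abc import Callable, Iterator


def get_invocations_to_run(
    max_num_invocations: int,
    blocking: list[str],
    pending: list[str],
    is_authorized: Callable[[str], bool],
    reroute: Callable[[set[str]], None],
) -> Iterator[str]:
    """Yield up to max_num_invocations IDs, blocking invocations first."""
    blocking_ids: set[str] = set()
    # Step 1: invocations that block others take priority.
    for inv_id in blocking[:max_num_invocations]:
        blocking_ids.add(inv_id)
        yield inv_id
    # Step 2: fill the remaining slots with other authorized invocations.
    missing = max_num_invocations - len(blocking_ids)
    to_reroute: set[str] = set()
    for inv_id in pending:
        if missing <= 0:
            break
        if inv_id in blocking_ids:
            continue  # already yielded in step 1
        if is_authorized(inv_id):
            missing -= 1
            yield inv_id
        else:
            to_reroute.add(inv_id)
    # Step 3: hand unauthorized invocations back for rerouting.
    reroute(to_reroute)


rerouted: list[str] = []
selected = list(
    get_invocations_to_run(
        3,
        blocking=["b1"],
        pending=["b1", "p1", "p2", "p3"],
        is_authorized=lambda inv_id: inv_id != "p1",
        reroute=rerouted.extend,
    )
)
```

Because the function is a generator, rerouting in this sketch only happens once the caller has consumed the iterator to the end.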
- _route_new_call_invocation(call: Call[Params, Result]) → DistributedInvocation[Params, Result]
Routes a new call invocation within the distributed task system.
This method creates and routes a new DistributedInvocation for the provided call. It is primarily used when the task does not have single invocation options set.
- Parameters:
call (Call[Params, Result]) – The task call to be routed.
- Returns:
The newly created DistributedInvocation for the call.
- Return type:
DistributedInvocation[Params, Result]
- route_call(call: pynenc.call.Call) → DistributedInvocation[Params, Result]
Routes a task call in the distributed task system, considering single invocation options.
This method handles the routing of a task call.
Important
The behavior depends on the task’s registration_concurrency option:
- If ConcurrencyControlType.DISABLED, it always creates a new invocation.
- If ConcurrencyControlType.TASK, it checks for any existing invocation of the same task, regardless of the arguments. If one is found, it reuses it; otherwise it creates a new invocation.
- If ConcurrencyControlType.ARGUMENTS, it checks for any existing invocation with the same arguments. If one is found, it reuses it; otherwise it creates a new invocation.
- If ConcurrencyControlType.KEYS, it checks for any existing invocation with the same key arguments. If one is found and the non-key arguments are the same, it reuses it. If the non-key arguments differ and on_diff_non_key_args_raise is set to True, it raises an error; otherwise it reuses the invocation with the new non-key arguments. If no invocation is found, it creates a new invocation.
Note
The function call.serialized_args_for_concurrency_check provides the arguments used to check for existing invocations, based on the task’s registration_concurrency option.
- Parameters:
call (Call) – The task call to be routed.
- Returns:
A DistributedInvocation object, which could be either a new or reused invocation.
- Return type:
DistributedInvocation[Params, Result]
- Raises:
InvocationConcurrencyWithDifferentArgumentsError – If an invocation with different arguments exists and the task’s configuration specifies to raise an error in such cases.
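The reuse-or-create rules for registration_concurrency can be traced with a toy registry. This is a sketch under assumptions rather than the library's implementation: `Invocation`, the list-based registry, and the `route_call` signature are hypothetical, and `DifferentArgumentsError` is a local stand-in for InvocationConcurrencyWithDifferentArgumentsError.

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import Optional


class ConcurrencyControlType(Enum):
    """Local stand-in mirroring the four modes named above."""

    DISABLED = auto()
    TASK = auto()
    ARGUMENTS = auto()
    KEYS = auto()


class DifferentArgumentsError(Exception):
    """Stand-in for InvocationConcurrencyWithDifferentArgumentsError."""


@dataclass
class Invocation:
    task_id: str
    args: dict[str, str]


def route_call(
    mode: ConcurrencyControlType,
    task_id: str,
    args: dict[str, str],
    key_names: list[str],
    registry: list[Invocation],
    on_diff_non_key_args_raise: bool = False,
) -> Invocation:
    """Return a reused invocation when the mode allows it, else a new one."""
    candidates = [inv for inv in registry if inv.task_id == task_id]
    match: Optional[Invocation] = None
    if mode is ConcurrencyControlType.TASK:
        match = candidates[0] if candidates else None
    elif mode is ConcurrencyControlType.ARGUMENTS:
        match = next((inv for inv in candidates if inv.args == args), None)
    elif mode is ConcurrencyControlType.KEYS:
        match = next(
            (
                inv
                for inv in candidates
                if all(inv.args.get(k) == args.get(k) for k in key_names)
            ),
            None,
        )
        if match is not None and match.args != args:
            if on_diff_non_key_args_raise:
                raise DifferentArgumentsError("non-key arguments differ")
            match.args = dict(args)  # reuse with the new non-key arguments
    # DISABLED falls through with match = None, so a new one is always created.
    if match is None:
        match = Invocation(task_id, dict(args))
        registry.append(match)
    return match
```

Under KEYS with `key_names=["k"]`, two calls that share `k` but differ in another argument return the same invocation object, updated to the latest arguments, unless `on_diff_non_key_args_raise=True`, in which case the second call raises.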