pynenc.orchestrator.base_orchestrator

Module Contents

Classes

BaseCycleControl

A component of the orchestrator to implement cycle control functionalities.

BaseBlockingControl

A component of the orchestrator to implement blocking control functionalities.

BaseOrchestrator

Abstract base class defining the orchestrator’s interface in a distributed task system.

API

class pynenc.orchestrator.base_orchestrator.BaseCycleControl

Bases: abc.ABC

A component of the orchestrator to implement cycle control functionalities.

This abstract base class defines the interface for cycle control in a distributed task system. It is intended to prevent the formation of call cycles between tasks.

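abstract add_call_and_check_cycles(caller_invocation: DistributedInvocation[Params, Result], callee_invocation: DistributedInvocation[Params, Result]) → None

Adds a new call relationship between invocations and checks for potential cycles.

Parameters:
  • caller_invocation (DistributedInvocation[Params, Result]) – The invocation making the call.

  • callee_invocation (DistributedInvocation[Params, Result]) – The invocation being called.

Raises: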

CycleDetectedError – If adding the call creates a cycle.

abstract clean_up_invocation_cycles(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) → None

Cleans up any cycle-related data when an invocation is finished.

Parameters:

invocation (DistributedInvocation) – The invocation that has finished.
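To illustrate the contract, the sketch below keeps the call graph in memory and rejects any new edge that would close a loop. It is not pynenc's implementation: the class `InMemoryCycleControl`, the local `CycleDetectedError` stand-in, and the use of plain invocation ID strings instead of `DistributedInvocation` objects are all assumptions made to keep the example self-contained.

```python
from collections import defaultdict


class CycleDetectedError(Exception):
    """Local stand-in for pynenc's CycleDetectedError (hypothetical)."""


class InMemoryCycleControl:
    """Hypothetical cycle control storing caller -> callee edges by invocation ID."""

    def __init__(self) -> None:
        self.calls: defaultdict[str, set[str]] = defaultdict(set)

    def _reaches(self, start: str, target: str) -> bool:
        # Iterative depth-first search along caller -> callee edges.
        stack, seen = [start], set()
        while stack:
            node = stack.pop()
            if node == target:
                return True
            if node in seen:
                continue
            seen.add(node)
            stack.extend(self.calls.get(node, ()))
        return False

    def add_call_and_check_cycles(self, caller_id: str, callee_id: str) -> None:
        # The new edge closes a cycle if the callee can already reach the caller
        # (this also rejects an invocation calling itself).
        if self._reaches(callee_id, caller_id):
            raise CycleDetectedError(f"{caller_id} -> {callee_id} would create a cycle")
        self.calls[caller_id].add(callee_id)

    def clean_up_invocation_cycles(self, invocation_id: str) -> None:
        # Forget the finished invocation both as a caller and as a callee.
        self.calls.pop(invocation_id, None)
        for callees in self.calls.values():
            callees.discard(invocation_id)
```

Checking reachability from the callee back to the caller before inserting the edge costs one graph traversal per call; a shared backend would likely need that check and the insert to happen atomically.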

class pynenc.orchestrator.base_orchestrator.BaseBlockingControl

Bases: abc.ABC

A component of the orchestrator to implement blocking control functionalities.

This abstract base class defines the interface for managing blocking behavior in distributed task executions.

abstract release_waiters(waited: pynenc.invocation.dist_invocation.DistributedInvocation) → None

Releases any invocations that are waiting on the specified invocation.

Parameters:

waited (DistributedInvocation) – The invocation that has finished and can release its waiters.

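abstract waiting_for_results(caller_invocation: DistributedInvocation[Params, Result], result_invocations: list[DistributedInvocation[Params, Result]]) → None

Notifies the system that an invocation is waiting for the results of other invocations.

Parameters:
  • caller_invocation (DistributedInvocation[Params, Result]) – The invocation that is waiting.

  • result_invocations (list[DistributedInvocation[Params, Result]]) – The invocations whose results it is waiting for.

abstract get_blocking_invocations(max_num_invocations: int) → Iterator[pynenc.invocation.dist_invocation.DistributedInvocation]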

Retrieves invocations that are blocking others but are not blocked themselves.

Parameters:

max_num_invocations (int) – The maximum number of blocking invocations to retrieve.

Returns:

An iterator over unblocked, blocking invocations, ordered by age (oldest first).

Return type:

Iterator[DistributedInvocation]
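A minimal in-memory sketch of the blocking-control interface, assuming invocations are identified by plain string IDs. The class name `InMemoryBlockingControl` is hypothetical, and "age" is approximated by the order in which an invocation was first waited on; a real backend would presumably rely on its own timestamps.

```python
import itertools
from collections import defaultdict
from typing import Iterator


class InMemoryBlockingControl:
    """Hypothetical blocking control keyed by invocation ID."""

    def __init__(self) -> None:
        self.waiting_on: defaultdict[str, set[str]] = defaultdict(set)  # waiter -> waited
        self.waited_by: defaultdict[str, set[str]] = defaultdict(set)   # waited -> waiters
        self.first_seen: dict[str, int] = {}  # insertion order used as a proxy for age
        self._clock = itertools.count()

    def waiting_for_results(self, caller_id: str, result_ids: list[str]) -> None:
        for waited in result_ids:
            self.waiting_on[caller_id].add(waited)
            self.waited_by[waited].add(caller_id)
            self.first_seen.setdefault(waited, next(self._clock))

    def release_waiters(self, waited_id: str) -> None:
        # The waited invocation finished, so nobody needs to wait on it any more.
        for waiter in self.waited_by.pop(waited_id, set()):
            self.waiting_on[waiter].discard(waited_id)
            if not self.waiting_on[waiter]:
                del self.waiting_on[waiter]
        self.first_seen.pop(waited_id, None)

    def get_blocking_invocations(self, max_num_invocations: int) -> Iterator[str]:
        # Blocking: someone waits on it. Unblocked: it waits on nobody.
        candidates = [i for i in self.waited_by if not self.waiting_on.get(i)]
        candidates.sort(key=lambda i: self.first_seen[i])  # oldest first
        yield from candidates[:max_num_invocations]
```

Keeping both directions of the wait relation makes release_waiters proportional to the number of waiters of the finished invocation rather than to the size of the whole graph.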

class pynenc.orchestrator.base_orchestrator.BaseOrchestrator(app: pynenc.app.Pynenc)

Bases: abc.ABC

Abstract base class defining the orchestrator’s interface in a distributed task system.

The orchestrator is responsible for managing task invocations, including tracking their status, handling retries, and implementing cycle and blocking controls.

Parameters:

app (Pynenc) – The Pynenc application instance.

Initialization

conf() → pynenc.conf.config_orchestrator.ConfigOrchestrator

abstract get_existing_invocations(task: Task[Params, Result], key_serialized_arguments: Optional[dict[str, str]] = None, statuses: Optional[list[pynenc.invocation.status.InvocationStatus]] = None) → Iterator[pynenc.invocation.dist_invocation.DistributedInvocation]

Retrieves existing invocations based on task, arguments, and status.

Parameters:
  • task (Task[Params, Result]) – The task for which to retrieve invocations.

  • key_serialized_arguments (Optional[dict[str, str]]) – Serialized arguments to filter invocations by.

  • statuses (Optional[list[InvocationStatus]]) – The statuses to filter invocations by.

Returns:

An iterator over the matching invocations.

Return type:

Iterator[DistributedInvocation]
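The filtering semantics can be illustrated over an in-memory list. `Record`, `Status`, and passing the task as a plain `task_id` string are hypothetical simplifications, and the sketch shows only one plausible reading: every filter that is provided must match, and each provided key argument must be equal to the stored one.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Iterator, Optional


class Status(Enum):  # hypothetical subset of statuses
    REGISTERED = "registered"
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"


@dataclass
class Record:  # hypothetical stored invocation
    invocation_id: str
    task_id: str
    serialized_args: dict[str, str]
    status: Status


def get_existing_invocations(
    records: list[Record],
    task_id: str,
    key_serialized_arguments: Optional[dict[str, str]] = None,
    statuses: Optional[list[Status]] = None,
) -> Iterator[Record]:
    for rec in records:
        if rec.task_id != task_id:
            continue
        # Every provided argument must match exactly; other arguments are ignored.
        if key_serialized_arguments and any(
            rec.serialized_args.get(k) != v for k, v in key_serialized_arguments.items()
        ):
            continue
        # None means "any status"; a list restricts the match to those statuses.
        if statuses is not None and rec.status not in statuses:
            continue
        yield rec
```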

abstract get_invocation(invocation_id: str) → Optional[pynenc.invocation.dist_invocation.DistributedInvocation]

Retrieves a specific invocation by its ID.

This method provides a direct way to access an invocation without filtering through all invocations, which can be much more efficient when the invocation ID is known.

Parameters:

invocation_id (str) – The ID of the invocation to retrieve.

Returns:

The invocation if found, None otherwise.

Return type:

Optional[DistributedInvocation]

abstract _set_invocation_status(invocation: DistributedInvocation[Params, Result], status: pynenc.invocation.status.InvocationStatus) → None

Sets the status of a specific invocation.

Parameters:
  • invocation (DistributedInvocation[Params, Result]) – The invocation to update.

  • status (InvocationStatus) – The new status.

abstract _set_invocations_status(invocations: list[pynenc.invocation.dist_invocation.DistributedInvocation], status: pynenc.invocation.status.InvocationStatus) → None

Set the status of multiple invocations at once.

The default implementation sets the status of each invocation sequentially. Subclasses should override it with a more efficient batch implementation.

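Parameters:
  • invocations (list[DistributedInvocation]) – The invocations to update.

  • status (InvocationStatus) – The new status to apply to all of them.

abstract _set_invocation_pending_status(invocation: DistributedInvocation[Params, Result]) → None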

Sets the status of an invocation to pending.

Note

Pending can only be set by the orchestrator.

Parameters:

invocation (DistributedInvocation[Params, Result]) – The invocation to update.

_set_pending(invocation: DistributedInvocation[Params, Result]) → None

Marks an invocation as pending and updates its history in the state backend.

Note

Pending can only be set by the orchestrator.

Parameters:

invocation (DistributedInvocation[Params, Result]) – The invocation to mark as pending.

abstract set_up_invocation_auto_purge(invocation: DistributedInvocation[Params, Result]) → None

Sets up automatic purging for an invocation after a defined period.

Note

Set auto purge period with `app.conf.orchestrator_auto_final_invocation_purge_hours`

Parameters:

invocation (DistributedInvocation[Params, Result]) – The invocation to set up for auto purge.

abstract auto_purge() → None

Automatically purges all invocations in a final state that are older than a defined time period.

Note

Set auto purge period with `app.conf.orchestrator_auto_final_invocation_purge_hours`
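The two auto-purge methods can be sketched together, assuming set_up_invocation_auto_purge records a timestamp and auto_purge later deletes invocations that are both in a final status and older than the configured number of hours. `AutoPurgeSketch`, `Status`, and `FINAL_STATUSES` are hypothetical; `purge_hours` stands in for `orchestrator_auto_final_invocation_purge_hours`, and the current time is passed in explicitly so the logic is easy to test.

```python
from datetime import datetime, timedelta
from enum import Enum


class Status(Enum):  # hypothetical subset of statuses
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


FINAL_STATUSES = {Status.SUCCESS, Status.FAILED}  # assumption: which statuses count as final


class AutoPurgeSketch:
    """Hypothetical in-memory model of set_up_invocation_auto_purge / auto_purge."""

    def __init__(self, purge_hours: float) -> None:
        # Stands in for app.conf.orchestrator_auto_final_invocation_purge_hours.
        self.purge_hours = purge_hours
        self.statuses: dict[str, Status] = {}
        self.purge_registered_at: dict[str, datetime] = {}

    def set_up_invocation_auto_purge(self, invocation_id: str, now: datetime) -> None:
        # Remember when the invocation became eligible for purging.
        self.purge_registered_at[invocation_id] = now

    def auto_purge(self, now: datetime) -> list[str]:
        cutoff = now - timedelta(hours=self.purge_hours)
        expired = [
            inv_id
            for inv_id, registered_at in self.purge_registered_at.items()
            if registered_at < cutoff and self.statuses.get(inv_id) in FINAL_STATUSES
        ]
        # Delete only after collecting, so the dict is not mutated while iterating.
        for inv_id in expired:
            del self.purge_registered_at[inv_id]
            self.statuses.pop(inv_id, None)
        return expired
```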

abstract get_invocation_status(invocation: DistributedInvocation[Params, Result]) → pynenc.invocation.status.InvocationStatus

Retrieves the status of a specific invocation.

Parameters:

invocation (DistributedInvocation[Params, Result]) – The invocation whose status is to be retrieved.

Returns:

The current status of the invocation.

Return type:

InvocationStatus

abstract increment_invocation_retries(invocation: DistributedInvocation[Params, Result]) → None

Increments the retry count of a specific invocation.

Parameters:

invocation (DistributedInvocation[Params, Result]) – The invocation for which to increment retries.

abstract get_invocation_retries(invocation: DistributedInvocation[Params, Result]) → int

Retrieves the number of retries for a specific invocation.

Parameters:

invocation (DistributedInvocation[Params, Result]) – The invocation whose retry count is to be retrieved.

Returns:

The number of retries for the invocation.

Return type:

int

abstract filter_by_status(invocations: list[pynenc.invocation.dist_invocation.DistributedInvocation], status_filter: set[pynenc.invocation.status.InvocationStatus] | None = None) → list[pynenc.invocation.dist_invocation.DistributedInvocation]

Filters a list of invocations by their status in an optimized way.

This method allows efficient batch filtering of invocations by status, reducing the number of individual status checks needed.

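Parameters:
  • invocations (list[DistributedInvocation]) – The invocations to filter.

  • status_filter (set[InvocationStatus] | None) – The statuses to match.

Returns:

List of invocations matching the status filter.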

Return type:

list[DistributedInvocation]

filter_final(invocations: list[pynenc.invocation.dist_invocation.DistributedInvocation]) → list[pynenc.invocation.dist_invocation.DistributedInvocation]

Returns invocations that have reached a final status.

This is a convenience method that internally uses filter_by_status with all final statuses.

Parameters:

invocations (list[DistributedInvocation]) – The invocations to check

Returns:

List of invocations that have reached a final status

Return type:

list[DistributedInvocation]
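The benefit of filter_by_status is replacing one status lookup per invocation with a single batched query. The sketch below models that with a hypothetical `fetch_statuses` callable that returns the statuses of many invocations in one call; `Status`, `FINAL_STATUSES`, and treating `None` as "keep everything" are assumptions, not pynenc's `InvocationStatus` behavior.

```python
from enum import Enum
from typing import Callable, Optional


class Status(Enum):  # hypothetical subset of statuses
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


FINAL_STATUSES = {Status.SUCCESS, Status.FAILED}  # assumption

# Hypothetical batch lookup: one round-trip returns the status of many invocations.
FetchStatuses = Callable[[list[str]], dict[str, Status]]


def filter_by_status(
    invocation_ids: list[str],
    fetch_statuses: FetchStatuses,
    status_filter: Optional[set[Status]] = None,
) -> list[str]:
    if status_filter is None:
        # Assumption: no filter means every invocation is kept.
        return list(invocation_ids)
    statuses = fetch_statuses(invocation_ids)  # single batched query
    return [i for i in invocation_ids if statuses.get(i) in status_filter]


def filter_final(invocation_ids: list[str], fetch_statuses: FetchStatuses) -> list[str]:
    # Convenience wrapper that delegates to filter_by_status with the final statuses.
    return filter_by_status(invocation_ids, fetch_statuses, FINAL_STATUSES)
```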

abstract purge() → None

Purges all the orchestrator data for the current self.app.app_id.

Important

This should only be used for testing purposes.

abstract property cycle_control: pynenc.orchestrator.base_orchestrator.BaseCycleControl

Property to access the cycle control component of the orchestrator.

Returns:

The cycle control component.

Return type:

BaseCycleControl

add_call_and_check_cycles(caller_invocation: DistributedInvocation[Params, Result], callee_invocation: DistributedInvocation[Params, Result]) → None

Adds a call relationship between two invocations and checks for potential cycles.

Parameters:
  • caller_invocation (DistributedInvocation[Params, Result]) – The invocation making the call.

  • callee_invocation (DistributedInvocation[Params, Result]) – The invocation being called.

clean_up_invocation_cycles(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) → None

Cleans up data related to invocation cycles when an invocation finishes.

Parameters:

invocation (DistributedInvocation) – The invocation that has finished.

abstract property blocking_control: pynenc.orchestrator.base_orchestrator.BaseBlockingControl

Property to access the blocking control component of the orchestrator.

Returns:

The blocking control component.

Return type:

BaseBlockingControl

release_waiters(waited: pynenc.invocation.dist_invocation.DistributedInvocation) → None

Releases other invocations that are waiting on the completion of the specified invocation.

Parameters:

waited (DistributedInvocation) – The invocation that has completed.

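waiting_for_results(caller_invocation: Optional[DistributedInvocation[Params, Result]], result_invocations: list[DistributedInvocation[Params, Result]]) → None

Notifies the system that an invocation is waiting on the results of other invocations.

Parameters:
  • caller_invocation (Optional[DistributedInvocation[Params, Result]]) – The invocation that is waiting, or None if the wait does not originate from an invocation.

  • result_invocations (list[DistributedInvocation[Params, Result]]) – The invocations whose results are awaited.

get_blocking_invocations(max_num_invocations: int) → Iterator[pynenc.invocation.dist_invocation.DistributedInvocation]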

Retrieves invocations that are blocking others but not blocked themselves.

Parameters:

max_num_invocations (int) – The maximum number of blocking invocations to retrieve.

Returns:

An iterator over unblocked, blocking invocations.

Return type:

Iterator[DistributedInvocation]

Note

The returned invocations are ordered oldest first.

set_invocation_status(invocation: DistributedInvocation[Params, Result], status: pynenc.invocation.status.InvocationStatus) → None

Sets the status of a specific invocation.

Parameters:
  • invocation (DistributedInvocation[Params, Result]) – The invocation to update.

  • status (InvocationStatus) – The new status.

set_invocations_status(invocations: list[DistributedInvocation[Params, Result]], status: pynenc.invocation.status.InvocationStatus) → None

Sets the status for a list of invocations.

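Parameters:
  • invocations (list[DistributedInvocation[Params, Result]]) – The invocations to update.

  • status (InvocationStatus) – The new status to apply to all of them.

set_invocation_run(caller: Optional[DistributedInvocation[Params, Result]], callee: DistributedInvocation[Params, Result]) → None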

Marks an invocation as running and checks for call cycles if a caller is specified.

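Parameters:
  • caller (Optional[DistributedInvocation[Params, Result]]) – The invocation that made the call, or None if there is no caller.

  • callee (DistributedInvocation[Params, Result]) – The invocation to mark as running.

set_invocation_result(invocation: pynenc.invocation.dist_invocation.DistributedInvocation, result: Any) → None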

Sets the result for a completed invocation.

Parameters:
  • invocation (DistributedInvocation) – The invocation that has completed.

  • result (Any) – The result of the invocation’s execution.

set_invocation_exception(invocation: pynenc.invocation.dist_invocation.DistributedInvocation, exception: Exception) → None

Sets an exception for an invocation that finished with an error.

Parameters:
  • invocation (DistributedInvocation) – The invocation that encountered an exception.

  • exception (Exception) – The exception that occurred during the invocation’s execution.

set_invocation_retry(invocation: pynenc.invocation.dist_invocation.DistributedInvocation, exception: Exception) → None

Marks an invocation for retry after a retriable exception.

Parameters:
  • invocation (DistributedInvocation) – The invocation to retry.

  • exception (Exception) – The retriable exception that triggered the retry.

is_candidate_to_run_by_concurrency_control(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) → bool

Checks whether an invocation can be a candidate to run based on the task’s concurrency control configuration.

Parameters:

invocation (DistributedInvocation) – The invocation to check for authorization.

Returns:

True if the invocation is authorized to be a running candidate, False otherwise.

is_authorize_to_run_by_concurrency_control(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) → bool

Checks whether an invocation is authorized to run based on the task’s concurrency control configuration.

Parameters:

invocation (DistributedInvocation) – The invocation to check for authorization.

Returns:

True if the invocation is authorized to run, False otherwise.

_is_authorize_by_concurrency_control(invocation: pynenc.invocation.dist_invocation.DistributedInvocation, statuses: list[pynenc.invocation.status.InvocationStatus]) → bool

Checks if an invocation is authorized to run based on the task’s concurrency control configuration.

Important

The authorization is determined by the task’s running_concurrency setting:
- If ConcurrencyControlType.DISABLED, the invocation is always authorized to run.
- If ConcurrencyControlType.TASK, it checks for any other running invocations of the same task. If there are any, the invocation is not authorized to run.
- If ConcurrencyControlType.ARGUMENTS, it checks for any running invocation of the same task with the same arguments. If there are any, the invocation is not authorized to run.
- If ConcurrencyControlType.KEYS, it checks for any running invocation with the same key arguments. If any are found, the invocation is not authorized to run.

Note

The function call.serialized_args_for_concurrency_check is used to determine the arguments that are relevant for checking existing running invocations based on the task’s running_concurrency option.

Parameters:
  • invocation (DistributedInvocation) – The invocation to check for authorization.

  • statuses (list[InvocationStatus]) – The statuses to check for existing invocations.

Returns:

True if the invocation is authorized, False otherwise.
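The four running_concurrency modes can be sketched as one comparison of "the arguments that matter" between the candidate and every other invocation of the same task whose status is in `statuses`. `Inv`, `args_for_check`, and `is_authorized` are hypothetical names; `args_for_check` only approximates what call.serialized_args_for_concurrency_check is described to do, and restricting the KEYS comparison to the same task is an assumption.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class ConcurrencyControlType(Enum):
    # Member names follow the options listed above; the values are arbitrary.
    DISABLED = "disabled"
    TASK = "task"
    ARGUMENTS = "arguments"
    KEYS = "keys"


@dataclass
class Inv:  # hypothetical, flattened view of an invocation
    invocation_id: str
    task_id: str
    args: dict[str, str]       # serialized arguments
    key_args: tuple[str, ...]  # names of the task's key arguments (assumption)
    status: str


def args_for_check(inv: Inv, mode: ConcurrencyControlType) -> Optional[dict[str, str]]:
    """Rough, hypothetical analogue of call.serialized_args_for_concurrency_check."""
    if mode is ConcurrencyControlType.ARGUMENTS:
        return dict(inv.args)
    if mode is ConcurrencyControlType.KEYS:
        return {k: inv.args[k] for k in inv.key_args}
    return None  # TASK: the arguments are irrelevant


def is_authorized(
    inv: Inv, others: list[Inv], mode: ConcurrencyControlType, statuses: set[str]
) -> bool:
    if mode is ConcurrencyControlType.DISABLED:
        return True
    wanted = args_for_check(inv, mode)
    for other in others:
        if other.invocation_id == inv.invocation_id or other.task_id != inv.task_id:
            continue  # same invocation, or a different task
        if other.status in statuses and args_for_check(other, mode) == wanted:
            return False  # a conflicting invocation is already in one of `statuses`
    return True
```

For TASK both sides map to None, so any other same-task invocation in `statuses` conflicts; ARGUMENTS and KEYS differ only in which arguments are compared.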

get_blocking_invocations_to_run(max_num_invocations: int, blocking_invocation_ids: set[str]) → Iterator[pynenc.invocation.dist_invocation.DistributedInvocation]

Retrieves invocations that are blocking others but are not themselves blocked, up to a maximum number.

Parameters:
  • max_num_invocations (int) – The maximum number of blocking invocations to retrieve.

  • blocking_invocation_ids (set[str]) – A set of invocation IDs that are already identified as blocking.

Returns:

An iterator over the blocking invocations.

Return type:

Iterator[DistributedInvocation]

get_additional_invocations_to_run(missing_invocations: int, blocking_invocation_ids: set[str], invocations_to_reroute: set[pynenc.invocation.dist_invocation.DistributedInvocation]) → Iterator[pynenc.invocation.dist_invocation.DistributedInvocation]

Retrieves additional invocations to run, considering those not blocked or already identified as blocking.

Parameters:
  • missing_invocations (int) – The number of additional invocations needed.

  • blocking_invocation_ids (set[str]) – IDs of invocations already identified as blocking.

  • invocations_to_reroute (set[DistributedInvocation]) – A set to collect invocations that need rerouting.

Returns:

An iterator over the additional invocations to run.

Return type:

Iterator[DistributedInvocation]

reroute_invocations(invocations_to_reroute: set[pynenc.invocation.dist_invocation.DistributedInvocation]) → None

Reroutes the specified invocations, typically when they are not authorized to run.

Parameters:

invocations_to_reroute (set[DistributedInvocation]) – The invocations to be rerouted.

get_invocations_to_run(max_num_invocations: int) → Iterator[pynenc.invocation.dist_invocation.DistributedInvocation]

Retrieves a set of invocations to run, considering blocking and concurrency control.

Parameters:

max_num_invocations (int) – The maximum number of invocations to retrieve.

Returns:

An iterator over invocations that are ready to run.

Return type:

Iterator[DistributedInvocation]
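One way to picture how get_invocations_to_run combines the blocking, additional, and reroute helpers: fill slots with blocking invocations first, top up from the queue while skipping IDs already handled, and collect unauthorized queued invocations for rerouting. This is a hypothetical sketch over string IDs with injected callables, not pynenc's actual control flow; for instance, it simply skips unauthorized blocking invocations instead of rerouting them.

```python
from typing import Callable, Iterator


def get_invocations_to_run(
    max_num_invocations: int,
    blocking: list[str],
    queued: list[str],
    is_authorized: Callable[[str], bool],
    reroute: Callable[[set[str]], None],
) -> Iterator[str]:
    """Hypothetical scheduling loop: blocking invocations first, then queued ones."""
    blocking_ids: set[str] = set()
    to_reroute: set[str] = set()
    emitted = 0
    # 1. Invocations that others wait on go first, so waiters can make progress.
    for inv in blocking:
        if emitted == max_num_invocations:
            break
        blocking_ids.add(inv)
        if is_authorized(inv):
            emitted += 1
            yield inv
    # 2. Fill the remaining slots from the queue, skipping already-handled IDs.
    for inv in queued:
        if emitted == max_num_invocations:
            break
        if inv in blocking_ids:
            continue
        if is_authorized(inv):
            emitted += 1
            yield inv
        else:
            to_reroute.add(inv)
    # 3. Unauthorized queued invocations are sent back to be picked up later.
    if to_reroute:
        reroute(to_reroute)
```

Because the function is a generator, the reroute step only runs once the caller exhausts it (for example with `list()`); a consumer that stops early leaves those invocations untouched.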

_route_new_call_invocation(call: Call[Params, Result]) → DistributedInvocation[Params, Result]

Routes a new call invocation within the distributed task system.

This method creates and routes a new DistributedInvocation for the provided call. It is primarily used when the task does not have single invocation options set.

Parameters:

call (Call[Params, Result]) – The task call to be routed.

Returns:

The newly created DistributedInvocation for the call.

Return type:

DistributedInvocation[Params, Result]

route_call(call: pynenc.call.Call) → DistributedInvocation[Params, Result]

Routes a task call in the distributed task system, considering single invocation options.

This method either creates a new invocation for the call or reuses an existing one.

Important

The behavior depends on the task’s registration_concurrency option:
- If ConcurrencyControlType.DISABLED, it always creates a new invocation.
- If ConcurrencyControlType.TASK, it checks for any existing invocation of the same task, regardless of the arguments. If one is found, it is reused; otherwise a new invocation is created.
- If ConcurrencyControlType.ARGUMENTS, it checks for any existing invocation with the same arguments. If one is found, it is reused; otherwise a new invocation is created.
- If ConcurrencyControlType.KEYS, it checks for any existing invocation with the same key arguments. If one is found and the non-key arguments are the same, it is always reused. If the non-key arguments differ and on_diff_non_key_args_raise is set to True, an error is raised; otherwise the invocation is reused with the new non-key arguments. If no invocation is found, a new one is created.

Note

The function call.serialized_args_for_concurrency_check is used to get the arguments that are used to check for existing invocations based on the task’s registration_concurrency option.

Parameters:

call (Call) – The task call to be routed.

Returns:

A DistributedInvocation object, which could be either a new or reused invocation.

Return type:

DistributedInvocation[Params, Result]

Raises:

InvocationConcurrencyWithDifferentArgumentsError – If an invocation with different arguments exists and the task’s configuration specifies to raise an error in such cases.
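The registration_concurrency rules can be condensed into a small in-memory router. Everything here is hypothetical: `Router`, `Invocation`, and the local exception stand-in exist only for illustration, and the sketch does not filter existing invocations by status. It also reads "reused with the new non-key arguments" as updating the stored arguments, which is only one possible interpretation.

```python
import itertools
from dataclasses import dataclass, field
from enum import Enum


class ConcurrencyControlType(Enum):
    DISABLED = "disabled"
    TASK = "task"
    ARGUMENTS = "arguments"
    KEYS = "keys"


class InvocationConcurrencyWithDifferentArgumentsError(Exception):
    """Local stand-in for the pynenc exception of the same name."""


@dataclass
class Invocation:  # hypothetical, flattened
    invocation_id: str
    task_id: str
    args: dict[str, str]


@dataclass
class Router:  # hypothetical in-memory router for one task configuration
    mode: ConcurrencyControlType
    key_args: tuple[str, ...] = ()  # names of the key arguments (assumption)
    on_diff_non_key_args_raise: bool = False
    existing: list[Invocation] = field(default_factory=list)
    _ids: itertools.count = field(default_factory=itertools.count)

    def _create(self, task_id: str, args: dict[str, str]) -> Invocation:
        inv = Invocation(str(next(self._ids)), task_id, dict(args))
        self.existing.append(inv)
        return inv

    def _matches(self, inv: Invocation, args: dict[str, str]) -> bool:
        if self.mode is ConcurrencyControlType.TASK:
            return True  # any invocation of the task, regardless of arguments
        if self.mode is ConcurrencyControlType.ARGUMENTS:
            return inv.args == args
        return all(inv.args.get(k) == args.get(k) for k in self.key_args)  # KEYS

    def route_call(self, task_id: str, args: dict[str, str]) -> Invocation:
        if self.mode is ConcurrencyControlType.DISABLED:
            return self._create(task_id, args)
        for inv in self.existing:
            if inv.task_id != task_id or not self._matches(inv, args):
                continue
            if self.mode is ConcurrencyControlType.KEYS and inv.args != args:
                # Same key arguments, different non-key arguments.
                if self.on_diff_non_key_args_raise:
                    raise InvocationConcurrencyWithDifferentArgumentsError(inv.invocation_id)
                inv.args = dict(args)  # one reading of "reuse with the new non-key arguments"
            return inv
        return self._create(task_id, args)
```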

route_calls(calls: list[PreSerializedCall[Params, Result]]) → list[DistributedInvocation[Params, Result]]

Routes multiple calls at once for improved performance.

This method is intended only for batch-routing calls to tasks whose concurrency control is disabled.

Parameters:

calls (list[PreSerializedCall]) – The calls to be routed.

Returns:

The list of routed invocations.

Raises:

BatchProcessingError – If concurrency control is enabled for any of the calls.
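The batch path can skip existing-invocation lookups entirely because, with concurrency control disabled, every call becomes a new invocation. The sketch below validates the whole batch before storing anything; `PreparedCall` (standing in for PreSerializedCall), the list-based `store`, returning list positions as invocation IDs, and checking only `registration_concurrency` are all assumptions.

```python
from dataclasses import dataclass
from enum import Enum


class ConcurrencyControlType(Enum):
    DISABLED = "disabled"
    TASK = "task"


class BatchProcessingError(Exception):
    """Local stand-in for the pynenc exception of the same name."""


@dataclass
class PreparedCall:  # hypothetical stand-in for PreSerializedCall
    task_id: str
    serialized_args: dict[str, str]
    registration_concurrency: ConcurrencyControlType


def route_calls(calls: list[PreparedCall], store: list[tuple[str, dict[str, str]]]) -> list[int]:
    # Validate the whole batch first so nothing is stored if any call is rejected.
    for call in calls:
        if call.registration_concurrency is not ConcurrencyControlType.DISABLED:
            raise BatchProcessingError(f"concurrency control enabled for {call.task_id}")
    # No lookups of existing invocations are needed: every call becomes a new entry.
    start = len(store)
    store.extend((c.task_id, c.serialized_args) for c in calls)
    return list(range(start, len(store)))
```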