pynenc.orchestrator.mem_orchestrator#

Module Contents#

Classes#

MemCycleControl

An implementation of cycle control using a directed acyclic graph (DAG) to represent call dependencies.

MemBlockingControl

An implementation of blocking control using a directed acyclic graph (DAG) to represent invocation dependencies.

ArgPair

Helper to simulate a Memory cache for key:value pairs in Task Invocations

TaskInvocationCache

A cache for storing and managing task invocations and their statuses.

MemOrchestrator

A memory-based implementation of the Orchestrator, managing task invocations and their lifecycle.

API#

class pynenc.orchestrator.mem_orchestrator.MemCycleControl(app: pynenc.app.Pynenc)[source]#

Bases: pynenc.orchestrator.base_orchestrator.BaseCycleControl

An implementation of cycle control using a directed acyclic graph (DAG) to represent call dependencies.

This class manages dependencies between task invocations to prevent call cycles, which could lead to deadlocks or infinite loops.

Parameters:

app (Pynenc) – The Pynenc application instance.

Initialization

add_call_and_check_cycles(caller: pynenc.invocation.dist_invocation.DistributedInvocation, callee: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]#

Adds a new invocation to the graph, representing a dependency where the caller is dependent on the callee.

Raises a CycleDetectedError if adding the invocation would cause a cycle in the call graph.

Parameters:
Raises:

CycleDetectedError – If adding the invocation causes a cycle.

clean_up_invocation_cycles(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]#

Removes an invocation from the graph, along with any edges to or from the invocation.

Parameters:

invocation (DistributedInvocation) – The invocation to be removed from the graph.

find_cycle_caused_by_new_invocation(caller: pynenc.invocation.dist_invocation.DistributedInvocation, callee: pynenc.invocation.dist_invocation.DistributedInvocation) list[pynenc.call.Call][source]#

Determines if adding a new edge from the caller to the callee would create a cycle in the graph.

Parameters:
Returns:

A list of Calls that would form a cycle after adding the new invocation, else an empty list.

Return type:

list[Call]

_is_cyclic_util(current_call_id: str, visited: set[str], path: list[str]) list[pynenc.call.Call][source]#

Utility function for cycle detection in the graph.

Parameters:
  • current_call_id (str) – The current call ID being checked for cycles.

  • visited (set[str]) – Set of already visited call IDs.

  • path (list[str]) – Current path of call IDs being traversed.

Returns:

A list of Calls that form a cycle, if one is detected.

Return type:

list[Call]

class pynenc.orchestrator.mem_orchestrator.MemBlockingControl(app: pynenc.app.Pynenc)[source]#

Bases: pynenc.orchestrator.base_orchestrator.BaseBlockingControl

An implementation of blocking control using a directed acyclic graph (DAG) to represent invocation dependencies.

This class manages dependencies between task invocations, ensuring that invocations waiting for others are properly handled.

Parameters:

app (Pynenc) – The Pynenc application instance.

Initialization

waiting_for_results(caller_invocation: DistributedInvocation[Params, Result], result_invocations: list[DistributedInvocation[Params, Result]]) None[source]#

Registers that an invocation (waiter) is waiting for the results of other invocations (waited).

Parameters:
release_waiters(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]#

Removes an invocation from the graph, along with any dependencies related to it.

Parameters:

invocation (DistributedInvocation) – The invocation that has finished and will no longer block other invocations.

get_blocking_invocations(max_num_invocations: int) Iterator[DistributedInvocation[Params, Result]][source]#

Retrieves invocations that are blocking others but are not themselves waiting for any results.

Parameters:

max_num_invocations (int) – The maximum number of blocking invocations to retrieve.

Returns:

An iterator over invocations that are blocking others (older firsts).

Return type:

Iterator[DistributedInvocation[Params, Result]]

class pynenc.orchestrator.mem_orchestrator.ArgPair(key: str, value: Any)[source]#

Helper to simulate a Memory cache for key:value pairs in Task Invocations

Initialization

__hash__() int[source]#
__eq__(other: Any) bool[source]#
__str__() str[source]#
__repr__() str[source]#
class pynenc.orchestrator.mem_orchestrator.TaskInvocationCache(app: pynenc.app.Pynenc)[source]#

Bases: typing.Generic[pynenc.types.Result]

A cache for storing and managing task invocations and their statuses.

This class provides functionalities to track task invocations, including their arguments, statuses, retries, and auto-purge mechanisms.

Parameters:

app (Pynenc) – The Pynenc application instance.

Initialization

filter_by_key_arguments(key_arguments: dict[str, str]) set[str][source]#
filter_by_statuses(statuses: list[pynenc.invocation.status.InvocationStatus]) set[str][source]#
get_invocations(key_arguments: dict[str, str] | None, statuses: list[pynenc.invocation.status.InvocationStatus] | None) Iterator[pynenc.invocation.dist_invocation.DistributedInvocation][source]#

Retrieves invocations based on provided key arguments and/or status.

Parameters:
  • key_arguments (dict[str, str] | None) – The key arguments to filter the invocations.

  • status (list[InvocationStatus] | None) – The statuses to filter the invocations.

Returns:

An iterator over the filtered invocations.

Return type:

Iterator[DistributedInvocation]

set_up_invocation_auto_purge(invocation: DistributedInvocation[Params, Result]) None[source]#

Sets up an invocation for automatic purging after a specified time.

Parameters:

invocation (DistributedInvocation[Params, Result]) – The invocation to be set up for auto-purge.

auto_purge() None[source]#

Automatically purges invocations that have been in a final state for longer than a specified duration.

clean_up_invocation(invocation_id: str) None[source]#

Cleans up an invocation from the cache.

Parameters:

invocation_id (str) – The ID of the invocation to be cleaned up.

set_status(invocation: DistributedInvocation[Params, Result], status: pynenc.invocation.status.InvocationStatus) None[source]#

Sets the status of a specific invocation.

Parameters:
clean_pending_status(invocation: DistributedInvocation[Params, Result]) None[source]#

Cleans the pending status of an invocation if it has exceeded the maximum pending time.

Parameters:

invocation (DistributedInvocation[Params, Result]) – The invocation whose pending status is to be cleaned.

set_pending_status(invocation: DistributedInvocation[Params, Result]) None[source]#

Sets the status of an invocation to pending, handling any potential locking issues.

Parameters:

invocation (DistributedInvocation[Params, Result]) – The invocation to set to pending status.

Raises:

PendingInvocationLockError – If the invocation is already in pending status or cannot acquire a lock.

get_status(invocation: DistributedInvocation[Params, Result]) pynenc.invocation.status.InvocationStatus[source]#

Retrieves the current status of an invocation, accounting for pending timeout.

Parameters:

invocation (DistributedInvocation[Params, Result]) – The invocation to get the status for.

Returns:

The current status of the invocation.

Return type:

InvocationStatus

increase_retries(invocation: DistributedInvocation[Params, Result]) None[source]#

Increases the retry count for a given invocation.

Parameters:

invocation (DistributedInvocation[Params, Result]) – The invocation for which the retry count is to be increased.

get_retries(invocation: DistributedInvocation[Params, Result]) int[source]#

Retrieves the current number of retries for a given invocation.

Parameters:

invocation (DistributedInvocation[Params, Result]) – The invocation to get the retry count for.

Returns:

The number of retries for the invocation.

Return type:

int

class pynenc.orchestrator.mem_orchestrator.MemOrchestrator(app: pynenc.app.Pynenc)[source]#

Bases: pynenc.orchestrator.base_orchestrator.BaseOrchestrator

A memory-based implementation of the Orchestrator, managing task invocations and their lifecycle.

This class provides an in-memory solution for orchestrating task invocations, including cycle and blocking controls, as well as caching of invocation statuses and retries.

Warning

This orchestrator is not intended for production use.
As it stores all invocations in the running process memory.
Parameters:

app (Pynenc) – The Pynenc application instance.

Initialization

property cycle_control: pynenc.orchestrator.mem_orchestrator.MemCycleControl#
property blocking_control: pynenc.orchestrator.mem_orchestrator.MemBlockingControl#
get_existing_invocations(task: Task[Params, Result], key_serialized_arguments: dict[str, str] | None = None, statuses: list[pynenc.invocation.status.InvocationStatus] | None = None) Iterator[pynenc.invocation.dist_invocation.DistributedInvocation][source]#
_set_invocation_status(invocation: DistributedInvocation[Params, Result], status: pynenc.invocation.status.InvocationStatus) None[source]#
_set_invocation_pending_status(invocation: DistributedInvocation[Params, Result]) None[source]#
set_up_invocation_auto_purge(invocation: DistributedInvocation[Params, Result]) None[source]#
auto_purge() None[source]#
get_invocation_status(invocation: DistributedInvocation[Params, Result]) pynenc.invocation.status.InvocationStatus[source]#
increment_invocation_retries(invocation: DistributedInvocation[Params, Result]) None[source]#
get_invocation_retries(invocation: DistributedInvocation[Params, Result]) int[source]#
purge() None[source]#