pynenc.orchestrator.mem_orchestrator

Module Contents

Classes

MemBlockingControl

An implementation of blocking control using a directed acyclic graph (DAG) to represent invocation dependencies.

ArgPair

Helper to simulate a Memory cache for key:value pairs in Task Invocations

MemOrchestrator

A memory-based implementation of the Orchestrator, managing task invocations and their lifecycle.

API

class pynenc.orchestrator.mem_orchestrator.MemBlockingControl(app: pynenc.app.Pynenc)[source]

Bases: pynenc.orchestrator.base_orchestrator.BaseBlockingControl

An implementation of blocking control using a directed acyclic graph (DAG) to represent invocation dependencies.

This class manages dependencies between task invocations, ensuring that invocations waiting for others are properly handled.

Parameters:

app (Pynenc) – The Pynenc application instance.

Initialization

waiting_for_results(caller_invocation_id: pynenc.identifiers.invocation_id.InvocationId, result_invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId]) None[source]

Notifies the system that an invocation is waiting for the results of other invocations.

Parameters:
  • caller_invocation_id (InvocationId) – The ID of the invocation that is waiting.

  • result_invocation_ids (list[InvocationId]) – The IDs of the invocations being waited on.

release_waiters(waited_invocation_id: pynenc.identifiers.invocation_id.InvocationId) None[source]

Removes an invocation from the graph, along with any dependencies related to it.

Parameters:

waited_invocation_id (InvocationId) – The ID of the invocation that has finished and will no longer block other invocations.

get_blocking_invocations(max_num_invocations: int) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieves invocations that are blocking others but are not themselves waiting for any results.

Parameters:

max_num_invocations (int) – The maximum number of blocking invocations to retrieve.

Returns:

An iterator over invocations that are blocking others (older firsts).

Return type:

Iterator[“InvocationId”]

class pynenc.orchestrator.mem_orchestrator.ArgPair(key: str, value: Any)[source]

Helper to simulate a Memory cache for key:value pairs in Task Invocations

Initialization

__hash__() int[source]

Generate a hash that works with serialized values

__eq__(other: Any) bool[source]

Equality check that works with serialized values

__str__() str[source]
__repr__() str[source]
class pynenc.orchestrator.mem_orchestrator.MemOrchestrator(app: pynenc.app.Pynenc)[source]

Bases: pynenc.orchestrator.base_orchestrator.BaseOrchestrator

A memory-based implementation of the Orchestrator, managing task invocations and their lifecycle.

This class provides an in-memory solution for orchestrating task invocations, including blocking controls, as well as caching of invocation statuses and retries.

Warning

This orchestrator is not intended for production use.
As it stores all invocations in the running process memory.
Parameters:

app (Pynenc) – The Pynenc application instance.

Initialization

property blocking_control: pynenc.orchestrator.mem_orchestrator.MemBlockingControl
_register_new_invocations(invocations: list[DistributedInvocation[Params, Result]], runner_id: str | None = None) pynenc.invocation.status.InvocationStatusRecord[source]

Registers new invocations and sets them to REGISTERED status.

filter_by_key_arguments(key_arguments: dict[str, str]) set[pynenc.identifiers.invocation_id.InvocationId][source]

Filters invocations by key arguments, requiring ALL keys to match.

Parameters:

key_arguments (dict[str, str]) – Key-value pairs to filter by

Returns:

Set of invocation IDs that match ALL provided key-value pairs

filter_by_statuses(statuses: list[pynenc.invocation.status.InvocationStatus]) set[pynenc.identifiers.invocation_id.InvocationId][source]
get_existing_invocations(task: Task[Params, Result], key_serialized_arguments: dict[str, str] | None = None, statuses: list[InvocationStatus] | None = None) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieves invocation ids based on provided key arguments and/or status.

Parameters:
  • key_arguments (dict[str, str] | None) – The key arguments to filter the invocations.

  • status (list[InvocationStatus] | None) – The statuses to filter the invocations.

Returns:

An iterator over the filtered invocations.

Return type:

Iterator[“InvocationId”]

get_task_invocation_ids(task_id: pynenc.task.TaskId) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieves all invocation ids for a given task id.

Parameters:

task_id (TaskId) – The task id to filter the invocations.

Returns:

An iterator over the invocation ids for the specified task.

Return type:

Iterator[“InvocationId”]

get_invocation_ids_paginated(task_id: Optional[pynenc.task.TaskId] = None, statuses: list[pynenc.invocation.status.InvocationStatus] | None = None, limit: int = 100, offset: int = 0) list[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieves invocation IDs with pagination support.

Parameters:
  • task_id (TaskId | None) – Optional task ID to filter by.

  • statuses (list[InvocationStatus] | None) – Optional statuses to filter by.

  • limit (int) – Maximum number of results to return.

  • offset (int) – Number of results to skip.

Returns:

List of matching invocation IDs.

count_invocations(task_id: Optional[pynenc.task.TaskId] = None, statuses: list[pynenc.invocation.status.InvocationStatus] | None = None) int[source]

Counts invocations matching the given filters.

Parameters:
  • task_id (str | None) – Optional task ID to filter by.

  • statuses (list[InvocationStatus] | None) – Optional statuses to filter by.

Returns:

The total count of matching invocations.

get_call_invocation_ids(call_id: pynenc.identifiers.call_id.CallId) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieves all invocation IDs associated with a specific call ID.

set_up_invocation_auto_purge(invocation_id: pynenc.identifiers.invocation_id.InvocationId) None[source]

Sets up an invocation for automatic purging after a specified time.

Parameters:

invocation_id (InvocationId) – The ID of the invocation to be set up for auto-purge.

auto_purge() None[source]

Automatically purges invocations that have been in a final state for longer than a specified duration.

clean_up_invocation(invocation_id: pynenc.identifiers.invocation_id.InvocationId) None[source]

Cleans up an invocation from the cache.

Parameters:

invocation_id (InvocationId) – The ID of the invocation to be cleaned up.

_get_invocation_lock(invocation_id: pynenc.identifiers.invocation_id.InvocationId) threading.Lock[source]

Get or create a per-invocation lock for atomic transitions.

Parameters:

invocation_id – The invocation to get the lock for.

Returns:

A threading Lock for the given invocation.

_atomic_status_transition(invocation_id: pynenc.identifiers.invocation_id.InvocationId, status: pynenc.invocation.status.InvocationStatus, runner_id: str | None = None) pynenc.invocation.status.InvocationStatusRecord[source]

Sets the status record of a specific invocation.

Uses per-invocation locking to prevent TOCTOU race conditions where multiple threads could read the same current status, both validate their transition, and both write — producing duplicate history entries.

_interanl_atomic_status_transition(invocation_id: pynenc.identifiers.invocation_id.InvocationId, prev_status_record: pynenc.invocation.status.InvocationStatusRecord | None, new_record: pynenc.invocation.status.InvocationStatusRecord) pynenc.invocation.status.InvocationStatusRecord[source]

Sets the status record of a specific invocation.

index_arguments_for_concurrency_control(invocation: DistributedInvocation[Params, Result]) None[source]
get_invocation_status_record(invocation_id: pynenc.identifiers.invocation_id.InvocationId) pynenc.invocation.status.InvocationStatusRecord[source]

Retrieves the current status of an invocation

increment_invocation_retries(invocation_id: pynenc.identifiers.invocation_id.InvocationId) None[source]

Increases the retry count for a given invocation.

Parameters:

invocation_id (InvocationId) – The ID of the invocation for which the retry count is to be increased.

get_invocation_retries(invocation_id: pynenc.identifiers.invocation_id.InvocationId) int[source]

Retrieves the current number of retries for a given invocation.

Parameters:

invocation_id (InvocationId) – The ID of the invocation to get the retry count for.

Returns:

The number of retries for the invocation.

Return type:

int

filter_by_status(invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], status_filter: frozenset[pynenc.invocation.status.InvocationStatus]) list[pynenc.identifiers.invocation_id.InvocationId][source]
register_runner_heartbeats(runner_ids: list[str], can_run_atomic_service: bool = False) None[source]

Register or update heartbeat timestamps for one or more runners.

_get_active_runners(timeout_seconds: float, can_run_atomic_service: bool | None = None) list[pynenc.orchestrator.atomic_service.ActiveRunnerInfo][source]

Retrieve all active runners with heartbeat information.

record_atomic_service_execution(runner_id: str, start_time: datetime.datetime, end_time: datetime.datetime) None[source]

Record the latest atomic service execution window for a runner.

get_pending_invocations_for_recovery() collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieve invocation IDs stuck in PENDING status beyond the allowed time.

_get_running_invocations_for_recovery(timeout_seconds: float) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieve RUNNING invocation IDs owned by inactive runners.

purge() None[source]