pynenc.orchestrator.redis_orchestrator

Module Contents

Classes

RedisCycleControl

A Redis-based implementation of cycle control using a directed acyclic graph (DAG).

RedisBlockingControl

A Redis-based implementation of blocking control for task invocations.

TaskRedisCache

A Redis-based cache for managing task invocation statuses and retries.

RedisOrchestrator

An orchestrator implementation using Redis for managing distributed task invocations.

Functions

_clean_dead_threads

Remove completed futures from the registry to prevent memory leaks.

Data

API

pynenc.orchestrator.redis_orchestrator._pending_resolution_threads: dict[str, concurrent.futures.Future[None]]

None

pynenc.orchestrator.redis_orchestrator._registry_lock

‘Lock(…)’

pynenc.orchestrator.redis_orchestrator._clean_dead_threads() None[source]

Remove completed futures from the registry to prevent memory leaks.

exception pynenc.orchestrator.redis_orchestrator.StatusNotFound[source]

Bases: Exception

Raised when a status is not found in Redis

Initialization

Initialize self. See help(type(self)) for accurate signature.

class pynenc.orchestrator.redis_orchestrator.RedisCycleControl(app: pynenc.app.Pynenc, client: redis.Redis)[source]

Bases: pynenc.orchestrator.base_orchestrator.BaseCycleControl

A Redis-based implementation of cycle control using a directed acyclic graph (DAG).

This class manages the dependencies between task invocations in Redis to prevent cycles in task calling patterns, which could lead to deadlocks or infinite loops.

Parameters:
  • app (Pynenc) – The Pynenc application instance.

  • client (redis.Redis) – The Redis client instance.

Initialization

purge() None[source]

Purges all data related to cycle control from Redis. This includes all stored invocations, calls, and their relationships.

add_call_and_check_cycles(caller: pynenc.invocation.dist_invocation.DistributedInvocation, callee: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]

Adds a new call dependency between caller and callee invocations and checks for potential cycles.

Parameters:
Raises:

CycleDetectedError – If adding the call creates a cycle.

remove_edges(call_id: str) None[source]

Recursively removes all edges from a given call in the graph.

Parameters:

call_id – The ID of the call from which to remove edges.

clean_up_invocation_cycles(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]

Cleans up the graph by removing a given invocation and its associated edges.

Parameters:

invocation (DistributedInvocation) – The DistributedInvocation instance to be removed.

find_cycle_caused_by_new_invocation(caller: pynenc.invocation.dist_invocation.DistributedInvocation, callee: pynenc.invocation.dist_invocation.DistributedInvocation) list[pynenc.call.Call][source]

Checks if adding a new call from caller to callee would create a cycle.

Parameters:
Returns:

List of Call objects forming the cycle, if a cycle is detected; otherwise, an empty list.

_is_cyclic_util(current_call_id: str, visited: set[str], path: list[str]) list[pynenc.call.Call][source]

A utility function for cycle detection.

Parameters:
  • current_call_id (str) – The current call ID being examined.

  • visited (set[str]) – A set of visited call IDs for cycle detection.

  • path (list[str]) – A list representing the current path of call IDs.

Returns:

List of Call objects forming a cycle, if a cycle is detected; otherwise, an empty list.

class pynenc.orchestrator.redis_orchestrator.RedisBlockingControl(app: pynenc.app.Pynenc, client: redis.Redis)[source]

Bases: pynenc.orchestrator.base_orchestrator.BaseBlockingControl

A Redis-based implementation of blocking control for task invocations.

Manages invocation dependencies and blocking states in a Redis-backed environment, ensuring that invocations waiting for others are properly tracked and released.

Parameters:
  • app (Pynenc) – The Pynenc application instance.

  • client (redis.Redis) – The Redis client instance.

Initialization

purge() None[source]

Purges all data related to blocking control from Redis. This includes all stored invocations and their waiting relationships.

waiting_for_results(waiter: pynenc.invocation.dist_invocation.DistributedInvocation, waiteds: list[pynenc.invocation.dist_invocation.DistributedInvocation]) None[source]

Registers that an invocation (waiter) is waiting for the results of other invocations (waiteds).

Parameters:
release_waiters(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]

Removes an invocation from the tracking system. Also removes any dependencies related to the invocation.

Parameters:

invocation (DistributedInvocation) – The DistributedInvocation instance to be removed.

get_blocking_invocations(max_num_invocations: int) Iterator[DistributedInvocation[Params, Result]][source]

Returns the invocations that are blocking others but are not waiting for anything themselves.

Parameters:

max_num_invocations (int) – The maximum number of blocking invocations to retrieve.

Returns:

An iterator of blocking DistributedInvocation instances.

class pynenc.orchestrator.redis_orchestrator.TaskRedisCache(app: pynenc.app.Pynenc, client: redis.Redis)[source]

A Redis-based cache for managing task invocation statuses and retries.

Provides methods to set and get the status and retry counts of task invocations, leveraging Redis for scalable and efficient storage.

Parameters:
  • app (Pynenc) – The Pynenc application instance.

  • client (redis.Redis) – The Redis client instance.

Initialization

property conf: pynenc.conf.config_orchestrator.ConfigOrchestratorRedis
purge() None[source]

Purges all data related to task invocations from Redis.

_set_status(invocation: DistributedInvocation[Params, Result], status: pynenc.invocation.status.InvocationStatus, previous_status: Optional[pynenc.invocation.status.InvocationStatus], pipeline: redis.client.Pipeline) None[source]

Inner method to set the status of a single invocation using a provided pipeline.

Parameters:
  • invocation (DistributedInvocation) – The invocation to update.

  • status (InvocationStatus) – The new status to set.

  • previous_status (Optional[InvocationStatus]) – The previous status, or None if it’s a new invocation.

  • pipeline (redis.client.Pipeline) – The Redis pipeline to use for commands.

set_status(invocation: DistributedInvocation[Params, Result], status: pynenc.invocation.status.InvocationStatus) None[source]

Set the status of a single invocation in Redis, optionally using a provided pipeline.

Parameters:
  • invocation (DistributedInvocation) – The invocation to update.

  • status (InvocationStatus) – The new status to set.

  • pipeline (Optional[redis.client.Pipeline]) – Optional Redis pipeline; if None, creates and executes one.

set_batch_status(invocations: list[DistributedInvocation[Params, Result]], status: pynenc.invocation.status.InvocationStatus) None[source]

Set the status of multiple invocations at once using a Redis pipeline.

Parameters:
set_pending_status(invocation: DistributedInvocation[Params, Result]) None[source]

Set the status of an invocation to pending in Redis.

Danger

We have to ensure that the invocation is not already pending.
Otherwise, we could end up with a deadlock.
Parameters:

invocation (DistributedInvocation) – The invocation to be updated.

Raises:

PendingInvocationLockError – If was not possible to lock the Invocation.

set_up_invocation_auto_purge(invocation: DistributedInvocation[Params, Result]) None[source]

Sets up an invocation for automatic purging after a specified time.

Parameters:

invocation (DistributedInvocation) – The invocation to be updated.

auto_purge() None[source]

Automatically purges invocations that have been in their final state beyond a specified duration.

Note

The duration is specified in the configuration file using the `auto_final_invocation_purge_hours` parameter.
clean_pending_status(invocation: DistributedInvocation[Params, Result]) None[source]

Cleans up the pending status of a task invocation.

Parameters:

invocation – The task invocation instance.

_get_invocation_status(invocation_id: str) pynenc.invocation.status.InvocationStatus[source]

Gets the status of a specific task invocation.

Parameters:

invocation_id – The ID of the task invocation.

Returns:

The current status of the task invocation.

get_invocation_status(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) pynenc.invocation.status.InvocationStatus[source]

Retrieves the status of a task invocation.

Parameters:

invocation – The task invocation instance.

Returns:

The current status of the task invocation.

_async_resolve_pending_status(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]

Asynchronously resolve and update an expired PENDING status.

This method is designed to run in a background thread.

Parameters:

invocation_id (str) – The ID of the invocation to update

_start_async_pending_resolution_for_invocation(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]

Start asynchronous resolution of a PENDING status.

Uses ThreadPoolExecutor to manage a pool of worker threads.

Parameters:

invocation – The invocation to check PENDING status for

get_invocation_retries(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) int[source]

Gets the number of retries for a specific task invocation.

Parameters:

invocation – The task invocation instance.

Returns:

The number of retries of the task invocation.

increment_invocation_retries(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]

Increments the retry count for a specific task invocation.

Parameters:

invocation – The task invocation instance.

get_invocations(task_id: str, key_arguments: Optional[dict[str, str]], statuses: Optional[list[pynenc.invocation.status.InvocationStatus]]) Iterator[pynenc.invocation.dist_invocation.DistributedInvocation][source]

Retrieves task invocations based on task ID, key arguments, and status.

Parameters:
  • task_id (str) – The ID of the task.

  • key_arguments (Optional[dict[str, str]]) – Optional key arguments for filtering.

  • statuses (Optional[list[“InvocationStatus”]]) – Optional status for filtering.

Returns:

An iterator of task invocations that match the criteria.

filter_by_status(invocations: list[pynenc.invocation.dist_invocation.DistributedInvocation], status_filter: set[pynenc.invocation.status.InvocationStatus]) list[pynenc.invocation.dist_invocation.DistributedInvocation][source]

Get statuses for multiple invocations in a single Redis operation.

This is a low-level method that fetches the raw statuses from Redis, performing minimal processing. Unlike get_invocation_status, this method may return PENDING status even if it has expired, as it’s optimized for high-throughput batch queries.

Parameters:

invocations – List of DistributedInvocation to get statuses for

Returns:

list of DistributedInvocation with matching statuses

__del__() None[source]

Ensure thread pool is shut down properly when the object is destroyed.

class pynenc.orchestrator.redis_orchestrator.RedisOrchestrator(app: pynenc.app.Pynenc)[source]

Bases: pynenc.orchestrator.base_orchestrator.BaseOrchestrator

An orchestrator implementation using Redis for managing distributed task invocations.

This orchestrator leverages Redis to handle invocation status, retries, auto-purge, and cycle and blocking controls in a distributed task environment. It extends BaseOrchestrator and integrates closely with Redis to provide efficient and scalable task orchestration.

Parameters:

app (Pynenc) – The Pynenc application instance.

Initialization

conf() pynenc.conf.config_orchestrator.ConfigOrchestratorRedis
property client: redis.Redis

Lazy initialization of Redis client

property redis_cache: pynenc.orchestrator.redis_orchestrator.TaskRedisCache
property cycle_control: pynenc.orchestrator.redis_orchestrator.RedisCycleControl
property blocking_control: pynenc.orchestrator.redis_orchestrator.RedisBlockingControl
get_existing_invocations(task: Task[Params, Result], key_serialized_arguments: Optional[dict[str, str]] = None, statuses: Optional[list[pynenc.invocation.status.InvocationStatus]] = None) Iterator[pynenc.invocation.dist_invocation.DistributedInvocation][source]
get_invocation(invocation_id: str) Optional[pynenc.invocation.dist_invocation.DistributedInvocation][source]
_set_invocation_status(invocation: DistributedInvocation[Params, Result], status: pynenc.invocation.status.InvocationStatus) None[source]
_set_invocations_status(invocations: list[pynenc.invocation.dist_invocation.DistributedInvocation], status: pynenc.invocation.status.InvocationStatus) None[source]

Set the status of multiple invocations at once using Redis pipeline.

Parameters:
_set_invocation_pending_status(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]
set_up_invocation_auto_purge(invocation: pynenc.invocation.dist_invocation.DistributedInvocation[pynenc.types.Params, pynenc.types.Result]) None[source]
auto_purge() None[source]
get_invocation_status(invocation: DistributedInvocation[Params, Result]) pynenc.invocation.status.InvocationStatus[source]
get_invocation_retries(invocation: DistributedInvocation[Params, Result]) int[source]
increment_invocation_retries(invocation: DistributedInvocation[Params, Result]) None[source]
filter_by_status(invocations: list[pynenc.invocation.dist_invocation.DistributedInvocation], status_filter: set[pynenc.invocation.status.InvocationStatus] | None = None) list[pynenc.invocation.dist_invocation.DistributedInvocation][source]

Filters a list of invocations by their status in an optimized way.

Uses batch Redis operations for better performance.

Parameters:
Returns:

List of invocations matching the status filter

purge() None[source]

Remove all invocations from the orchestrator