pynenc.orchestrator.redis_orchestrator#

Module Contents#

Classes#

RedisCycleControl

A Redis-based implementation of cycle control using a directed acyclic graph (DAG).

RedisBlockingControl

A Redis-based implementation of blocking control for task invocations.

TaskRedisCache

A Redis-based cache for managing task invocation statuses and retries.

RedisOrchestrator

An orchestrator implementation using Redis for managing distributed task invocations.

API#

class pynenc.orchestrator.redis_orchestrator.RedisCycleControl(app: pynenc.app.Pynenc, client: redis.Redis)[source]#

Bases: pynenc.orchestrator.base_orchestrator.BaseCycleControl

A Redis-based implementation of cycle control using a directed acyclic graph (DAG).

This class manages the dependencies between task invocations in Redis to prevent cycles in task calling patterns, which could lead to deadlocks or infinite loops.

Parameters:
  • app (Pynenc) – The Pynenc application instance.

  • client (redis.Redis) – The Redis client instance.

Initialization

purge() None[source]#

Purges all data related to cycle control from Redis. This includes all stored invocations, calls, and their relationships.

add_call_and_check_cycles(caller: pynenc.invocation.dist_invocation.DistributedInvocation, callee: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]#

Adds a new call dependency between caller and callee invocations and checks for potential cycles.

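Parameters:
  • caller (DistributedInvocation) – The invocation making the call.

  • callee (DistributedInvocation) – The invocation being called.

Raises: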

CycleDetectedError – If adding the call creates a cycle.
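As an illustration of the add-then-check behaviour described above, here is a minimal in-memory sketch. It is not pynenc's Redis implementation: the `InMemoryCycleControl` class, the local `CycleDetectedError` stand-in, and the use of plain string call IDs instead of `DistributedInvocation` objects are all assumptions made for the example.

```python
from collections import defaultdict


class CycleDetectedError(Exception):
    """Hypothetical local stand-in for pynenc's CycleDetectedError."""


class InMemoryCycleControl:
    """Toy call graph keyed by call ID (assumption: IDs are plain strings)."""

    def __init__(self) -> None:
        # caller call ID -> set of callee call IDs
        self.edges = defaultdict(set)

    def _reaches(self, start: str, target: str) -> bool:
        """Return True if target is reachable from start along existing edges."""
        stack, seen = [start], set()
        while stack:
            node = stack.pop()
            if node == target:
                return True
            if node in seen:
                continue
            seen.add(node)
            stack.extend(self.edges.get(node, ()))
        return False

    def add_call_and_check_cycles(self, caller: str, callee: str) -> None:
        # The edge caller -> callee closes a cycle exactly when the caller is
        # already reachable from the callee (or both are the same call).
        if self._reaches(callee, caller):
            raise CycleDetectedError(f"{caller} -> {callee} would create a cycle")
        self.edges[caller].add(callee)
```

In this sketch the edge is only stored when no cycle would result, so a rejected call leaves the graph unchanged.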

remove_edges(call_id: str) None[source]#

Recursively removes all edges from a given call in the graph.

Parameters:

call_id – The ID of the call from which to remove edges.

clean_up_invocation_cycles(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]#

Cleans up the graph by removing a given invocation and its associated edges.

Parameters:

invocation (DistributedInvocation) – The DistributedInvocation instance to be removed.

find_cycle_caused_by_new_invocation(caller: pynenc.invocation.dist_invocation.DistributedInvocation, callee: pynenc.invocation.dist_invocation.DistributedInvocation) list[pynenc.call.Call][source]#

Checks if adding a new call from caller to callee would create a cycle.

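Parameters:
  • caller (DistributedInvocation) – The invocation making the call.

  • callee (DistributedInvocation) – The invocation being called.

Returns: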

List of Call objects forming the cycle, if a cycle is detected; otherwise, an empty list.

_is_cyclic_util(current_call_id: str, visited: set[str], path: list[str]) list[pynenc.call.Call][source]#

A utility function for cycle detection.

Parameters:
  • current_call_id (str) – The current call ID being examined.

  • visited (set[str]) – A set of visited call IDs for cycle detection.

  • path (list[str]) – A list representing the current path of call IDs.

Returns:

List of Call objects forming a cycle, if a cycle is detected; otherwise, an empty list.
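The `visited` and `path` parameters suggest a depth-first search with path tracking. The sketch below shows one way such a utility could work over an in-memory adjacency mapping. The `edges` argument and the return of call IDs rather than `Call` objects are assumptions made for illustration; the actual method reads the graph from Redis.

```python
def is_cyclic_util(
    edges: dict[str, list[str]],
    current_call_id: str,
    visited: set[str],
    path: list[str],
) -> list[str]:
    """Depth-first search returning the call IDs that form a cycle, or []."""
    visited.add(current_call_id)
    path.append(current_call_id)
    for neighbour in edges.get(current_call_id, []):
        if neighbour in path:
            # Back edge: the cycle is the part of the path from neighbour onwards.
            return path[path.index(neighbour):]
        if neighbour not in visited:
            cycle = is_cyclic_util(edges, neighbour, visited, path)
            if cycle:
                return cycle
    # No cycle through this node: backtrack before returning.
    path.pop()
    return []
```

Keeping `path` separate from `visited` is what distinguishes a true cycle from two branches that merely share a descendant.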

class pynenc.orchestrator.redis_orchestrator.RedisBlockingControl(app: pynenc.app.Pynenc, client: redis.Redis)[source]#

Bases: pynenc.orchestrator.base_orchestrator.BaseBlockingControl

A Redis-based implementation of blocking control for task invocations.

Manages invocation dependencies and blocking states in a Redis-backed environment, ensuring that invocations waiting for others are properly tracked and released.

Parameters:
  • app (Pynenc) – The Pynenc application instance.

  • client (redis.Redis) – The Redis client instance.

Initialization

purge() None[source]#

Purges all data related to blocking control from Redis. This includes all stored invocations and their waiting relationships.

waiting_for_results(waiter: pynenc.invocation.dist_invocation.DistributedInvocation, waiteds: list[pynenc.invocation.dist_invocation.DistributedInvocation]) None[source]#

Registers that an invocation (waiter) is waiting for the results of other invocations (waiteds).

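Parameters:
  • waiter (DistributedInvocation) – The invocation that is waiting.

  • waiteds (list[DistributedInvocation]) – The invocations whose results are being waited for.

release_waiters(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]#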

Removes an invocation from the tracking system. Also removes any dependencies related to the invocation.

Parameters:

invocation (DistributedInvocation) – The DistributedInvocation instance to be removed.

get_blocking_invocations(max_num_invocations: int) Iterator[DistributedInvocation[Params, Result]][source]#

Returns the invocations that are blocking others but are not waiting for anything themselves.

Parameters:

max_num_invocations (int) – The maximum number of blocking invocations to retrieve.

Returns:

An iterator of blocking DistributedInvocation instances.
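The three methods above can be pictured with a small in-memory model. This is a sketch under stated assumptions, not the Redis-backed implementation: `InMemoryBlockingControl` is a hypothetical class, and invocations are represented by plain string IDs.

```python
from collections import defaultdict
from collections.abc import Iterator


class InMemoryBlockingControl:
    """Toy model of waiter/waited bookkeeping (assumption: string IDs)."""

    def __init__(self) -> None:
        # waited invocation ID -> IDs of the invocations waiting for it
        self.waited_by = defaultdict(set)
        # waiter invocation ID -> IDs of the invocations it is waiting for
        self.waiting_for = defaultdict(set)

    def waiting_for_results(self, waiter: str, waiteds: list[str]) -> None:
        for waited in waiteds:
            self.waited_by[waited].add(waiter)
            self.waiting_for[waiter].add(waited)

    def release_waiters(self, invocation: str) -> None:
        # Nobody waits for this invocation any more.
        for waiter in self.waited_by.pop(invocation, set()):
            remaining = self.waiting_for.get(waiter)
            if remaining is not None:
                remaining.discard(invocation)
                if not remaining:
                    del self.waiting_for[waiter]
        # Drop the invocation's own outstanding waits as well.
        for waited in self.waiting_for.pop(invocation, set()):
            waiters = self.waited_by.get(waited)
            if waiters is not None:
                waiters.discard(invocation)
                if not waiters:
                    del self.waited_by[waited]

    def get_blocking_invocations(self, max_num_invocations: int) -> Iterator[str]:
        found = 0
        for invocation in list(self.waited_by):
            if found >= max_num_invocations:
                return
            # Blocking others, but not waiting for anything itself.
            if invocation not in self.waiting_for:
                found += 1
                yield invocation
```

With a chain where `a` waits for `b` and `b` waits for `c`, only `c` is reported as blocking; once `c` is released, `b` takes its place.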

exception pynenc.orchestrator.redis_orchestrator.StatusNotFound[source]#

Bases: Exception

Raised when a status is not found in Redis.

Initialization

Initialize self. See help(type(self)) for accurate signature.

class pynenc.orchestrator.redis_orchestrator.TaskRedisCache(app: pynenc.app.Pynenc, client: redis.Redis)[source]#

A Redis-based cache for managing task invocation statuses and retries.

Provides methods to set and get the status and retry counts of task invocations, leveraging Redis for scalable and efficient storage.

Parameters:
  • app (Pynenc) – The Pynenc application instance.

  • client (redis.Redis) – The Redis client instance.

Initialization

purge() None[source]#

Purges all data related to task invocations from Redis.

set_status(invocation: DistributedInvocation[Params, Result], status: pynenc.invocation.status.InvocationStatus) None[source]#

Set the status of an invocation in Redis.

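Parameters:
  • invocation (DistributedInvocation) – The invocation to be updated.

  • status (InvocationStatus) – The status to set.

set_pending_status(invocation: DistributedInvocation[Params, Result]) None[source]#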

Set the status of an invocation to pending in Redis.

Danger

We have to ensure that the invocation is not already pending. Otherwise, we could end up with a deadlock.

Parameters:

invocation (DistributedInvocation) – The invocation to be updated.

Raises:

PendingInvocationLockError – If it was not possible to lock the invocation.
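The warning above amounts to an atomic test-and-set: only one caller may move a given invocation to pending at a time. The following in-memory sketch illustrates that idea with a `threading.Lock`. The `PendingStatusStore` class, the local exception stand-in, and the `"PENDING"` string are assumptions for illustration. Redis offers a comparable set-if-absent primitive through the `NX` option of its `SET` command, but this page does not say which mechanism pynenc uses.

```python
import threading


class PendingInvocationLockError(Exception):
    """Hypothetical local stand-in for pynenc's PendingInvocationLockError."""


class PendingStatusStore:
    """Toy store that lets only one caller mark an invocation as pending."""

    def __init__(self) -> None:
        self._guard = threading.Lock()
        self._pending = set()
        self.status = {}

    def set_pending_status(self, invocation_id: str) -> None:
        # Check and update under a single guard so two workers cannot both win.
        with self._guard:
            if invocation_id in self._pending:
                raise PendingInvocationLockError(invocation_id)
            self._pending.add(invocation_id)
            self.status[invocation_id] = "PENDING"

    def clean_pending_status(self, invocation_id: str) -> None:
        # Release the marker so the invocation can be set to pending again.
        with self._guard:
            self._pending.discard(invocation_id)
```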

set_up_invocation_auto_purge(invocation: DistributedInvocation[Params, Result]) None[source]#

Sets up an invocation for automatic purging after a specified time.

Parameters:

invocation (DistributedInvocation) – The invocation to be updated.

auto_purge() None[source]#

Automatically purges invocations that have been in their final state beyond a specified duration.
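A rough picture of how registering final invocations and purging them later can fit together is sketched below. Everything here is an assumption for illustration: the `FinalInvocationIndex` class, the string IDs, and the injectable `now` argument (added so the example is deterministic). Only the name `auto_final_invocation_purge_hours` comes from this page.

```python
import time
from typing import Optional


class FinalInvocationIndex:
    """Toy index of invocations that reached a final state."""

    def __init__(self, auto_final_invocation_purge_hours: float) -> None:
        self.max_age_seconds = auto_final_invocation_purge_hours * 3600
        # invocation ID -> timestamp at which it was registered for purging
        self.final_since = {}

    def set_up_invocation_auto_purge(
        self, invocation_id: str, now: Optional[float] = None
    ) -> None:
        self.final_since[invocation_id] = time.time() if now is None else now

    def auto_purge(self, now: Optional[float] = None) -> list[str]:
        current = time.time() if now is None else now
        cutoff = current - self.max_age_seconds
        # Collect first, then delete, to avoid mutating the dict while iterating.
        expired = [inv for inv, ts in self.final_since.items() if ts <= cutoff]
        for inv in expired:
            del self.final_since[inv]
        return expired
```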

Note

The duration is specified in the configuration file using the `auto_final_invocation_purge_hours` parameter.

clean_pending_status(invocation: DistributedInvocation[Params, Result]) None[source]#

Cleans up the pending status of a task invocation.

Parameters:

invocation – The task invocation instance.

_get_invocation_status(invocation_id: str) pynenc.invocation.status.InvocationStatus[source]#

Gets the status of a specific task invocation.

Parameters:

invocation_id – The ID of the task invocation.

Returns:

The current status of the task invocation.

get_invocation_status(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) pynenc.invocation.status.InvocationStatus[source]#

Retrieves the status of a task invocation.

Parameters:

invocation – The task invocation instance.

Returns:

The current status of the task invocation.

get_invocation_retries(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) int[source]#

Gets the number of retries for a specific task invocation.

Parameters:

invocation – The task invocation instance.

Returns:

The number of retries of the task invocation.

increment_invocation_retries(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]#

Increments the retry count for a specific task invocation.

Parameters:

invocation – The task invocation instance.
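The retry accessors behave like a per-invocation counter. A minimal in-memory sketch follows. The `RetryCounter` class, the string IDs, and the default of zero for an invocation that has never been retried are assumptions for illustration. Redis provides an atomic `INCR` command that could serve the same purpose, though this page does not state how the count is stored.

```python
from collections import Counter


class RetryCounter:
    """Toy per-invocation retry counter (assumption: string IDs)."""

    def __init__(self) -> None:
        self._retries = Counter()

    def get_invocation_retries(self, invocation_id: str) -> int:
        # Counter returns 0 for keys that were never incremented.
        return self._retries[invocation_id]

    def increment_invocation_retries(self, invocation_id: str) -> None:
        self._retries[invocation_id] += 1
```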

get_invocations(task_id: str, key_arguments: Optional[dict[str, str]], statuses: Optional[list[pynenc.invocation.status.InvocationStatus]]) Iterator[pynenc.invocation.dist_invocation.DistributedInvocation][source]#

Retrieves task invocations based on task ID, key arguments, and status.

Parameters:
  • task_id (str) – The ID of the task.

  • key_arguments (Optional[dict[str, str]]) – Optional key arguments for filtering.

  • statuses (Optional[list[InvocationStatus]]) – Optional statuses for filtering.

Returns:

An iterator of task invocations that match the criteria.
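One way to picture this kind of filtering is as an intersection of index sets: one set per task, per key argument value, and per status. The sketch below is an in-memory illustration only. The `InvocationIndex` class, its `add` helper, the string statuses, and the sorted output order are assumptions, not pynenc's actual storage layout.

```python
from collections import defaultdict
from collections.abc import Iterator
from typing import Optional


class InvocationIndex:
    """Toy index for looking up invocation IDs by task, arguments, and status."""

    def __init__(self) -> None:
        self.by_task = defaultdict(set)      # task_id -> invocation IDs
        self.by_argument = defaultdict(set)  # (task_id, name, value) -> IDs
        self.by_status = defaultdict(set)    # (task_id, status) -> IDs

    def add(
        self, invocation_id: str, task_id: str,
        key_arguments: dict[str, str], status: str,
    ) -> None:
        self.by_task[task_id].add(invocation_id)
        for name, value in key_arguments.items():
            self.by_argument[(task_id, name, value)].add(invocation_id)
        self.by_status[(task_id, status)].add(invocation_id)

    def get_invocations(
        self, task_id: str,
        key_arguments: Optional[dict[str, str]] = None,
        statuses: Optional[list[str]] = None,
    ) -> Iterator[str]:
        matches = set(self.by_task.get(task_id, set()))
        if key_arguments:
            # Every key argument must match (intersection).
            for name, value in key_arguments.items():
                matches &= self.by_argument.get((task_id, name, value), set())
        if statuses:
            # Any of the requested statuses may match (union, then intersect).
            with_status = set()
            for status in statuses:
                with_status |= self.by_status.get((task_id, status), set())
            matches &= with_status
        yield from sorted(matches)
```

Passing `None` for either filter leaves that dimension unconstrained, which mirrors the `Optional` parameters in the signature above.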

class pynenc.orchestrator.redis_orchestrator.RedisOrchestrator(app: pynenc.app.Pynenc)[source]#

Bases: pynenc.orchestrator.base_orchestrator.BaseOrchestrator

An orchestrator implementation using Redis for managing distributed task invocations.

This orchestrator leverages Redis to handle invocation status, retries, auto-purge, and cycle and blocking controls in a distributed task environment. It extends BaseOrchestrator and integrates closely with Redis to provide efficient and scalable task orchestration.

Parameters:

app (Pynenc) – The Pynenc application instance.

Initialization

conf() pynenc.conf.config_orchestrator.ConfigOrchestratorRedis#

property cycle_control: pynenc.orchestrator.redis_orchestrator.RedisCycleControl#

property blocking_control: pynenc.orchestrator.redis_orchestrator.RedisBlockingControl#

get_existing_invocations(task: Task[Params, Result], key_serialized_arguments: Optional[dict[str, str]] = None, statuses: Optional[list[pynenc.invocation.status.InvocationStatus]] = None) Iterator[pynenc.invocation.dist_invocation.DistributedInvocation][source]#

_set_invocation_status(invocation: DistributedInvocation[Params, Result], status: pynenc.invocation.status.InvocationStatus) None[source]#

_set_invocation_pending_status(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]#

set_up_invocation_auto_purge(invocation: pynenc.invocation.dist_invocation.DistributedInvocation[pynenc.types.Params, pynenc.types.Result]) None[source]#

auto_purge() None[source]#

get_invocation_status(invocation: DistributedInvocation[Params, Result]) pynenc.invocation.status.InvocationStatus[source]#

get_invocation_retries(invocation: DistributedInvocation[Params, Result]) int[source]#

increment_invocation_retries(invocation: DistributedInvocation[Params, Result]) None[source]#

purge() None[source]#

Removes all invocations from the orchestrator.