pynenc.orchestrator.redis_orchestrator#
Module Contents#
Classes#
RedisCycleControl | A Redis-based implementation of cycle control using a directed acyclic graph (DAG).
RedisBlockingControl | A Redis-based implementation of blocking control for task invocations.
TaskRedisCache | A Redis-based cache for managing task invocation statuses and retries.
RedisOrchestrator | An orchestrator implementation using Redis for managing distributed task invocations.
API#
- class pynenc.orchestrator.redis_orchestrator.RedisCycleControl(app: pynenc.app.Pynenc, client: redis.Redis)[source]#
Bases: pynenc.orchestrator.base_orchestrator.BaseCycleControl
A Redis-based implementation of cycle control using a directed acyclic graph (DAG).
This class manages the dependencies between task invocations in Redis to prevent cycles in task calling patterns, which could lead to deadlocks or infinite loops.
- Parameters:
app (Pynenc) – The Pynenc application instance.
client (redis.Redis) – The Redis client instance.
Initialization
- purge() None[source]#
Purges all data related to cycle control from Redis. This includes all stored invocations, calls, and their relationships.
- add_call_and_check_cycles(caller: pynenc.invocation.dist_invocation.DistributedInvocation, callee: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]#
Adds a new call dependency between caller and callee invocations and checks for potential cycles.
- Parameters:
caller (DistributedInvocation) – The invocation that is making the call.
callee (DistributedInvocation) – The invocation that is being called.
- Raises:
CycleDetectedError – If adding the call creates a cycle.
- remove_edges(call_id: str) None[source]#
Recursively removes all edges from a given call in the graph.
- Parameters:
call_id – The ID of the call from which to remove edges.
- clean_up_invocation_cycles(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]#
Cleans up the graph by removing a given invocation and its associated edges.
- Parameters:
invocation (DistributedInvocation) – The DistributedInvocation instance to be removed.
- find_cycle_caused_by_new_invocation(caller: pynenc.invocation.dist_invocation.DistributedInvocation, callee: pynenc.invocation.dist_invocation.DistributedInvocation) list[pynenc.call.Call][source]#
Checks if adding a new call from caller to callee would create a cycle.
- Parameters:
caller (DistributedInvocation) – The invocation making the call.
callee (DistributedInvocation) – The invocation being called.
- Returns:
List of Call objects forming the cycle, if a cycle is detected; otherwise, an empty list.
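The cycle check described for `add_call_and_check_cycles` and `find_cycle_caused_by_new_invocation` can be illustrated with a small in-memory graph. This is only a sketch of the general technique (a depth-first search from the callee back to the caller), not the Redis-backed implementation: the class `InMemoryCycleControl`, the use of plain string IDs in place of invocations and calls, and the local `CycleDetectedError` stand-in are all assumptions made for illustration.

```python
from collections import defaultdict


class CycleDetectedError(Exception):
    """Local stand-in for the error named in the documentation."""


class InMemoryCycleControl:
    """Toy call graph (hypothetical); edges point from caller to callee."""

    def __init__(self) -> None:
        self.edges: dict[str, set[str]] = defaultdict(set)

    def find_cycle(self, caller: str, callee: str) -> list[str]:
        """Return the path callee -> ... -> caller if one exists, else []."""
        stack = [(callee, [callee])]
        visited: set[str] = set()
        while stack:
            node, path = stack.pop()
            if node == caller:
                # The callee can already reach the caller, so the new
                # edge caller -> callee would close a loop.
                return path
            if node in visited:
                continue
            visited.add(node)
            for nxt in self.edges.get(node, ()):
                stack.append((nxt, path + [nxt]))
        return []

    def add_call_and_check_cycles(self, caller: str, callee: str) -> None:
        """Add the edge only if it keeps the graph acyclic."""
        cycle = self.find_cycle(caller, callee)
        if cycle:
            raise CycleDetectedError(" -> ".join([caller] + cycle))
        self.edges[caller].add(callee)
```

In this sketch, the edge is added only after the search finds no existing path from the callee back to the caller, so the stored graph remains a DAG.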
- class pynenc.orchestrator.redis_orchestrator.RedisBlockingControl(app: pynenc.app.Pynenc, client: redis.Redis)[source]#
Bases: pynenc.orchestrator.base_orchestrator.BaseBlockingControl
A Redis-based implementation of blocking control for task invocations.
Manages invocation dependencies and blocking states in a Redis-backed environment, ensuring that invocations waiting for others are properly tracked and released.
- Parameters:
app (Pynenc) – The Pynenc application instance.
client (redis.Redis) – The Redis client instance.
Initialization
- purge() None[source]#
Purges all data related to blocking control from Redis. This includes all stored invocations and their waiting relationships.
- waiting_for_results(waiter: pynenc.invocation.dist_invocation.DistributedInvocation, waiteds: list[pynenc.invocation.dist_invocation.DistributedInvocation]) None[source]#
Registers that an invocation (waiter) is waiting for the results of other invocations (waiteds).
- Parameters:
waiter (DistributedInvocation) – The invocation that is waiting.
waiteds (list[DistributedInvocation]) – A list of invocations that the waiter is waiting for.
- release_waiters(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]#
Removes an invocation from the tracking system. Also removes any dependencies related to the invocation.
- Parameters:
invocation (DistributedInvocation) – The DistributedInvocation instance to be removed.
- get_blocking_invocations(max_num_invocations: int) Iterator[DistributedInvocation[Params, Result]][source]#
Returns the invocations that are blocking others but are not waiting for anything themselves.
- Parameters:
max_num_invocations (int) – The maximum number of blocking invocations to retrieve.
- Returns:
An iterator of blocking DistributedInvocation instances.
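The bookkeeping behind `waiting_for_results`, `release_waiters`, and `get_blocking_invocations` can be pictured as two mappings: one from each waiter to the invocations it waits for, and one in the opposite direction. The following is a minimal in-memory sketch under that assumption. The class `InMemoryBlockingControl` and the use of string IDs instead of `DistributedInvocation` objects are hypothetical, and the actual Redis data layout may differ.

```python
from itertools import islice
from typing import Iterator


class InMemoryBlockingControl:
    """Toy tracker (hypothetical) of who waits for whom."""

    def __init__(self) -> None:
        self.waiting_for: dict[str, set[str]] = {}  # waiter -> waiteds
        self.waited_by: dict[str, set[str]] = {}    # waited -> waiters

    def waiting_for_results(self, waiter: str, waiteds: list[str]) -> None:
        """Record that `waiter` is blocked on every ID in `waiteds`."""
        for waited in waiteds:
            self.waiting_for.setdefault(waiter, set()).add(waited)
            self.waited_by.setdefault(waited, set()).add(waiter)

    def release_waiters(self, invocation: str) -> None:
        """Forget `invocation` both as a waited and as a waiter."""
        for waiter in self.waited_by.pop(invocation, set()):
            pending = self.waiting_for.get(waiter)
            if pending is not None:
                pending.discard(invocation)
                if not pending:
                    del self.waiting_for[waiter]
        for waited in self.waiting_for.pop(invocation, set()):
            waiters = self.waited_by.get(waited)
            if waiters is not None:
                waiters.discard(invocation)
                if not waiters:
                    del self.waited_by[waited]

    def get_blocking_invocations(self, max_num_invocations: int) -> Iterator[str]:
        """Yield IDs that others wait for and that wait for nothing."""
        candidates = (
            inv for inv in list(self.waited_by) if inv not in self.waiting_for
        )
        return islice(candidates, max_num_invocations)
```

With a chain where `a` waits for `b` and `b` waits for `c`, only `c` is reported as blocking; once `c` is released, `b` becomes the next blocking invocation.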
- exception pynenc.orchestrator.redis_orchestrator.StatusNotFound[source]#
Bases: Exception
Raised when a status is not found in Redis.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- class pynenc.orchestrator.redis_orchestrator.TaskRedisCache(app: pynenc.app.Pynenc, client: redis.Redis)[source]#
A Redis-based cache for managing task invocation statuses and retries.
Provides methods to set and get the status and retry counts of task invocations, leveraging Redis for scalable and efficient storage.
- Parameters:
app (Pynenc) – The Pynenc application instance.
client (redis.Redis) – The Redis client instance.
Initialization
- set_status(invocation: DistributedInvocation[Params, Result], status: pynenc.invocation.status.InvocationStatus) None[source]#
Set the status of an invocation in Redis.
- Parameters:
invocation (DistributedInvocation) – The invocation to be updated.
status (InvocationStatus) – The new status of the invocation.
- set_pending_status(invocation: DistributedInvocation[Params, Result]) None[source]#
Set the status of an invocation to pending in Redis.
Danger
We have to ensure that the invocation is not already pending. Otherwise, we could end up with a deadlock.
- Parameters:
invocation (DistributedInvocation) – The invocation to be updated.
- Raises:
PendingInvocationLockError – If it was not possible to lock the invocation.
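One common way to make "mark as pending only if not already pending" atomic in Redis is `SET` with the `NX` and `EX` options, which redis-py exposes as `Redis.set(name, value, nx=True, ex=seconds)`. The sketch below shows that pattern; it is not confirmed to be how `set_pending_status` is implemented. The key name `pending_lock:<id>`, the `max_pending_seconds` parameter, the `FakeRedis` stand-in (used so the example runs without a server), and the local `PendingInvocationLockError` class are all assumptions.

```python
import time
from typing import Optional


class PendingInvocationLockError(Exception):
    """Local stand-in for the exception named in the documentation."""


class FakeRedis:
    """Minimal in-memory stand-in supporting only set(..., ex=, nx=)."""

    def __init__(self) -> None:
        self._data: dict[str, tuple[str, Optional[float]]] = {}

    def set(self, name: str, value: str, ex: Optional[int] = None, nx: bool = False):
        now = time.monotonic()
        current = self._data.get(name)
        if current is not None and current[1] is not None and current[1] <= now:
            current = None  # treat an expired key as absent
        if nx and current is not None:
            return None  # mirrors redis-py: None when NX prevents the write
        self._data[name] = (value, now + ex if ex is not None else None)
        return True


def set_pending(client, invocation_id: str, max_pending_seconds: int = 5) -> None:
    """Acquire a per-invocation lock or raise if it is already held."""
    key = f"pending_lock:{invocation_id}"  # hypothetical key layout
    if not client.set(key, "1", nx=True, ex=max_pending_seconds):
        raise PendingInvocationLockError(invocation_id)
```

The expiry bounds how long a crashed worker can hold the lock, which is one way to limit the deadlock risk mentioned in the warning above.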
- set_up_invocation_auto_purge(invocation: DistributedInvocation[Params, Result]) None[source]#
Sets up an invocation for automatic purging after a specified time.
- Parameters:
invocation (DistributedInvocation) – The invocation to be updated.
- auto_purge() None[source]#
Automatically purges invocations that have been in their final state beyond a specified duration.
Note
The duration is specified in the configuration file using the `auto_final_invocation_purge_hours` parameter.
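The interaction between `set_up_invocation_auto_purge` and `auto_purge` can be sketched as a timestamp registry: record when an invocation reached its final state, then drop every entry older than the configured number of hours. This in-memory version is illustrative only. The class `FinalInvocationRegistry`, the string IDs, and the optional `now` arguments are hypothetical; a Redis-backed variant might keep the timestamps as scores in a sorted set, but that is an assumption.

```python
import time
from typing import Optional


class FinalInvocationRegistry:
    """Toy registry (hypothetical) of when invocations became final."""

    def __init__(self, auto_final_invocation_purge_hours: float) -> None:
        self.purge_after_seconds = auto_final_invocation_purge_hours * 3600
        self._final_at: dict[str, float] = {}

    def set_up_invocation_auto_purge(
        self, invocation_id: str, now: Optional[float] = None
    ) -> None:
        """Remember the time at which the invocation reached a final state."""
        self._final_at[invocation_id] = time.time() if now is None else now

    def auto_purge(self, now: Optional[float] = None) -> list[str]:
        """Remove and return the IDs that have been final for too long."""
        now = time.time() if now is None else now
        cutoff = now - self.purge_after_seconds
        expired = [inv for inv, ts in self._final_at.items() if ts <= cutoff]
        for inv in expired:
            del self._final_at[inv]
        return expired
```

Passing `now` explicitly keeps the sketch deterministic in tests; in normal use both methods would fall back to the current time.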
- clean_pending_status(invocation: DistributedInvocation[Params, Result]) None[source]#
Cleans up the pending status of a task invocation.
- Parameters:
invocation – The task invocation instance.
- _get_invocation_status(invocation_id: str) pynenc.invocation.status.InvocationStatus[source]#
Gets the status of a specific task invocation.
- Parameters:
invocation_id – The ID of the task invocation.
- Returns:
The current status of the task invocation.
- get_invocation_status(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) pynenc.invocation.status.InvocationStatus[source]#
Retrieves the status of a task invocation.
- Parameters:
invocation – The task invocation instance.
- Returns:
The current status of the task invocation.
- get_invocation_retries(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) int[source]#
Gets the number of retries for a specific task invocation.
- Parameters:
invocation – The task invocation instance.
- Returns:
The number of retries of the task invocation.
- increment_invocation_retries(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]#
Increments the retry count for a specific task invocation.
- Parameters:
invocation – The task invocation instance.
- class pynenc.orchestrator.redis_orchestrator.RedisOrchestrator(app: pynenc.app.Pynenc)[source]#
Bases: pynenc.orchestrator.base_orchestrator.BaseOrchestrator
An orchestrator implementation using Redis for managing distributed task invocations.
This orchestrator leverages Redis to handle invocation status, retries, auto-purge, and cycle and blocking controls in a distributed task environment. It extends BaseOrchestrator and integrates closely with Redis to provide efficient and scalable task orchestration.
- Parameters:
app (Pynenc) – The Pynenc application instance.
Initialization
- property cycle_control: pynenc.orchestrator.redis_orchestrator.RedisCycleControl#
- property blocking_control: pynenc.orchestrator.redis_orchestrator.RedisBlockingControl#
- get_existing_invocations(task: Task[Params, Result], key_serialized_arguments: Optional[dict[str, str]] = None, statuses: Optional[list[pynenc.invocation.status.InvocationStatus]] = None) Iterator[pynenc.invocation.dist_invocation.DistributedInvocation][source]#
- _set_invocation_status(invocation: DistributedInvocation[Params, Result], status: pynenc.invocation.status.InvocationStatus) None[source]#
- _set_invocation_pending_status(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]#
- set_up_invocation_auto_purge(invocation: pynenc.invocation.dist_invocation.DistributedInvocation[pynenc.types.Params, pynenc.types.Result]) None[source]#
- get_invocation_status(invocation: DistributedInvocation[Params, Result]) pynenc.invocation.status.InvocationStatus[source]#
- get_invocation_retries(invocation: DistributedInvocation[Params, Result]) int[source]#
- increment_invocation_retries(invocation: DistributedInvocation[Params, Result]) None[source]#