pynenc.orchestrator.redis_orchestrator¶
Module Contents¶
Classes¶
A Redis-based implementation of cycle control using a directed acyclic graph (DAG). |
|
A Redis-based implementation of blocking control for task invocations. |
|
A Redis-based cache for managing task invocation statuses and retries. |
|
An orchestrator implementation using Redis for managing distributed task invocations. |
Functions¶
Remove completed futures from the registry to prevent memory leaks. |
Data¶
API¶
- pynenc.orchestrator.redis_orchestrator._pending_resolution_threads: dict[str, concurrent.futures.Future[None]]¶
None
- pynenc.orchestrator.redis_orchestrator._registry_lock¶
‘Lock(…)’
- pynenc.orchestrator.redis_orchestrator._clean_dead_threads() None[source]¶
Remove completed futures from the registry to prevent memory leaks.
- exception pynenc.orchestrator.redis_orchestrator.StatusNotFound[source]¶
Bases:
ExceptionRaised when a status is not found in Redis
Initialization
Initialize self. See help(type(self)) for accurate signature.
- class pynenc.orchestrator.redis_orchestrator.RedisCycleControl(app: pynenc.app.Pynenc, client: redis.Redis)[source]¶
Bases:
pynenc.orchestrator.base_orchestrator.BaseCycleControlA Redis-based implementation of cycle control using a directed acyclic graph (DAG).
This class manages the dependencies between task invocations in Redis to prevent cycles in task calling patterns, which could lead to deadlocks or infinite loops.
- Parameters:
app (Pynenc) – The Pynenc application instance.
client (redis.Redis) – The Redis client instance.
Initialization
- purge() None[source]¶
Purges all data related to cycle control from Redis. This includes all stored invocations, calls, and their relationships.
- add_call_and_check_cycles(caller: pynenc.invocation.dist_invocation.DistributedInvocation, callee: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]¶
Adds a new call dependency between
callerandcalleeinvocations and checks for potential cycles.- Parameters:
caller (DistributedInvocation) – The invocation that is making the call.
callee (DistributedInvocation) – The invocation that is being called.
- Raises:
CycleDetectedError – If adding the call creates a cycle.
- remove_edges(call_id: str) None[source]¶
Recursively removes all edges from a given call in the graph.
- Parameters:
call_id – The ID of the call from which to remove edges.
- clean_up_invocation_cycles(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]¶
Cleans up the graph by removing a given invocation and its associated edges.
- Parameters:
invocation (DistributedInvocation) – The
DistributedInvocationinstance to be removed.
- find_cycle_caused_by_new_invocation(caller: pynenc.invocation.dist_invocation.DistributedInvocation, callee: pynenc.invocation.dist_invocation.DistributedInvocation) list[pynenc.call.Call][source]¶
Checks if adding a new call from
callertocalleewould create a cycle.- Parameters:
caller (DistributedInvocation) – The invocation making the call.
callee (DistributedInvocation) – The invocation being called.
- Returns:
List of
Callobjects forming the cycle, if a cycle is detected; otherwise, an empty list.
- class pynenc.orchestrator.redis_orchestrator.RedisBlockingControl(app: pynenc.app.Pynenc, client: redis.Redis)[source]¶
Bases:
pynenc.orchestrator.base_orchestrator.BaseBlockingControlA Redis-based implementation of blocking control for task invocations.
Manages invocation dependencies and blocking states in a Redis-backed environment, ensuring that invocations waiting for others are properly tracked and released.
- Parameters:
app (Pynenc) – The Pynenc application instance.
client (redis.Redis) – The Redis client instance.
Initialization
- purge() None[source]¶
Purges all data related to blocking control from Redis. This includes all stored invocations and their waiting relationships.
- waiting_for_results(waiter: pynenc.invocation.dist_invocation.DistributedInvocation, waiteds: list[pynenc.invocation.dist_invocation.DistributedInvocation]) None[source]¶
Registers that an invocation (waiter) is waiting for the results of other invocations (waiteds).
- Parameters:
waiter (DistributedInvocation) – The invocation that is waiting.
waiteds (DistributedInvocation) – A list of invocations that the waiter is waiting for.
- release_waiters(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]¶
Removes an invocation from the tracking system. Also removes any dependencies related to the invocation.
- Parameters:
invocation (DistributedInvocation) – The
DistributedInvocationinstance to be removed.
- get_blocking_invocations(max_num_invocations: int) Iterator[DistributedInvocation[Params, Result]][source]¶
Returns the invocations that are blocking others but are not waiting for anything themselves.
- Parameters:
max_num_invocations (int) – The maximum number of blocking invocations to retrieve.
- Returns:
An iterator of blocking
DistributedInvocationinstances.
- class pynenc.orchestrator.redis_orchestrator.TaskRedisCache(app: pynenc.app.Pynenc, client: redis.Redis)[source]¶
A Redis-based cache for managing task invocation statuses and retries.
Provides methods to set and get the status and retry counts of task invocations, leveraging Redis for scalable and efficient storage.
- Parameters:
app (Pynenc) – The Pynenc application instance.
client (redis.Redis) – The Redis client instance.
Initialization
- property conf: pynenc.conf.config_orchestrator.ConfigOrchestratorRedis¶
- _set_status(invocation: DistributedInvocation[Params, Result], status: pynenc.invocation.status.InvocationStatus, previous_status: Optional[pynenc.invocation.status.InvocationStatus], pipeline: redis.client.Pipeline) None[source]¶
Inner method to set the status of a single invocation using a provided pipeline.
- Parameters:
invocation (DistributedInvocation) – The invocation to update.
status (InvocationStatus) – The new status to set.
previous_status (Optional[InvocationStatus]) – The previous status, or None if it’s a new invocation.
pipeline (redis.client.Pipeline) – The Redis pipeline to use for commands.
- set_status(invocation: DistributedInvocation[Params, Result], status: pynenc.invocation.status.InvocationStatus) None[source]¶
Set the status of a single invocation in Redis, optionally using a provided pipeline.
- Parameters:
invocation (DistributedInvocation) – The invocation to update.
status (InvocationStatus) – The new status to set.
pipeline (Optional[redis.client.Pipeline]) – Optional Redis pipeline; if None, creates and executes one.
- set_batch_status(invocations: list[DistributedInvocation[Params, Result]], status: pynenc.invocation.status.InvocationStatus) None[source]¶
Set the status of multiple invocations at once using a Redis pipeline.
- Parameters:
invocations (list[DistributedInvocation]) – The invocations to update.
status (InvocationStatus) – The status to set.
- set_pending_status(invocation: DistributedInvocation[Params, Result]) None[source]¶
Set the status of an invocation to pending in Redis.
Danger
We have to ensure that the invocation is not already pending. Otherwise, we could end up with a deadlock.
- Parameters:
invocation (DistributedInvocation) – The invocation to be updated.
- Raises:
PendingInvocationLockError – If was not possible to lock the Invocation.
- set_up_invocation_auto_purge(invocation: DistributedInvocation[Params, Result]) None[source]¶
Sets up an invocation for automatic purging after a specified time.
- Parameters:
invocation (DistributedInvocation) – The invocation to be updated.
- auto_purge() None[source]¶
Automatically purges invocations that have been in their final state beyond a specified duration.
Note
The duration is specified in the configuration file using the `auto_final_invocation_purge_hours` parameter.
- clean_pending_status(invocation: DistributedInvocation[Params, Result]) None[source]¶
Cleans up the pending status of a task invocation.
- Parameters:
invocation – The task invocation instance.
- _get_invocation_status(invocation_id: str) pynenc.invocation.status.InvocationStatus[source]¶
Gets the status of a specific task invocation.
- Parameters:
invocation_id – The ID of the task invocation.
- Returns:
The current status of the task invocation.
- get_invocation_status(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) pynenc.invocation.status.InvocationStatus[source]¶
Retrieves the status of a task invocation.
- Parameters:
invocation – The task invocation instance.
- Returns:
The current status of the task invocation.
- _async_resolve_pending_status(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]¶
Asynchronously resolve and update an expired PENDING status.
This method is designed to run in a background thread.
- Parameters:
invocation_id (str) – The ID of the invocation to update
- _start_async_pending_resolution_for_invocation(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]¶
Start asynchronous resolution of a PENDING status.
Uses ThreadPoolExecutor to manage a pool of worker threads.
- Parameters:
invocation – The invocation to check PENDING status for
- get_invocation_retries(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) int[source]¶
Gets the number of retries for a specific task invocation.
- Parameters:
invocation – The task invocation instance.
- Returns:
The number of retries of the task invocation.
- increment_invocation_retries(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]¶
Increments the retry count for a specific task invocation.
- Parameters:
invocation – The task invocation instance.
- get_invocations(task_id: str, key_arguments: Optional[dict[str, str]], statuses: Optional[list[pynenc.invocation.status.InvocationStatus]]) Iterator[pynenc.invocation.dist_invocation.DistributedInvocation][source]¶
Retrieves task invocations based on task ID, key arguments, and status.
- filter_by_status(invocations: list[pynenc.invocation.dist_invocation.DistributedInvocation], status_filter: set[pynenc.invocation.status.InvocationStatus]) list[pynenc.invocation.dist_invocation.DistributedInvocation][source]¶
Get statuses for multiple invocations in a single Redis operation.
This is a low-level method that fetches the raw statuses from Redis, performing minimal processing. Unlike get_invocation_status, this method may return PENDING status even if it has expired, as it’s optimized for high-throughput batch queries.
- Parameters:
invocations – List of DistributedInvocation to get statuses for
- Returns:
list of DistributedInvocation with matching statuses
- class pynenc.orchestrator.redis_orchestrator.RedisOrchestrator(app: pynenc.app.Pynenc)[source]¶
Bases:
pynenc.orchestrator.base_orchestrator.BaseOrchestratorAn orchestrator implementation using Redis for managing distributed task invocations.
This orchestrator leverages Redis to handle invocation status, retries, auto-purge, and cycle and blocking controls in a distributed task environment. It extends
BaseOrchestratorand integrates closely with Redis to provide efficient and scalable task orchestration.- Parameters:
app (Pynenc) – The Pynenc application instance.
Initialization
- property client: redis.Redis¶
Lazy initialization of Redis client
- property redis_cache: pynenc.orchestrator.redis_orchestrator.TaskRedisCache¶
- property cycle_control: pynenc.orchestrator.redis_orchestrator.RedisCycleControl¶
- property blocking_control: pynenc.orchestrator.redis_orchestrator.RedisBlockingControl¶
- get_existing_invocations(task: Task[Params, Result], key_serialized_arguments: Optional[dict[str, str]] = None, statuses: Optional[list[pynenc.invocation.status.InvocationStatus]] = None) Iterator[pynenc.invocation.dist_invocation.DistributedInvocation][source]¶
- get_invocation(invocation_id: str) Optional[pynenc.invocation.dist_invocation.DistributedInvocation][source]¶
- _set_invocation_status(invocation: DistributedInvocation[Params, Result], status: pynenc.invocation.status.InvocationStatus) None[source]¶
- _set_invocations_status(invocations: list[pynenc.invocation.dist_invocation.DistributedInvocation], status: pynenc.invocation.status.InvocationStatus) None[source]¶
Set the status of multiple invocations at once using Redis pipeline.
- Parameters:
invocations (list[DistributedInvocation]) – The invocations to update.
status (InvocationStatus) – The status to set.
- _set_invocation_pending_status(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]¶
- set_up_invocation_auto_purge(invocation: pynenc.invocation.dist_invocation.DistributedInvocation[pynenc.types.Params, pynenc.types.Result]) None[source]¶
- get_invocation_status(invocation: DistributedInvocation[Params, Result]) pynenc.invocation.status.InvocationStatus[source]¶
- get_invocation_retries(invocation: DistributedInvocation[Params, Result]) int[source]¶
- increment_invocation_retries(invocation: DistributedInvocation[Params, Result]) None[source]¶
- filter_by_status(invocations: list[pynenc.invocation.dist_invocation.DistributedInvocation], status_filter: set[pynenc.invocation.status.InvocationStatus] | None = None) list[pynenc.invocation.dist_invocation.DistributedInvocation][source]¶
Filters a list of invocations by their status in an optimized way.
Uses batch Redis operations for better performance.
- Parameters:
invocations (list[DistributedInvocation]) – The invocations to filter
status_filter (list[InvocationStatus] | None) – The statuses to filter by
- Returns:
List of invocations matching the status filter