pynenc.state_backend.redis_state_backend
Module Contents
Classes
RedisStateBackend | A Redis-based implementation of the state backend.
API
- class pynenc.state_backend.redis_state_backend.RedisStateBackend(app: pynenc.app.Pynenc)
Bases: pynenc.state_backend.base_state_backend.BaseStateBackend
A Redis-based implementation of the state backend.
This backend uses Redis to store and retrieve the state of invocations, including their data, history, results, and exceptions. It’s suitable for distributed systems where shared state management is required.
Initialization
- property client: redis.Redis
Lazily initialized Redis client.
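The lazy-initialization pattern behind such a property can be sketched as follows. This is a minimal illustration, not the pynenc implementation: `LazyClientHolder` and `fake_factory` are hypothetical names, and a plain object stands in for the real Redis client so the example runs without a server.

```python
from typing import Any, Callable, Optional


class LazyClientHolder:
    """Hypothetical holder that builds its client on first access only."""

    def __init__(self, factory: Callable[[], Any]) -> None:
        self._factory = factory  # called at most once
        self._client: Optional[Any] = None

    @property
    def client(self) -> Any:
        # Create the client the first time it is requested, then reuse it.
        if self._client is None:
            self._client = self._factory()
        return self._client


calls: list = []


def fake_factory() -> object:
    # Stand-in for a real connection factory (assumption for this sketch).
    calls.append("created")
    return object()


holder = LazyClientHolder(fake_factory)
no_calls_before_access = len(calls) == 0  # nothing is built at construction
first = holder.client   # triggers the factory
second = holder.client  # reuses the cached instance
```

Deferring construction this way means that creating the backend object does not open a connection until something actually needs it.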
- _upsert_invocation(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) -> None
Inserts or updates an invocation in Redis.
- Parameters:
invocation (DistributedInvocation) – The invocation object to upsert.
- _get_invocation(invocation_id: str) -> DistributedInvocation[Params, Result]
Retrieves an invocation from Redis by its ID.
- Parameters:
invocation_id (str) – The ID of the invocation to retrieve.
- Returns:
The retrieved invocation object.
- _add_history(invocation: pynenc.invocation.dist_invocation.DistributedInvocation, invocation_history: pynenc.state_backend.base_state_backend.InvocationHistory) -> None
Adds a history record to an invocation in Redis.
- Parameters:
invocation (DistributedInvocation) – The invocation to add history for.
invocation_history (InvocationHistory) – The history record to add.
- _get_history(invocation: DistributedInvocation[Params, Result]) -> list[pynenc.state_backend.base_state_backend.InvocationHistory]
Retrieves the history of an invocation from Redis.
- Parameters:
invocation (DistributedInvocation) – The invocation to get the history for.
- Returns:
A list of invocation history records.
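The add/get history contract described above can be illustrated with an in-memory stand-in. This sketch assumes history is an append-only list per invocation; `HistoryRecord`, `InMemoryHistoryStore`, and the status strings are hypothetical and do not reflect the actual Redis keys or the real `InvocationHistory` fields.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class HistoryRecord:
    """Hypothetical stand-in for InvocationHistory."""

    invocation_id: str
    status: str


class InMemoryHistoryStore:
    """Keeps an ordered list of history records per invocation ID."""

    def __init__(self) -> None:
        self._history: dict[str, list[HistoryRecord]] = defaultdict(list)

    def add_history(self, record: HistoryRecord) -> None:
        # Append so that records keep their insertion order.
        self._history[record.invocation_id].append(record)

    def get_history(self, invocation_id: str) -> list[HistoryRecord]:
        # Return a copy so callers cannot mutate the stored list.
        return list(self._history.get(invocation_id, []))


store = InMemoryHistoryStore()
store.add_history(HistoryRecord("inv-1", "REGISTERED"))
store.add_history(HistoryRecord("inv-1", "RUNNING"))
statuses = [record.status for record in store.get_history("inv-1")]
```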
- _set_result(invocation: DistributedInvocation[Params, Result], result: pynenc.types.Result) -> None
Sets the result for an invocation in Redis.
- Parameters:
invocation (DistributedInvocation) – The invocation to set the result for.
result (Result) – The result of the invocation.
- _get_result(invocation: DistributedInvocation[Params, Result]) -> pynenc.types.Result
Retrieves the result of an invocation from Redis.
- Parameters:
invocation (DistributedInvocation) – The invocation to get the result for.
- Returns:
The result of the invocation.
- _set_exception(invocation: DistributedInvocation[Params, Result], exception: Exception) -> None
Sets the exception for an invocation in Redis.
- Parameters:
invocation (DistributedInvocation) – The invocation to set the exception for.
exception (Exception) – The exception to set.
- _get_exception(invocation: DistributedInvocation[Params, Result]) -> Exception
Retrieves the exception of an invocation from Redis.
- Parameters:
invocation (DistributedInvocation) – The invocation to get the exception for.
- Returns:
The exception of the invocation.
- get_workflow_data(workflow_identity: pynenc.workflow.WorkflowIdentity, key: str, default: Any = None) -> Any
Get a value from workflow data.
- Parameters:
workflow_identity – Workflow identity
key – Data key to retrieve
default – Default value if key doesn’t exist
- Returns:
Stored value or default
- set_workflow_data(workflow_identity: pynenc.workflow.WorkflowIdentity, key: str, value: Any) -> None
Set a value in workflow data.
- Parameters:
workflow_identity – Workflow identity
key – Data key to set
value – Value to store
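The get/set semantics for workflow data, including the `default` fallback, can be sketched with a dictionary in place of Redis. Everything here is an assumption for illustration: `InMemoryWorkflowData` is a hypothetical class, a plain string replaces `WorkflowIdentity`, and JSON replaces whatever serializer the real backend uses.

```python
import json
from typing import Any


class InMemoryWorkflowData:
    """Stores serialized values keyed by (workflow ID, data key)."""

    def __init__(self) -> None:
        self._data: dict[tuple[str, str], str] = {}

    def set_workflow_data(self, workflow_id: str, key: str, value: Any) -> None:
        # Serialize before storing, as a remote store would require.
        self._data[(workflow_id, key)] = json.dumps(value)

    def get_workflow_data(
        self, workflow_id: str, key: str, default: Any = None
    ) -> Any:
        raw = self._data.get((workflow_id, key))
        if raw is None:
            # Key was never set for this workflow: fall back to the default.
            return default
        return json.loads(raw)


backend = InMemoryWorkflowData()
backend.set_workflow_data("wf-1", "retries", 3)
```

Because values are scoped by workflow, the same key under a different workflow ID is treated as missing.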
- get_workflow_deterministic_value(workflow: pynenc.workflow.WorkflowIdentity, key: str) -> Any
Retrieve a deterministic value for workflow operations.
- Parameters:
workflow – The workflow identity
key – Key identifying the deterministic value
- Returns:
The stored value or None if not found
- set_workflow_deterministic_value(workflow: pynenc.workflow.WorkflowIdentity, key: str, value: Any) -> None
Store a deterministic value for workflow operations.
- Parameters:
workflow – The workflow identity
key – Key identifying the deterministic value
value – The value to store (must be serializable)
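One plausible way to use this getter/setter pair is a "generate once, replay afterwards" helper, sketched below. This is an assumption about usage, not documented pynenc behavior: `InMemoryDeterministicStore` and `deterministic` are hypothetical names, and a string replaces `WorkflowIdentity`.

```python
import itertools
from typing import Any, Callable


class InMemoryDeterministicStore:
    """Dictionary-backed stand-in for the two backend methods."""

    def __init__(self) -> None:
        self._values: dict[tuple[str, str], Any] = {}

    def get_workflow_deterministic_value(self, workflow_id: str, key: str) -> Any:
        # Mirrors the documented contract: None when nothing is stored.
        return self._values.get((workflow_id, key))

    def set_workflow_deterministic_value(
        self, workflow_id: str, key: str, value: Any
    ) -> None:
        self._values[(workflow_id, key)] = value


def deterministic(
    store: InMemoryDeterministicStore,
    workflow_id: str,
    key: str,
    generator: Callable[[], Any],
) -> Any:
    """Return the stored value if present, else generate and store it."""
    stored = store.get_workflow_deterministic_value(workflow_id, key)
    if stored is not None:
        return stored
    value = generator()
    store.set_workflow_deterministic_value(workflow_id, key, value)
    return value


counter = itertools.count(1)
det_store = InMemoryDeterministicStore()
first_value = deterministic(det_store, "wf-1", "step:0", lambda: next(counter))
replayed = deterministic(det_store, "wf-1", "step:0", lambda: next(counter))
other_key = deterministic(det_store, "wf-1", "step:1", lambda: next(counter))
```

Since the getter returns None for a missing key, a helper like this cannot distinguish "never stored" from a stored None, so None is not a useful deterministic value in this sketch.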
- store_app_info(app_info: pynenc.app.AppInfo) -> None
Register this app’s information in the state backend for discovery.
- Parameters:
app_info – The app information to store
- get_app_info() -> pynenc.app.AppInfo
Retrieve information of the current app.
- Returns:
The app information
- Raises:
ValueError – If app info is not found
- static get_all_app_infos() -> dict[str, pynenc.app.AppInfo]
Retrieve all app information registered in this state backend.
- Returns:
Dictionary mapping app_id to app information
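The three app-info methods above form a small registry: store, look up with a `ValueError` when missing, and list everything keyed by app ID. The sketch below shows that contract in memory. `FakeAppInfo` and its fields are hypothetical (the real `AppInfo` attributes are not listed here), and unlike the real `get_app_info()`, the lookup takes an explicit `app_id` because this stand-in has no current app.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeAppInfo:
    """Hypothetical stand-in for pynenc.app.AppInfo."""

    app_id: str
    module: str


class InMemoryAppRegistry:
    def __init__(self) -> None:
        self._infos: dict[str, FakeAppInfo] = {}

    def store_app_info(self, app_info: FakeAppInfo) -> None:
        # Re-registering the same app_id overwrites the previous entry.
        self._infos[app_info.app_id] = app_info

    def get_app_info(self, app_id: str) -> FakeAppInfo:
        try:
            return self._infos[app_id]
        except KeyError:
            raise ValueError(f"App info for {app_id!r} not found") from None

    def get_all_app_infos(self) -> dict[str, FakeAppInfo]:
        # Copy so callers cannot modify the registry through the result.
        return dict(self._infos)


registry = InMemoryAppRegistry()
registry.store_app_info(FakeAppInfo("billing", "billing.tasks"))
registry.store_app_info(FakeAppInfo("reports", "reports.tasks"))
```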
- store_workflow_run(workflow_identity: pynenc.workflow.WorkflowIdentity) -> None
Store a workflow run for tracking and monitoring.
Maintains workflow type registry and specific workflow run instances. This enables monitoring of workflow types and their execution history.
- Parameters:
workflow_identity – The workflow identity to store
- get_all_workflows() -> Iterator[str]
Retrieve all workflow types stored in this Redis state backend. Each workflow type is identified by its workflow task ID.
- Returns:
Iterator of workflow task IDs, one per workflow type
- get_all_workflows_runs() -> Iterator[pynenc.workflow.WorkflowIdentity]
Retrieve workflow run identities from this Redis state backend.
- Returns:
Iterator of workflow identities for runs
- get_workflow_runs(workflow_task_id: str) -> Iterator[pynenc.workflow.WorkflowIdentity]
Retrieve workflow run identities for one workflow type from this Redis state backend, with pagination.
Runs are fetched in batches of a configurable size, so large datasets are processed in manageable chunks instead of being loaded into memory all at once.
- Parameters:
workflow_task_id – Filter for specific workflow type
- Returns:
Iterator of workflow identities for runs
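The batched retrieval described above can be sketched as a generator that requests one page at a time and stops when a short or empty page comes back. This is only an illustration of the pattern: `iter_runs`, `fetch_page`, and `ALL_RUNS` are hypothetical, offsets stand in for whatever cursor the real backend uses, and strings stand in for `WorkflowIdentity` objects.

```python
from typing import Callable, Iterator, Sequence

# Fake data source standing in for the runs stored in Redis.
ALL_RUNS = [f"run-{i}" for i in range(5)]
fetches: list[tuple[int, int]] = []  # records every page request


def fetch_page(offset: int, count: int) -> Sequence[str]:
    """Return at most `count` runs starting at `offset`."""
    fetches.append((offset, count))
    return ALL_RUNS[offset:offset + count]


def iter_runs(
    fetch: Callable[[int, int], Sequence[str]], batch_size: int
) -> Iterator[str]:
    """Yield runs one by one while fetching them in batches."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    offset = 0
    while True:
        page = fetch(offset, batch_size)
        yield from page
        if len(page) < batch_size:
            # A short (or empty) page means there is nothing left to fetch.
            return
        offset += batch_size
```

Because the function is a generator, a caller that stops iterating early only pays for the pages it actually consumed.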
- store_workflow_sub_invocation(parent_workflow_id: str, sub_invocation_id: str) -> None
Store a sub-invocation ID that runs inside a parent workflow.
- Parameters:
parent_workflow_id – The workflow ID that contains the sub-invocation
sub_invocation_id – The invocation ID of the task/sub-workflow running inside