pynenc.state_backend.redis_state_backend

Module Contents

Classes

RedisStateBackend

A Redis-based implementation of the state backend.

API

class pynenc.state_backend.redis_state_backend.RedisStateBackend(app: pynenc.app.Pynenc)[source]

Bases: pynenc.state_backend.base_state_backend.BaseStateBackend

A Redis-based implementation of the state backend.

This backend uses Redis to store and retrieve the state of invocations, including their data, history, results, and exceptions. It’s suitable for distributed systems where shared state management is required.

Initialization

conf() pynenc.conf.config_state_backend.ConfigStateBackendRedis
property client: redis.Redis

Redis client, created lazily on first access.
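
As a rough illustration of the lazy-initialization pattern behind a property like client, the sketch below defers building the connection object until the property is first read and then reuses it. FakeRedisClient and LazyBackend are hypothetical stand-ins written for this example; they are not part of pynenc or redis, and the real backend may be implemented differently.

```python
from typing import Optional


class FakeRedisClient:
    """Hypothetical stand-in for redis.Redis; counts how often it is built."""

    instances = 0

    def __init__(self) -> None:
        FakeRedisClient.instances += 1


class LazyBackend:
    """Illustrative only; not pynenc's RedisStateBackend."""

    def __init__(self) -> None:
        # Nothing is connected when the backend object is constructed.
        self._client: Optional[FakeRedisClient] = None

    @property
    def client(self) -> FakeRedisClient:
        # Build the client on first access, then hand back the same object.
        if self._client is None:
            self._client = FakeRedisClient()
        return self._client


backend = LazyBackend()
created_before_access = FakeRedisClient.instances
first = backend.client
second = backend.client
```

The practical effect is that constructing the backend is cheap and cannot fail on connection problems; any connection cost is paid only when the client is actually needed.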

purge() None[source]

Clears all data from the Redis backend for the current app.app_id.

_upsert_invocation(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) None[source]

Inserts or updates an invocation in Redis.

Parameters:

invocation (DistributedInvocation) – The invocation object to upsert.

_get_invocation(invocation_id: str) DistributedInvocation[Params, Result][source]

Retrieves an invocation from Redis by its ID.

Parameters:

invocation_id (str) – The ID of the invocation to retrieve.

Returns:

The retrieved invocation object.

_add_history(invocation: pynenc.invocation.dist_invocation.DistributedInvocation, invocation_history: pynenc.state_backend.base_state_backend.InvocationHistory) None[source]

Adds a history record to an invocation in Redis.

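Parameters:
  • invocation (DistributedInvocation) – The invocation to add the history record to.

  • invocation_history (InvocationHistory) – The history record to add.

_get_history(invocation: DistributedInvocation[Params, Result]) list[pynenc.state_backend.base_state_backend.InvocationHistory][source]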

Retrieves the history of an invocation from Redis.

Parameters:

invocation (DistributedInvocation) – The invocation to get the history for.

Returns:

A list of invocation history records.

_set_result(invocation: DistributedInvocation[Params, Result], result: pynenc.types.Result) None[source]

Sets the result for an invocation in Redis.

Parameters:
  • invocation (DistributedInvocation) – The invocation to set the result for.

  • result (Result) – The result of the invocation.

_get_result(invocation: DistributedInvocation[Params, Result]) pynenc.types.Result[source]

Retrieves the result of an invocation from Redis.

Parameters:

invocation (DistributedInvocation) – The invocation to get the result for.

Returns:

The result of the invocation.

_set_exception(invocation: DistributedInvocation[Params, Result], exception: Exception) None[source]

Sets the exception for an invocation in Redis.

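Parameters:
  • invocation (DistributedInvocation) – The invocation to set the exception for.

  • exception (Exception) – The exception raised by the invocation.

_get_exception(invocation: DistributedInvocation[Params, Result]) Exception[source]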

Retrieves the exception of an invocation from Redis.

Parameters:

invocation (DistributedInvocation) – The invocation to get the exception for.

Returns:

The exception of the invocation.

get_workflow_data(workflow_identity: pynenc.workflow.WorkflowIdentity, key: str, default: Any = None) Any[source]

Get a value from workflow data.

Parameters:
  • workflow_identity – Workflow identity

  • key – Data key to retrieve

  • default – Default value if key doesn’t exist

Returns:

Stored value or default

set_workflow_data(workflow_identity: pynenc.workflow.WorkflowIdentity, key: str, value: Any) None[source]

Set a value in workflow data.

Parameters:
  • workflow_identity – Workflow identity

  • key – Data key to set

  • value – Value to store
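
The get/set pair above can be pictured with a small in-memory sketch. It only illustrates the documented contract (a stored value is returned, a missing key yields default). InMemoryWorkflowData is a hypothetical class, a plain string stands in for WorkflowIdentity, and JSON is an assumed serialization; none of this reflects how the Redis backend actually stores data.

```python
import json
from typing import Any


class InMemoryWorkflowData:
    """Hypothetical dict-backed stand-in; not pynenc's Redis implementation."""

    def __init__(self) -> None:
        # Keyed by (workflow id, data key); values are kept serialized.
        self._store: dict[tuple[str, str], str] = {}

    def set_workflow_data(self, workflow_id: str, key: str, value: Any) -> None:
        # Serialize so the stored form resembles what a remote store would hold.
        self._store[(workflow_id, key)] = json.dumps(value)

    def get_workflow_data(
        self, workflow_id: str, key: str, default: Any = None
    ) -> Any:
        raw = self._store.get((workflow_id, key))
        # A missing key yields the caller's default instead of raising.
        return default if raw is None else json.loads(raw)


data = InMemoryWorkflowData()
data.set_workflow_data("wf-1", "retries", 3)
found = data.get_workflow_data("wf-1", "retries")
missing = data.get_workflow_data("wf-1", "unknown", default=0)
other_workflow = data.get_workflow_data("wf-2", "retries")
```

In this sketch, data is scoped per workflow: the same key under a different workflow identity is treated as absent.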

get_workflow_deterministic_value(workflow: pynenc.workflow.WorkflowIdentity, key: str) Any[source]

Retrieve a deterministic value for workflow operations.

Parameters:
  • workflow – The workflow identity

  • key – Key identifying the deterministic value

Returns:

The stored value or None if not found

set_workflow_deterministic_value(workflow: pynenc.workflow.WorkflowIdentity, key: str, value: Any) None[source]

Store a deterministic value for workflow operations.

Parameters:
  • workflow – The workflow identity

  • key – Key identifying the deterministic value

  • value – The value to store (must be serializable)
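
One plausible way to use this getter/setter pair is a "record on first run, replay afterwards" helper, so that a non-deterministic operation yields the same value each time a workflow is re-executed. The sketch below is an assumption about usage, not pynenc's documented behavior: InMemoryDeterministicStore and deterministic are hypothetical names, and a plain string stands in for WorkflowIdentity.

```python
import random
from typing import Any, Callable


class InMemoryDeterministicStore:
    """Hypothetical stand-in exposing the two documented method names."""

    def __init__(self) -> None:
        self._values: dict[tuple[str, str], Any] = {}

    def get_workflow_deterministic_value(self, workflow_id: str, key: str) -> Any:
        # Returns None when nothing has been recorded, as the reference states.
        return self._values.get((workflow_id, key))

    def set_workflow_deterministic_value(
        self, workflow_id: str, key: str, value: Any
    ) -> None:
        self._values[(workflow_id, key)] = value


def deterministic(
    store: InMemoryDeterministicStore,
    workflow_id: str,
    key: str,
    generate: Callable[[], Any],
) -> Any:
    """Return the recorded value, generating and recording it on first use."""
    value = store.get_workflow_deterministic_value(workflow_id, key)
    if value is None:
        # First execution: produce the value once and persist it.
        # Note: a generator that legitimately returns None would be re-run.
        value = generate()
        store.set_workflow_deterministic_value(workflow_id, key, value)
    return value


store = InMemoryDeterministicStore()
first_run = deterministic(store, "wf-1", "random:0", random.random)
replay = deterministic(store, "wf-1", "random:0", random.random)
```

Because a missing value is reported as None, a helper like this cannot distinguish "never stored" from "stored None"; a caller that needs that distinction would have to wrap its values.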

store_app_info(app_info: pynenc.app.AppInfo) None[source]

Register this app’s information in the state backend for discovery.

Parameters:

app_info – The app information to store

get_app_info() pynenc.app.AppInfo[source]

Retrieve information of the current app.

Returns:

The app information

Raises:

ValueError – If app info is not found

static get_all_app_infos() dict[str, pynenc.app.AppInfo][source]

Retrieve all app information registered in this state backend.

Returns:

Dictionary mapping app_id to app information

store_workflow_run(workflow_identity: pynenc.workflow.WorkflowIdentity) None[source]

Store a workflow run for tracking and monitoring.

Maintains workflow type registry and specific workflow run instances. This enables monitoring of workflow types and their execution history.

Parameters:

workflow_identity – The workflow identity to store

get_all_workflows() Iterator[str][source]

Retrieve all workflow types (workflow_task_ids) stored in this Redis state backend.

Returns:

Iterator of workflow task IDs, one per workflow type

get_all_workflows_runs() Iterator[pynenc.workflow.WorkflowIdentity][source]

Retrieve workflow run identities from this Redis state backend.

Returns:

Iterator of workflow identities for runs

get_workflow_runs(workflow_task_id: str) Iterator[pynenc.workflow.WorkflowIdentity][source]

Retrieve workflow run identities from this Redis state backend with pagination.

Runs are fetched in batches of a configurable size, so large datasets are processed in manageable chunks without excessive memory use.

Parameters:

workflow_task_id – Filter for specific workflow type

Returns:

Iterator of workflow identities for runs
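
The batching idea can be sketched as a generator that pulls one page at a time from a store and yields items individually. This is a generic illustration under stated assumptions: iter_paginated, fetch_page, and the offset/limit paging scheme are hypothetical, and the actual backend may page through Redis differently (for example with cursors).

```python
from collections.abc import Callable, Iterator


def iter_paginated(
    fetch_page: Callable[[int, int], list[str]], batch_size: int
) -> Iterator[str]:
    """Yield items one at a time while holding at most one page in memory."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    offset = 0
    while True:
        page = fetch_page(offset, batch_size)
        yield from page
        # A short (or empty) page means the store has no more items.
        if len(page) < batch_size:
            return
        offset += batch_size


# Hypothetical data and page fetcher standing in for the remote store.
stored_runs = [f"run-{i}" for i in range(7)]
page_sizes: list[int] = []


def fetch_page(offset: int, limit: int) -> list[str]:
    page = stored_runs[offset:offset + limit]
    page_sizes.append(len(page))  # record each simulated round trip
    return page


runs = list(iter_paginated(fetch_page, batch_size=3))
```

Since the result is an iterator, a caller that stops early never triggers the remaining fetches.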

store_workflow_sub_invocation(parent_workflow_id: str, sub_invocation_id: str) None[source]

Store a sub-invocation ID that runs inside a parent workflow.

Parameters:
  • parent_workflow_id – The workflow ID that contains the sub-invocation

  • sub_invocation_id – The invocation ID of the task/sub-workflow running inside

get_workflow_sub_invocations(workflow_id: str) Iterator[str][source]

Retrieve all sub-invocation IDs that run inside a specific workflow.

Parameters:

workflow_id – The workflow ID to get sub-invocations for

Returns:

Iterator of invocation IDs that run inside the workflow