pynenc.state_backend.mem_state_backend

Module Contents

Classes

MemStateBackend

A memory-based implementation of the state backend.

API

class pynenc.state_backend.mem_state_backend.MemStateBackend(app: pynenc.app.Pynenc)[source]

Bases: pynenc.state_backend.base_state_backend.BaseStateBackend[pynenc.types.Params, pynenc.types.Result]

A memory-based implementation of the state backend.

Stores invocation data, history, results, and exceptions in in-memory dictionaries. Useful for environments where persistence is not required or for testing purposes.

Warning

The MemStateBackend class stores all data in the process’s memory and is not suitable for production systems. Its use should be limited to testing or demonstration purposes only.

Initialization

_app_info_registry: dict[str, pynenc.app.AppInfo]

None

_registry_lock: threading.Lock

‘Lock(…)’

purge() None[source]

Clears all stored data

_upsert_invocations(entries: list[tuple[pynenc.invocation.dist_invocation.InvocationDTO, pynenc.models.call_dto.CallDTO]]) None[source]

Store invocation and call DTO pairs in the memory cache.

Also maintains the parent-to-children index for efficient family tree traversal without scanning the entire cache.

_get_invocation(invocation_id: pynenc.identifiers.invocation_id.InvocationId) tuple[pynenc.invocation.dist_invocation.InvocationDTO, pynenc.models.call_dto.CallDTO] | None[source]

Retrieve an invocation DTO pair from the memory cache.

Parameters:

invocation_id (“InvocationId”) – The ID of the invocation to retrieve.

Returns:

Paired DTOs if found, else None.

get_child_invocations(parent_invocation_id: pynenc.identifiers.invocation_id.InvocationId) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Return IDs of invocations that name the given ID as their parent.

Uses the pre-built parent-to-children index instead of scanning the entire cache, providing O(1) lookup per parent.

Parameters:

parent_invocation_id – The parent invocation ID to search for.

Returns:

Iterator of child invocation IDs.

_add_histories(invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], invocation_history: pynenc.state_backend.base_state_backend.InvocationHistory) None[source]

Adds the same history record for a list of invocations.

_get_history(invocation_id: pynenc.identifiers.invocation_id.InvocationId) list[pynenc.state_backend.base_state_backend.InvocationHistory][source]

Retrieves the history of an invocation ordered by timestamp.

Parameters:

invocation_id (“InvocationId”) – The ID of the invocation to get the history from

Returns:

List of InvocationHistory records

_get_result(invocation_id: pynenc.identifiers.invocation_id.InvocationId) str[source]

Retrieves the result of an invocation.

Parameters:

invocation_id (“InvocationId”) – The ID of the invocation to get the result from

Returns:

The serialized result string

_set_result(invocation_id: pynenc.identifiers.invocation_id.InvocationId, serialized_result: str) None[source]

Sets the result of an invocation.

Parameters:
  • invocation_id (“InvocationId”) – The ID of the invocation to set

  • serialized_result (str) – The serialized result string to set

_get_exception(invocation_id: pynenc.identifiers.invocation_id.InvocationId) str[source]

Retrieves the exception of an invocation.

Parameters:

invocation_id (“InvocationId”) – The ID of the invocation to get the exception from

Returns:

The serialized exception string

_set_exception(invocation_id: pynenc.identifiers.invocation_id.InvocationId, serialized_exception: str) None[source]

Sets the raised exception by an invocation ran.

Parameters:
  • invocation_id (“InvocationId”) – The ID of the invocation to set

  • serialized_exception (str) – The serialized exception string to set

get_workflow_data(workflow_identity: pynenc.workflow.WorkflowIdentity, key: str, default: Any = None) Any[source]

Get a value from workflow data.

Parameters:
  • workflow_identity (“WorkflowIdentity”) – Workflow identity

  • key (str) – Data key to retrieve

  • default (Any) – Default value if key doesn’t exist

Returns:

Stored value or default

set_workflow_data(workflow_identity: pynenc.workflow.WorkflowIdentity, key: str, value: Any) None[source]

Set a value in workflow data.

Parameters:
  • workflow_identity – Workflow identity

  • key – Data key to set

  • value – Value to store

store_app_info(app_info: pynenc.app.AppInfo) None[source]

Register this app’s information in the state backend for discovery.

Parameters:

app_info – The app information to store

get_app_info() pynenc.app.AppInfo[source]

Retrieve information of the current app.

Returns:

The app information

Raises:

ValueError – If app info is not found

static discover_app_infos() dict[str, pynenc.app.AppInfo][source]
store_workflow_run(workflow_identity: pynenc.workflow.WorkflowIdentity) None[source]

Store a workflow run for tracking and monitoring.

Parameters:

workflow_identity – The workflow identity to store

get_all_workflow_types() collections.abc.Iterator[pynenc.identifiers.task_id.TaskId][source]

Retrieve all workflow types (workflow_task_ids) stored in this state backend.

Returns:

Iterator of workflow task IDs representing different workflow types

get_all_workflow_runs() collections.abc.Iterator[pynenc.workflow.WorkflowIdentity][source]

Retrieve workflow run identities from this state backend.

Returns:

Iterator of workflow identities for runs

get_workflow_runs(workflow_type: pynenc.identifiers.task_id.TaskId) collections.abc.Iterator[pynenc.workflow.WorkflowIdentity][source]

Retrieve workflow run identities from this state backend.

Parameters:

workflow_type – Filter for specific workflow type

Returns:

Iterator of workflow identities for runs

store_workflow_sub_invocation(parent_workflow_id: pynenc.identifiers.invocation_id.InvocationId, sub_invocation_id: pynenc.identifiers.invocation_id.InvocationId) None[source]

Store a sub-invocation ID that runs inside a parent workflow.

Parameters:
  • parent_workflow_id – The workflow ID that contains the sub-invocation

  • sub_invocation_id – The invocation ID of the task/sub-workflow running inside

get_workflow_sub_invocations(workflow_id: pynenc.identifiers.invocation_id.InvocationId) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieve all sub-invocation IDs that run inside a specific workflow.

Parameters:

workflow_id – The workflow ID to get sub-invocations for

Returns:

Iterator of invocation IDs that run inside the workflow

iter_invocations_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, batch_size: int = 100) collections.abc.Iterator[list[pynenc.identifiers.invocation_id.InvocationId]][source]

Iterate over invocation IDs that have history within time range.

iter_history_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, batch_size: int = 100) collections.abc.Iterator[list[pynenc.state_backend.base_state_backend.InvocationHistory]][source]

Iterate over history entries within time range.

_store_runner_context(runner_context: pynenc.runner.runner_context.RunnerContext) None[source]

Store a runner context.

Parameters:
  • runner_id (str) – The runner’s unique identifier

  • runner_context (RunnerContext) – The context to store

_get_runner_context(runner_id: str) RunnerContext | None[source]

Retrieve a runner context by runner_id.

Parameters:

runner_id (str) – The runner’s unique identifier

Returns:

The stored RunnerContext or None if not found

_get_runner_contexts(runner_ids: list[str]) list[pynenc.runner.runner_context.RunnerContext][source]

Retrieve multiple runner contexts by their IDs.

Parameters:

runner_ids (list[str]) – List of runner unique identifiers

Returns:

list[“RunnerContext”] of the stored RunnerContexts

get_matching_runner_contexts(partial_id: str) collections.abc.Iterator[pynenc.runner.runner_context.RunnerContext][source]

Search runner contexts by partial ID match.

get_invocation_ids_by_workflow(workflow_id: str | None = None, workflow_type_key: str | None = None) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieve invocation IDs filtered by workflow criteria.