pynenc.state_backend.mem_state_backend
Module Contents
Classes
MemStateBackend | A memory-based implementation of the state backend.
Data
API
- class pynenc.state_backend.mem_state_backend.MemStateBackend(app: pynenc.app.Pynenc)
Bases: pynenc.state_backend.base_state_backend.BaseStateBackend[pynenc.types.Params, pynenc.types.Result]
A memory-based implementation of the state backend.
Stores invocation data, history, results, and exceptions in in-memory dictionaries. Useful for testing or for environments where persistence is not required.
Warning
The MemStateBackend class stores all data in the process’s memory and is not suitable for production systems. Its use should be limited to testing or demonstration purposes only.
Initialization
- _upsert_invocations(entries: list[tuple[pynenc.invocation.dist_invocation.InvocationDTO, pynenc.models.call_dto.CallDTO]]) → None
Store invocation and call DTO pairs in the memory cache.
Also maintains the parent-to-children index for efficient family tree traversal without scanning the entire cache.
- _get_invocation(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → tuple[pynenc.invocation.dist_invocation.InvocationDTO, pynenc.models.call_dto.CallDTO] | None
Retrieve an invocation DTO pair from the memory cache.
- Parameters:
invocation_id (“InvocationId”) – The ID of the invocation to retrieve.
- Returns:
Paired DTOs if found, else None.
- get_child_invocations(parent_invocation_id: pynenc.identifiers.invocation_id.InvocationId) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Return IDs of invocations that name the given ID as their parent.
Uses the pre-built parent-to-children index instead of scanning the entire cache, providing O(1) lookup per parent.
- Parameters:
parent_invocation_id – The parent invocation ID to search for.
- Returns:
Iterator of child invocation IDs.
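The parent-to-children index described above can be sketched roughly as follows. This is a minimal illustration, not pynenc's actual code: the class `ParentIndexedCache`, its method names, and the use of plain strings as IDs are all assumptions made for the example.

```python
from collections import defaultdict
from collections.abc import Iterator
from typing import Optional


class ParentIndexedCache:
    """Hypothetical stand-in for an in-memory invocation cache (not pynenc code)."""

    def __init__(self) -> None:
        # invocation_id -> stored payload (the real backend stores DTO pairs)
        self._cache: dict = {}
        # parent_id -> IDs of its direct children, updated on every upsert
        self._children: defaultdict = defaultdict(set)

    def upsert(self, invocation_id: str, parent_id: Optional[str], payload: dict) -> None:
        self._cache[invocation_id] = payload
        if parent_id is not None:
            self._children[parent_id].add(invocation_id)

    def get_children(self, parent_id: str) -> Iterator[str]:
        # A single dict lookup; .get avoids creating entries for unknown parents
        yield from self._children.get(parent_id, ())


cache = ParentIndexedCache()
cache.upsert("root", None, {})
cache.upsert("child-a", "root", {})
cache.upsert("child-b", "root", {})
children = sorted(cache.get_children("root"))
```

The cost of the index is paid at write time: each upsert does one extra set insertion, so that looking up a parent's children never has to scan the whole cache.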
- _add_histories(invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], invocation_history: pynenc.state_backend.base_state_backend.InvocationHistory) → None
Adds the same history record for a list of invocations.
- _get_history(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → list[pynenc.state_backend.base_state_backend.InvocationHistory]
Retrieves the history of an invocation ordered by timestamp.
- Parameters:
invocation_id (“InvocationId”) – The ID of the invocation to get the history from
- Returns:
List of InvocationHistory records
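As a rough sketch of how the two history methods could fit together, the example below appends one shared record to several invocations and sorts by timestamp on read. The `HistoryRecord` dataclass, the module-level `history` dict, and the function names are hypothetical simplifications, not the library's types.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class HistoryRecord:
    """Hypothetical, simplified history entry."""

    timestamp: datetime
    status: str


# invocation_id -> records in insertion order
history = defaultdict(list)


def add_histories(invocation_ids, record):
    # The same record object is appended to every listed invocation
    for invocation_id in invocation_ids:
        history[invocation_id].append(record)


def get_history(invocation_id):
    # Sort on read so callers always see chronological order
    return sorted(history.get(invocation_id, []), key=lambda r: r.timestamp)


late = HistoryRecord(datetime(2024, 1, 1, 12, 0), "SUCCESS")
early = HistoryRecord(datetime(2024, 1, 1, 11, 0), "RUNNING")
add_histories(["inv-1", "inv-2"], late)
add_histories(["inv-1"], early)
statuses = [r.status for r in get_history("inv-1")]
```

Here the earlier record was added last, yet it is returned first because ordering is decided by `timestamp` rather than by insertion order.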
- _get_result(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → str
Retrieves the result of an invocation.
- Parameters:
invocation_id (“InvocationId”) – The ID of the invocation to get the result from
- Returns:
The serialized result string
- _set_result(invocation_id: pynenc.identifiers.invocation_id.InvocationId, serialized_result: str) → None
Sets the result of an invocation.
- Parameters:
invocation_id (“InvocationId”) – The ID of the invocation to set
serialized_result (str) – The serialized result string to set
- _get_exception(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → str
Retrieves the exception of an invocation.
- Parameters:
invocation_id (“InvocationId”) – The ID of the invocation to get the exception from
- Returns:
The serialized exception string
- _set_exception(invocation_id: pynenc.identifiers.invocation_id.InvocationId, serialized_exception: str) → None
Sets the exception raised by an invocation run.
- Parameters:
invocation_id (“InvocationId”) – The ID of the invocation to set
serialized_exception (str) – The serialized exception string to set
- get_workflow_data(workflow_identity: pynenc.workflow.WorkflowIdentity, key: str, default: Any = None) → Any
Get a value from workflow data.
- Parameters:
workflow_identity (“WorkflowIdentity”) – Workflow identity
key (str) – Data key to retrieve
default (Any) – Default value if key doesn’t exist
- Returns:
Stored value or default
- set_workflow_data(workflow_identity: pynenc.workflow.WorkflowIdentity, key: str, value: Any) → None
Set a value in workflow data.
- Parameters:
workflow_identity – Workflow identity
key – Data key to set
value – Value to store
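The get/set pair with a default can be pictured as a two-level dictionary: one level per workflow, one per key. The sketch below assumes that layout and uses a plain string in place of `WorkflowIdentity`; the class `WorkflowDataStore` is hypothetical and only mirrors the method names documented here.

```python
from typing import Any


class WorkflowDataStore:
    """Hypothetical illustration; the workflow identity is simplified to a string."""

    def __init__(self) -> None:
        # workflow_id -> {key -> value}
        self._data: dict = {}

    def set_workflow_data(self, workflow_id: str, key: str, value: Any) -> None:
        # Create the per-workflow dict on first write
        self._data.setdefault(workflow_id, {})[key] = value

    def get_workflow_data(self, workflow_id: str, key: str, default: Any = None) -> Any:
        # Unknown workflows and unknown keys both fall back to the default
        return self._data.get(workflow_id, {}).get(key, default)


store = WorkflowDataStore()
store.set_workflow_data("wf-1", "retries", 3)
found = store.get_workflow_data("wf-1", "retries")
missing = store.get_workflow_data("wf-1", "owner", default="nobody")
other_workflow = store.get_workflow_data("wf-2", "retries")
```

Keying by workflow first keeps data from different workflow runs isolated, so the same key can be reused across workflows without collisions.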
- store_app_info(app_info: pynenc.app.AppInfo) → None
Register this app’s information in the state backend for discovery.
- Parameters:
app_info – The app information to store
- get_app_info() → pynenc.app.AppInfo
Retrieve information about the current app.
- Returns:
The app information
- Raises:
ValueError – If app info is not found
- store_workflow_run(workflow_identity: pynenc.workflow.WorkflowIdentity) → None
Store a workflow run for tracking and monitoring.
- Parameters:
workflow_identity – The workflow identity to store
- get_all_workflow_types() → collections.abc.Iterator[pynenc.identifiers.task_id.TaskId]
Retrieve all workflow types (workflow_task_ids) stored in this state backend.
- Returns:
Iterator of workflow task IDs representing different workflow types
- get_all_workflow_runs() → collections.abc.Iterator[pynenc.workflow.WorkflowIdentity]
Retrieve all workflow run identities stored in this state backend, across every workflow type.
- Returns:
Iterator of workflow identities for runs
- get_workflow_runs(workflow_type: pynenc.identifiers.task_id.TaskId) → collections.abc.Iterator[pynenc.workflow.WorkflowIdentity]
Retrieve the workflow run identities of a specific workflow type from this state backend.
- Parameters:
workflow_type – Filter for specific workflow type
- Returns:
Iterator of workflow identities for runs
- store_workflow_sub_invocation(parent_workflow_id: pynenc.identifiers.invocation_id.InvocationId, sub_invocation_id: pynenc.identifiers.invocation_id.InvocationId) → None
Store a sub-invocation ID that runs inside a parent workflow.
- Parameters:
parent_workflow_id – The workflow ID that contains the sub-invocation
sub_invocation_id – The invocation ID of the task/sub-workflow running inside
- get_workflow_sub_invocations(workflow_id: pynenc.identifiers.invocation_id.InvocationId) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieve all sub-invocation IDs that run inside a specific workflow.
- Parameters:
workflow_id – The workflow ID to get sub-invocations for
- Returns:
Iterator of invocation IDs that run inside the workflow
- iter_invocations_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, batch_size: int = 100) → collections.abc.Iterator[list[pynenc.identifiers.invocation_id.InvocationId]]
Iterate over the IDs of invocations that have history within the given time range, yielded as lists sized by batch_size.
- iter_history_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, batch_size: int = 100) → collections.abc.Iterator[list[pynenc.state_backend.base_state_backend.InvocationHistory]]
Iterate over history entries within the given time range, yielded as lists sized by batch_size.
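Both time-range iterators return an iterator of lists, which suggests batched output. A minimal sketch of that pattern is shown below; the helper `iter_batches_in_timerange`, the inclusive bounds, and the chronological ordering are assumptions for illustration, since this page does not state how the backend treats boundaries or ordering.

```python
from collections.abc import Iterator
from datetime import datetime, timedelta
from itertools import islice


def iter_batches_in_timerange(
    records: list,
    start_time: datetime,
    end_time: datetime,
    batch_size: int = 100,
) -> Iterator[list]:
    """Yield items of (timestamp, item) records in range, batch_size at a time."""
    # Assumption: both bounds are inclusive and output is chronological
    matching = (
        item
        for timestamp, item in sorted(records, key=lambda r: r[0])
        if start_time <= timestamp <= end_time
    )
    # islice pulls at most batch_size items per pass; an empty batch ends the loop
    while batch := list(islice(matching, batch_size)):
        yield batch


base = datetime(2024, 1, 1)
records = [(base + timedelta(minutes=i), f"inv-{i}") for i in range(5)]
batches = list(
    iter_batches_in_timerange(records, base, base + timedelta(minutes=3), batch_size=2)
)
```

Yielding lists rather than single items lets a caller process or forward one batch at a time without materializing every match at once.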
- _store_runner_context(runner_context: pynenc.runner.runner_context.RunnerContext) → None
Store a runner context.
- Parameters:
runner_context (RunnerContext) – The context to store
- _get_runner_context(runner_id: str) → RunnerContext | None
Retrieve a runner context by runner_id.
- Parameters:
runner_id (str) – The runner’s unique identifier
- Returns:
The stored RunnerContext or None if not found
- _get_runner_contexts(runner_ids: list[str]) → list[pynenc.runner.runner_context.RunnerContext]
Retrieve multiple runner contexts by their IDs.
- get_matching_runner_contexts(partial_id: str) → collections.abc.Iterator[pynenc.runner.runner_context.RunnerContext]
Search runner contexts by partial ID match.
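What counts as a "partial ID match" is not spelled out here. One plausible reading is substring containment, sketched below; `FakeRunnerContext`, `matching_contexts`, and the sample IDs are hypothetical, and the real backend may match differently (for example by prefix).

```python
from collections.abc import Iterator
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeRunnerContext:
    """Hypothetical, reduced stand-in for a runner context."""

    runner_id: str
    hostname: str


def matching_contexts(contexts: dict, partial_id: str) -> Iterator[FakeRunnerContext]:
    # Assumption: a context matches when partial_id occurs anywhere in its runner ID
    for runner_id, context in contexts.items():
        if partial_id in runner_id:
            yield context


contexts = {
    ctx.runner_id: ctx
    for ctx in (
        FakeRunnerContext("host-a-worker-1", "host-a"),
        FakeRunnerContext("host-b-worker-2", "host-b"),
        FakeRunnerContext("host-a-scheduler", "host-a"),
    )
}
matched_ids = sorted(ctx.runner_id for ctx in matching_contexts(contexts, "host-a"))
```

A linear scan like this is reasonable for an in-memory store holding a modest number of runner contexts.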