pynenc.state_backend.sqlite_state_backend
SQLite-based state backend for cross-process testing.
This module provides a SQLite-based state backend implementation that enables true cross-process coordination for testing process runners. Unlike shared memory, SQLite provides ACID transactions and handles concurrent access automatically.
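As a rough illustration of the transactional behavior mentioned above, the following sketch uses only Python's standard `sqlite3` module and is not pynenc's code. Two connections to the same temporary file stand in for two processes; the `state` table and the key names are made up for the example.

```python
import os
import sqlite3
import tempfile


def demo_shared_file() -> list[tuple[str, str]]:
    """Write through one connection and read through another one."""
    fd, path = tempfile.mkstemp(suffix=".db")
    os.close(fd)
    try:
        writer = sqlite3.connect(path)  # stands in for process A
        reader = sqlite3.connect(path)  # stands in for process B
        writer.execute("CREATE TABLE state (key TEXT PRIMARY KEY, value TEXT)")
        writer.commit()

        # The connection context manager commits on success...
        with writer:
            writer.execute("INSERT INTO state VALUES ('inv-1', 'RUNNING')")

        # ...and rolls back when the block raises, so no partial write is kept.
        try:
            with writer:
                writer.execute("INSERT INTO state VALUES ('inv-2', 'PENDING')")
                raise RuntimeError("simulated crash")
        except RuntimeError:
            pass

        rows = reader.execute("SELECT key, value FROM state ORDER BY key").fetchall()
        writer.close()
        reader.close()
        return rows
    finally:
        os.remove(path)
```

Only the committed row is visible to the second connection; the row written inside the failed transaction is discarded.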
Module Contents
Classes
Tables | Table names for state backend, scoped by app_id. |
SQLiteStateBackend | A SQLite-based implementation of the state backend for cross-process testing. |
Functions
init_tables | Initialize SQLite tables for state backend. |
API
- class pynenc.state_backend.sqlite_state_backend.Tables(app_id: str)
Bases: pynenc.util.sqlite_utils.TableNames
Table names for state backend, scoped by app_id.
- pynenc.state_backend.sqlite_state_backend.init_tables(sqlite_db_path: str, tables: pynenc.state_backend.sqlite_state_backend.Tables) → None
Initialize SQLite tables for state backend.
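One way to picture table names scoped by an app ID, together with an idempotent initialization step, is the sketch below. `ScopedTables`, `init_demo_tables`, `list_tables`, and the column layout are all hypothetical and chosen for illustration; they do not reflect the actual schema or the `TableNames` base class.

```python
import re
import sqlite3


class ScopedTables:
    """Hypothetical stand-in: derives table names from an app ID."""

    def __init__(self, app_id: str) -> None:
        # Replace every non-word character so the app ID can be embedded
        # in a table name.
        prefix = re.sub(r"\W", "_", app_id)
        self.invocations = f"{prefix}_invocations"
        self.history = f"{prefix}_history"


def init_demo_tables(conn: sqlite3.Connection, tables: ScopedTables) -> None:
    """Create the tables if they are missing; safe to call repeatedly."""
    with conn:
        conn.execute(
            f'CREATE TABLE IF NOT EXISTS "{tables.invocations}" '
            "(invocation_id TEXT PRIMARY KEY, status TEXT)"
        )
        conn.execute(
            f'CREATE TABLE IF NOT EXISTS "{tables.history}" '
            "(invocation_id TEXT, timestamp REAL, status TEXT)"
        )


def list_tables(conn: sqlite3.Connection) -> list[str]:
    """Return the names of all tables in the database, sorted."""
    rows = conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
    )
    return [name for (name,) in rows]
```

With this layout, two apps that share one database file never touch each other's rows, because each app reads and writes only the tables carrying its own prefix.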
- class pynenc.state_backend.sqlite_state_backend.SQLiteStateBackend(app: pynenc.app.Pynenc)
Bases: pynenc.state_backend.base_state_backend.BaseStateBackend[pynenc.types.Params, pynenc.types.Result]
A SQLite-based implementation of the state backend for cross-process testing.
Stores invocation data, history, results, and exceptions in a SQLite database, which allows state to be shared between processes and makes the backend suitable for testing process runners.
Warning
The SQLiteStateBackend class is designed for testing purposes only and should not be used in production systems. It uses temporary SQLite files for state.
- store_app_info(app_info: pynenc.app_info.AppInfo) → None
Store app info.
- static discover_app_infos() → dict[str, pynenc.app_info.AppInfo]
Retrieve all app information registered in this state backend.
- store_workflow_run(workflow_identity: pynenc.workflow.workflow_identity.WorkflowIdentity) → None
Store a workflow run for tracking and monitoring.
- _upsert_invocations(entries: list[tuple[pynenc.invocation.dist_invocation.InvocationDTO, pynenc.models.call_dto.CallDTO]]) → None
Store invocation and call DTO pairs as discrete columns.
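The general upsert pattern in SQLite can be sketched as follows. This is an illustration only: the `invocations` table, its three columns, and the helper names are assumptions, and the real method may use different columns or a different statement. The `ON CONFLICT ... DO UPDATE` clause requires SQLite 3.24 or newer.

```python
import sqlite3

# Insert a row, or overwrite the non-key columns if the ID already exists.
UPSERT_SQL = """
INSERT INTO invocations (invocation_id, task_id, arguments)
VALUES (?, ?, ?)
ON CONFLICT(invocation_id) DO UPDATE SET
    task_id = excluded.task_id,
    arguments = excluded.arguments
"""


def make_demo_db() -> sqlite3.Connection:
    """Create an in-memory database with a hypothetical invocations table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE invocations "
        "(invocation_id TEXT PRIMARY KEY, task_id TEXT, arguments TEXT)"
    )
    return conn


def upsert_invocations(
    conn: sqlite3.Connection, entries: list[tuple[str, str, str]]
) -> None:
    """Write all entries in a single transaction."""
    with conn:
        conn.executemany(UPSERT_SQL, entries)
```

Storing each field in its own column, rather than as one serialized blob, keeps individual fields queryable with plain SQL.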
- _get_invocation(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → tuple[pynenc.invocation.dist_invocation.InvocationDTO, pynenc.models.call_dto.CallDTO] | None
Retrieve invocation and call DTOs by invocation ID.
- get_child_invocations(parent_invocation_id: pynenc.identifiers.invocation_id.InvocationId) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Return IDs of invocations that name the given ID as their parent.
- Parameters:
parent_invocation_id – The parent invocation ID to search for.
- Returns:
Iterator of child invocation IDs.
- _add_histories(invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], invocation_history: pynenc.state_backend.base_state_backend.InvocationHistory) → None
Adds the same history record for a list of invocations.
Stores history entries with explicit timestamp and status so that multiple status updates per invocation can be preserved and uniquely identified.
- _get_history(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → list[pynenc.state_backend.base_state_backend.InvocationHistory]
Retrieves the history of an invocation ordered by timestamp.
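The history storage and retrieval described above might look roughly like the sketch below. It assumes a composite primary key of invocation ID, timestamp, and status, with timestamps stored as epoch seconds. The table layout, function names, and status strings are placeholders, not the actual schema.

```python
import sqlite3
from datetime import datetime, timezone


def make_history_db() -> sqlite3.Connection:
    """Create an in-memory database with a hypothetical history table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE history (invocation_id TEXT, timestamp REAL, status TEXT, "
        "PRIMARY KEY (invocation_id, timestamp, status))"
    )
    return conn


def demo_add_histories(
    conn: sqlite3.Connection,
    invocation_ids: list[str],
    timestamp: datetime,
    status: str,
) -> None:
    """Record the same (timestamp, status) entry for every given invocation."""
    ts = timestamp.timestamp()
    with conn:
        # OR IGNORE makes a repeated write of the same entry a no-op.
        conn.executemany(
            "INSERT OR IGNORE INTO history VALUES (?, ?, ?)",
            [(inv_id, ts, status) for inv_id in invocation_ids],
        )


def demo_get_history(
    conn: sqlite3.Connection, invocation_id: str
) -> list[tuple[datetime, str]]:
    """Return the entries of one invocation, oldest first."""
    rows = conn.execute(
        "SELECT timestamp, status FROM history "
        "WHERE invocation_id = ? ORDER BY timestamp",
        (invocation_id,),
    )
    return [(datetime.fromtimestamp(ts, tz=timezone.utc), status) for ts, status in rows]
```

Because the key includes the timestamp and the status, an invocation can hold several entries, while an exact duplicate is stored only once.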
- _get_result(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → str
Retrieves the result of an invocation by ID.
- _set_result(invocation_id: pynenc.identifiers.invocation_id.InvocationId, serialized_result: str) → None
Sets the result of an invocation by ID.
- _get_exception(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → str
Retrieves the exception of an invocation by ID.
- Parameters:
invocation_id (InvocationId) – The ID of the invocation
- Returns:
The serialized exception string
- _set_exception(invocation_id: pynenc.identifiers.invocation_id.InvocationId, serialized_exception: str) → None
Sets the raised exception by invocation ID.
- set_workflow_data(workflow_identity: pynenc.workflow.workflow_identity.WorkflowIdentity, key: str, value: Any) → None
Set workflow data.
- get_workflow_data(workflow_identity: pynenc.workflow.workflow_identity.WorkflowIdentity, key: str, default: Any = None) → Any
Get workflow data.
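A key/value store with a default on lookup, in the spirit of the two workflow data methods above, could be sketched like this. The `workflow_data` table, the plain string workflow ID, and the JSON serialization are all assumptions made for the example; the serializer used by the actual backend is not documented here.

```python
import json
import sqlite3
from typing import Any


def make_workflow_db() -> sqlite3.Connection:
    """Create an in-memory database with a hypothetical workflow_data table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE workflow_data (workflow_id TEXT, data_key TEXT, value TEXT, "
        "PRIMARY KEY (workflow_id, data_key))"
    )
    return conn


def set_data(conn: sqlite3.Connection, workflow_id: str, key: str, value: Any) -> None:
    """Store a JSON-serializable value, replacing any earlier value for the key."""
    with conn:
        conn.execute(
            "INSERT OR REPLACE INTO workflow_data VALUES (?, ?, ?)",
            (workflow_id, key, json.dumps(value)),
        )


def get_data(
    conn: sqlite3.Connection, workflow_id: str, key: str, default: Any = None
) -> Any:
    """Return the stored value, or the default when the key was never set."""
    row = conn.execute(
        "SELECT value FROM workflow_data WHERE workflow_id = ? AND data_key = ?",
        (workflow_id, key),
    ).fetchone()
    return default if row is None else json.loads(row[0])
```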
- get_all_workflow_types() → collections.abc.Iterator[pynenc.identifiers.task_id.TaskId]
Retrieve all workflow types, identified by their task IDs.
- get_all_workflow_runs() → collections.abc.Iterator[pynenc.workflow.workflow_identity.WorkflowIdentity]
Retrieve all stored workflow runs.
- get_workflow_runs(workflow_type: pynenc.identifiers.task_id.TaskId) → collections.abc.Iterator[pynenc.workflow.workflow_identity.WorkflowIdentity]
- _get_workflow_runs(workflow_type_key: str | None) → collections.abc.Iterator[pynenc.workflow.workflow_identity.WorkflowIdentity]
Retrieve workflow runs for a specific task.
- store_workflow_sub_invocation(parent_workflow_id: str, sub_invocation_id: str) → None
Store workflow sub-invocation relationship.
- get_workflow_sub_invocations(workflow_id: pynenc.identifiers.invocation_id.InvocationId) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Get workflow sub-invocations.
- iter_invocations_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, batch_size: int = 100) → collections.abc.Iterator[list[pynenc.identifiers.invocation_id.InvocationId]]
Iterate over invocation IDs that have history within time range.
- iter_history_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, batch_size: int = 100) → collections.abc.Iterator[list[pynenc.state_backend.base_state_backend.InvocationHistory]]
Iterate over history entries within time range.
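Both time-range iterators yield lists rather than single items. A batched generator of that shape can be sketched with `cursor.fetchmany`, as below. The `history` table, the epoch-second timestamps, and the inclusive range bounds are assumptions for the example; the source does not state whether the real bounds are inclusive.

```python
import sqlite3
from collections.abc import Iterator
from datetime import datetime


def make_timerange_db() -> sqlite3.Connection:
    """Create an in-memory database with a hypothetical history table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE history (invocation_id TEXT, timestamp REAL, status TEXT)"
    )
    return conn


def iter_ids_in_timerange(
    conn: sqlite3.Connection,
    start_time: datetime,
    end_time: datetime,
    batch_size: int = 100,
) -> Iterator[list[str]]:
    """Yield distinct invocation IDs with history in the range, in batches."""
    cursor = conn.execute(
        "SELECT DISTINCT invocation_id FROM history "
        "WHERE timestamp >= ? AND timestamp <= ? ORDER BY invocation_id",
        (start_time.timestamp(), end_time.timestamp()),
    )
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            return
        yield [inv_id for (inv_id,) in rows]
```

Yielding batches keeps memory use bounded by `batch_size`, however many rows fall inside the range. Timezone-aware datetimes are the safer input here, since `timestamp()` interprets naive values in local time.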
- _store_runner_context(runner_context: pynenc.runner.runner_context.RunnerContext) → None
Store a runner context.
- Parameters:
runner_context (RunnerContext) – The context to store
- _parse_runner_context_row(row: tuple) → pynenc.runner.runner_context.RunnerContext
- _get_runner_context(runner_id: str) → RunnerContext | None
Retrieve a runner context by runner_id from SQLite.
- Parameters:
runner_id (str) – The runner’s unique identifier
- Returns:
The stored RunnerContext or None if not found
- _get_runner_contexts(runner_ids: list[str]) → list[pynenc.runner.runner_context.RunnerContext]
Retrieve multiple runner contexts by their IDs.
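Fetching several rows by ID in one query is commonly done with a parameterized `IN` clause. The sketch below shows that pattern under assumed names: the `runner_contexts` table, its `payload` column, and `get_payloads` are hypothetical, and the real method returns `RunnerContext` objects rather than strings.

```python
import sqlite3


def get_payloads(conn: sqlite3.Connection, runner_ids: list[str]) -> dict[str, str]:
    """Return the payloads of the requested runners; unknown IDs are skipped."""
    if not runner_ids:
        # An empty IN () list is a syntax error in SQLite, so return early.
        return {}
    # One "?" per ID keeps every value parameterized.
    placeholders = ", ".join("?" for _ in runner_ids)
    rows = conn.execute(
        f"SELECT runner_id, payload FROM runner_contexts "
        f"WHERE runner_id IN ({placeholders})",
        runner_ids,
    )
    return dict(rows.fetchall())
```

SQLite caps the number of bound parameters per statement, so a very long ID list would need to be split into chunks.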
- get_matching_runner_contexts(partial_id: str) → collections.abc.Iterator[pynenc.runner.runner_context.RunnerContext]
Search runner contexts by partial ID match.
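A partial ID search maps naturally onto SQL `LIKE`. The sketch below assumes substring matching and a hypothetical `runner_contexts` table. Whether the real method matches substrings or prefixes is not stated in the source.

```python
import sqlite3


def find_matching_ids(conn: sqlite3.Connection, partial_id: str) -> list[str]:
    """Return runner IDs that contain partial_id as a literal substring."""
    # Escape the LIKE wildcards so "%" and "_" in the input match themselves.
    escaped = (
        partial_id.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    )
    rows = conn.execute(
        "SELECT runner_id FROM runner_contexts "
        "WHERE runner_id LIKE ? ESCAPE '\\' ORDER BY runner_id",
        (f"%{escaped}%",),
    )
    return [runner_id for (runner_id,) in rows]
```

Without the escaping step, an underscore in the search text would match any single character and return unrelated IDs. Note that `LIKE` is case-insensitive for ASCII letters by default.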