pynenc.state_backend.sqlite_state_backend

SQLite-based state backend for cross-process testing.

This module provides a SQLite-based state backend implementation that enables true cross-process coordination for testing process runners. Unlike shared memory, SQLite provides ACID transactions and handles concurrent access automatically.

Module Contents

Classes

Tables

SQLiteStateBackend

A SQLite-based implementation of the state backend for cross-process testing.

Functions

init_tables

Initialize SQLite tables for state backend.

API

class pynenc.state_backend.sqlite_state_backend.Tables[source]
RESULTS

‘state_backend_results’

EXCEPTIONS

‘state_backend_exceptions’

INVOCATIONS

‘state_backend_invocations’

RUNNER_CONTEXTS

‘state_backend_runner_contexts’

HISTORY

‘state_backend_history’

WORKFLOWS

‘state_backend_workflows’

APP_INFO

‘state_backend_app_info’

WORKFLOW_DATA

‘state_backend_workflow_data’

WORKFLOW_SUB_INVOCATIONS

‘state_backend_workflow_sub_invocations’

pynenc.state_backend.sqlite_state_backend.init_tables(sqlite_db_path: str) None[source]

Initialize SQLite tables for state backend.

class pynenc.state_backend.sqlite_state_backend.SQLiteStateBackend(app: pynenc.app.Pynenc)[source]

Bases: pynenc.state_backend.base_state_backend.BaseStateBackend[pynenc.types.Params, pynenc.types.Result]

A SQLite-based implementation of the state backend for cross-process testing.

Stores invocation data, history, results, and exceptions in SQLite database which allows state sharing between processes and is suitable for testing process runners.

Warning

The SQLiteStateBackend class is designed for testing purposes only and should not be used in production systems. It uses temporary SQLite files for state.

Initialization

conf() pynenc.conf.config_state_backend.ConfigStateBackendSQLite
store_app_info(app_info: pynenc.app_info.AppInfo) None[source]

Store app info

get_app_info() pynenc.app.AppInfo[source]

Retrieve app info for the current app

static discover_app_infos() dict[str, pynenc.app_info.AppInfo][source]

Retrieve all app information registered in this state backend.

store_workflow_run(workflow_identity: pynenc.workflow.workflow_identity.WorkflowIdentity) None[source]

Store a workflow run for tracking and monitoring.

_upsert_invocations(entries: list[tuple[pynenc.invocation.dist_invocation.InvocationDTO, pynenc.models.call_dto.CallDTO]]) None[source]

Store invocation and call DTO pairs as discrete columns.

_get_invocation(invocation_id: pynenc.identifiers.invocation_id.InvocationId) tuple[pynenc.invocation.dist_invocation.InvocationDTO, pynenc.models.call_dto.CallDTO] | None[source]

Retrieve invocation and call DTOs by invocation ID.

get_child_invocations(parent_invocation_id: pynenc.identifiers.invocation_id.InvocationId) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Return IDs of invocations that name the given ID as their parent.

Parameters:

parent_invocation_id – The parent invocation ID to search for.

Returns:

Iterator of child invocation IDs.

_add_histories(invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], invocation_history: pynenc.state_backend.base_state_backend.InvocationHistory) None[source]

Adds the same history record for a list of invocations.

Stores history entries with explicit timestamp and status so that multiple status updates per invocation can be preserved and uniquely identified.

_get_history(invocation_id: pynenc.identifiers.invocation_id.InvocationId) list[pynenc.state_backend.base_state_backend.InvocationHistory][source]

Retrieves the history of an invocation ordered by timestamp.

_get_result(invocation_id: pynenc.identifiers.invocation_id.InvocationId) str[source]

Retrieves the result of an invocation by ID.

_set_result(invocation_id: pynenc.identifiers.invocation_id.InvocationId, serialized_result: str) None[source]

Sets the result of an invocation by ID.

_get_exception(invocation_id: pynenc.identifiers.invocation_id.InvocationId) str[source]

Retrieves the exception of an invocation by ID.

Parameters:

invocation_id (InvocationId) – The ID of the invocation

Returns:

The serialized exception string

_set_exception(invocation_id: pynenc.identifiers.invocation_id.InvocationId, serialized_exception: str) None[source]

Sets the raised exception by invocation ID.

set_workflow_data(workflow_identity: pynenc.workflow.workflow_identity.WorkflowIdentity, key: str, value: Any) None[source]

Set workflow data.

get_workflow_data(workflow_identity: pynenc.workflow.workflow_identity.WorkflowIdentity, key: str, default: Any = None) Any[source]

Get workflow data.

get_all_workflow_types() collections.abc.Iterator[pynenc.identifiers.task_id.TaskId][source]

Retrieve all workflow IDs.

get_all_workflow_runs() collections.abc.Iterator[pynenc.workflow.workflow_identity.WorkflowIdentity][source]

Retrieve all stored workflows.

get_workflow_runs(workflow_type: pynenc.identifiers.task_id.TaskId) collections.abc.Iterator[pynenc.workflow.workflow_identity.WorkflowIdentity][source]
_get_workflow_runs(workflow_type_key: str | None) collections.abc.Iterator[pynenc.workflow.workflow_identity.WorkflowIdentity][source]

Retrieve workflow runs for a specific task.

store_workflow_sub_invocation(parent_workflow_id: str, sub_invocation_id: str) None[source]

Store workflow sub-invocation relationship.

get_workflow_sub_invocations(workflow_id: pynenc.identifiers.invocation_id.InvocationId) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Get workflow sub-invocations.

iter_invocations_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, batch_size: int = 100) collections.abc.Iterator[list[pynenc.identifiers.invocation_id.InvocationId]][source]

Iterate over invocation IDs that have history within time range.

iter_history_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, batch_size: int = 100) collections.abc.Iterator[list[pynenc.state_backend.base_state_backend.InvocationHistory]][source]

Iterate over history entries within time range.

_store_runner_context(runner_context: pynenc.runner.runner_context.RunnerContext) None[source]

Store a runner context.

Parameters:
  • runner_id (str) – The runner’s unique identifier

  • runner_context (RunnerContext) – The context to store

_parse_runner_context_row(row: tuple) pynenc.runner.runner_context.RunnerContext[source]
_get_runner_context(runner_id: str) RunnerContext | None[source]

Retrieve a runner context by runner_id from SQLite.

Parameters:

runner_id (str) – The runner’s unique identifier

Returns:

The stored RunnerContext or None if not found

_get_runner_contexts(runner_ids: list[str]) list[pynenc.runner.runner_context.RunnerContext][source]

Retrieve multiple runner contexts by their IDs.

Parameters:

runner_ids (list[str]) – List of runner unique identifiers

Returns:

list[“RunnerContext”] of the stored RunnerContexts

get_matching_runner_contexts(partial_id: str) collections.abc.Iterator[pynenc.runner.runner_context.RunnerContext][source]

Search runner contexts by partial ID match.

get_invocation_ids_by_workflow(workflow_id: str | None = None, workflow_type_key: str | None = None) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieve invocation IDs filtered by workflow criteria.

purge() None[source]

Clear all state backend data