pynenc.state_backend.base_state_backend

Module Contents

Classes

InvocationHistory

Data structure representing the history of a task invocation.

BaseStateBackend

Abstract base class for state backends in a distributed task system.

API

class pynenc.state_backend.base_state_backend.InvocationHistory[source]

Data structure representing the history of a task invocation.

Stores the invocation ID, timestamp, status, and execution context. Provides methods for serialization and deserialization to and from JSON.

Variables:
  • invocation_id (str) – The ID of the invocation this history belongs to

  • _timestamp – Timestamp of the invocation history creation.

  • status_record (InvocationStatusRecord) – Current status of the invocation.

  • owner_context (RunnerContext) – Context of the execution.

invocation_id: str

None

_timestamp: datetime.datetime

‘field(…)’

status_record: pynenc.invocation.status.InvocationStatusRecord

None

runner_context_id: str

None

registered_by_inv_id: str | None

None

property timestamp: datetime.datetime

Returns the timestamp of the invocation history.

to_json() str[source]

Serializes the invocation history to a JSON string.

Returns:

A JSON representation of the invocation history.

classmethod from_json(json_str: str) pynenc.state_backend.base_state_backend.InvocationHistory[source]

Deserializes a JSON string into an InvocationHistory object.

Parameters:

json_str (str) – JSON string to deserialize.

Returns:

An instance of InvocationHistory.
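
The JSON round trip described above can be sketched with a simplified stand-in class. HistorySketch and its plain-string status field are hypothetical; the real InvocationHistory stores an InvocationStatusRecord, and its actual JSON layout is not shown in this reference.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional


@dataclass
class HistorySketch:
    """Hypothetical stand-in for InvocationHistory with simplified fields."""

    invocation_id: str
    status: str  # assumption: the real class holds an InvocationStatusRecord
    runner_context_id: str
    registered_by_inv_id: Optional[str] = None
    _timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    @property
    def timestamp(self) -> datetime:
        return self._timestamp

    def to_json(self) -> str:
        # datetime is not JSON-native, so it is stored as an ISO 8601 string.
        return json.dumps(
            {
                "invocation_id": self.invocation_id,
                "status": self.status,
                "runner_context_id": self.runner_context_id,
                "registered_by_inv_id": self.registered_by_inv_id,
                "timestamp": self._timestamp.isoformat(),
            }
        )

    @classmethod
    def from_json(cls, json_str: str) -> "HistorySketch":
        data = json.loads(json_str)
        return cls(
            invocation_id=data["invocation_id"],
            status=data["status"],
            runner_context_id=data["runner_context_id"],
            registered_by_inv_id=data["registered_by_inv_id"],
            _timestamp=datetime.fromisoformat(data["timestamp"]),
        )


original = HistorySketch("inv-1", "RUNNING", "runner-a")
restored = HistorySketch.from_json(original.to_json())
```

Storing the timestamp as an ISO 8601 string keeps the timezone offset, so the restored record compares equal to the original.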

class pynenc.state_backend.base_state_backend.BaseStateBackend(app: pynenc.app.Pynenc)[source]

Bases: abc.ABC, typing.Generic[pynenc.types.Params, pynenc.types.Result]

Abstract base class for state backends in a distributed task system.

Manages storage and retrieval of invocation-related data, including execution status, history, results, and exceptions.

Initialization

conf() pynenc.conf.config_state_backend.ConfigStateBackend

wait_for_all_async_operations() None[source]

Waits for all asynchronous operations related to invocation status to complete.

wait_for_invocation_async_operations(invocation_id: str) None[source]

Waits for all asynchronous operations for a specific invocation to complete.

Parameters:

invocation_id (str) – ID of the invocation.

abstractmethod purge() None[source]

Purges all stored state backend data for the current application.

abstractmethod _get_invocation(invocation_id: pynenc.identifiers.invocation_id.InvocationId) tuple[pynenc.invocation.dist_invocation.InvocationDTO, pynenc.models.call_dto.CallDTO] | None[source]

Retrieve invocation and call DTOs by invocation ID.

Parameters:

invocation_id (“InvocationId”) – The ID of the invocation to retrieve.

Returns:

Paired DTOs if found, else None.

abstractmethod _add_histories(invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], invocation_history: pynenc.state_backend.base_state_backend.InvocationHistory) None[source]

Adds a history record for a list of invocations.

Parameters:
  • invocation_ids (list[“InvocationId”]) – The IDs of the invocations.

  • invocation_history (InvocationHistory) – The history record to add.

abstractmethod _get_history(invocation_id: pynenc.identifiers.invocation_id.InvocationId) list[pynenc.state_backend.base_state_backend.InvocationHistory][source]

Retrieves the history of an invocation ordered by timestamp.

Parameters:

invocation_id (“InvocationId”) – The ID of the invocation to get the history from

Returns:

List of InvocationHistory records

abstractmethod _get_result(invocation_id: pynenc.identifiers.invocation_id.InvocationId) str[source]

Retrieves the result of an invocation.

Parameters:

invocation_id (“InvocationId”) – The ID of the invocation to get the result from

Returns:

The serialized result string

abstractmethod _set_result(invocation_id: pynenc.identifiers.invocation_id.InvocationId, serialized_result: str) None[source]

Sets the result of an invocation.

Parameters:
  • invocation_id (“InvocationId”) – The ID of the invocation to set the result for

  • serialized_result (str) – The serialized result to set

serialize_exception(exception: Exception) str[source]

Serializes an exception into a string representation.

This method provides a default implementation for serializing exceptions using the serialize method. It can be overridden by subclasses if specific handling for exceptions is required.

Parameters:

exception (Exception) – The exception to be serialized.

Returns:

A string representation of the serialized exception.

deserialize_exception(serialized: str) Exception[source]

Deserializes a string representation of an exception back into an Exception object.

This method provides a default implementation for deserializing exceptions using the deserialize method. It can be overridden by subclasses if specific handling for exceptions is required.

Parameters:

serialized (str) – The string representation of the serialized exception.

Returns:

The deserialized Exception object.
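
A subclass that overrides both methods might look like the sketch below. The JSON layout (type name plus stringified args) and the RuntimeError fallback are assumptions made for illustration; they are not the format the default implementation produces.

```python
import builtins
import json


def serialize_exception(exception: Exception) -> str:
    """Encode the exception type name and its args as JSON (illustrative format)."""
    return json.dumps(
        {"type": type(exception).__name__, "args": [str(a) for a in exception.args]}
    )


def deserialize_exception(serialized: str) -> Exception:
    """Rebuild a builtin exception; unknown types fall back to RuntimeError."""
    data = json.loads(serialized)
    exc_type = getattr(builtins, data["type"], None)
    if not (isinstance(exc_type, type) and issubclass(exc_type, Exception)):
        # Assumption: a generic error is acceptable when the type cannot be resolved.
        return RuntimeError(f"{data['type']}: {data['args']}")
    return exc_type(*data["args"])


payload = serialize_exception(ValueError("bad input"))
restored = deserialize_exception(payload)
unknown = deserialize_exception('{"type": "CustomError", "args": ["x"]}')
```

This sketch only resolves builtin exception types and does not preserve tracebacks or custom attributes.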

abstractmethod _get_exception(invocation_id: pynenc.identifiers.invocation_id.InvocationId) str[source]

Retrieves the serialized exception of an invocation.

Parameters:

invocation_id (“InvocationId”) – The ID of the invocation to get the exception from

Returns:

The serialized exception string

abstractmethod _set_exception(invocation_id: pynenc.identifiers.invocation_id.InvocationId, serialized_exception: str) None[source]

Sets the serialized exception raised by an invocation.

Parameters:
  • invocation_id (“InvocationId”) – The ID of the invocation to set the exception for

  • serialized_exception (str) – The serialized exception raised by the invocation

upsert_invocations(invocations: list[pynenc.invocation.dist_invocation.DistributedInvocation]) None[source]

Persist invocations by converting to DTOs and storing via backend.

Converts each invocation into an (InvocationDTO, CallDTO) pair and passes them to the abstract _upsert_invocations. Argument serialization and external-storage decisions are handled transparently by Call.serialized_arguments (which delegates to ClientDataStore.serialize_arguments).

Parameters:

invocations (list[DistributedInvocation]) – The invocations to upsert.

abstractmethod _upsert_invocations(entries: list[tuple[pynenc.invocation.dist_invocation.InvocationDTO, pynenc.models.call_dto.CallDTO]]) None[source]

Backend-specific storage of invocation and call DTOs.

Each entry is a paired (InvocationDTO, CallDTO) ready for persistence. Backends can store them together or in separate tables/collections.

Parameters:

entries (list[tuple[InvocationDTO, CallDTO]]) – Paired DTOs to persist.
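
As a rough illustration of the storage contract, an in-memory backend could keep the two DTOs in separate dictionaries and pair them again on lookup. The DTO classes below are hypothetical stand-ins; the fields of the real InvocationDTO and CallDTO are not listed in this reference.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class InvocationDTOSketch:
    """Hypothetical stand-in for InvocationDTO."""

    invocation_id: str
    call_id: str  # assumption: the invocation references its call by ID


@dataclass(frozen=True)
class CallDTOSketch:
    """Hypothetical stand-in for CallDTO."""

    call_id: str
    serialized_arguments: str


class MemoryInvocationStore:
    """Keeps invocations and calls in separate 'tables' (plain dicts)."""

    def __init__(self) -> None:
        self._invocations = {}
        self._calls = {}

    def _upsert_invocations(self, entries) -> None:
        # Upsert semantics: a later entry with the same ID overwrites the earlier one.
        for inv_dto, call_dto in entries:
            self._invocations[inv_dto.invocation_id] = inv_dto
            self._calls[call_dto.call_id] = call_dto

    def _get_invocation(self, invocation_id: str) -> Optional[tuple]:
        inv_dto = self._invocations.get(invocation_id)
        if inv_dto is None:
            return None
        return inv_dto, self._calls[inv_dto.call_id]


store = MemoryInvocationStore()
store._upsert_invocations(
    [(InvocationDTOSketch("inv-1", "call-1"), CallDTOSketch("call-1", '{"x": 1}'))]
)
found = store._get_invocation("inv-1")
missing = store._get_invocation("inv-404")
```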

get_invocation(invocation_id: pynenc.identifiers.invocation_id.InvocationId) pynenc.invocation.dist_invocation.DistributedInvocation[source]

Retrieve an invocation by ID, reconstructing from DTOs.

Loads the InvocationDTO and CallDTO from the backend, constructs a LazyCall (arguments deserialized only when accessed), and reassembles the DistributedInvocation via from_dto.

Parameters:

invocation_id (“InvocationId”) – ID of the invocation.

Returns:

The retrieved invocation with a LazyCall.

Raises:

InvocationNotFoundError – If the invocation wasn’t found.

abstractmethod get_child_invocations(parent_invocation_id: pynenc.identifiers.invocation_id.InvocationId) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Return IDs of all invocations directly spawned by the given parent.

Used for family-tree traversal: given a parent invocation ID, find all invocations that recorded it as their parent_invocation_id.

Parameters:

parent_invocation_id – The invocation ID to find children for.

Returns:

Iterator of child invocation IDs (may be empty).
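
The family-tree traversal mentioned above can be sketched with a parent-to-children index. ChildIndex, record, and iter_descendants are hypothetical names, and plain strings stand in for InvocationId.

```python
from collections import defaultdict
from collections.abc import Iterator
from typing import Optional


class ChildIndex:
    """Maps a parent invocation ID to the IDs of its direct children."""

    def __init__(self) -> None:
        self._children = defaultdict(list)

    def record(self, invocation_id: str, parent_invocation_id: Optional[str]) -> None:
        if parent_invocation_id is not None:
            self._children[parent_invocation_id].append(invocation_id)

    def get_child_invocations(self, parent_invocation_id: str) -> Iterator[str]:
        # .get avoids creating empty entries for parents that have no children.
        yield from self._children.get(parent_invocation_id, [])


def iter_descendants(index: ChildIndex, root_id: str) -> Iterator[str]:
    """Walk the whole tree below root_id using only direct-child lookups."""
    stack = [root_id]
    while stack:
        current = stack.pop()
        for child in index.get_child_invocations(current):
            yield child
            stack.append(child)


index = ChildIndex()
index.record("b", "a")
index.record("c", "a")
index.record("d", "b")
children_of_a = list(index.get_child_invocations("a"))
descendants_of_a = sorted(iter_descendants(index, "a"))
```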

add_histories(invocations: list[DistributedInvocation[Params, Result]], status_record: pynenc.invocation.status.InvocationStatusRecord, runner_context: pynenc.runner.runner_context.RunnerContext) None[source]

Adds a history record for invocations.

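Parameters:
  • invocations (list[DistributedInvocation]) – The invocations to add the history record to.

  • status_record (InvocationStatusRecord) – The status record.

  • runner_context (RunnerContext) – The runner context.
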
add_history(invocation_id: pynenc.identifiers.invocation_id.InvocationId, status_record: pynenc.invocation.status.InvocationStatusRecord, runner_context: pynenc.runner.runner_context.RunnerContext) None[source]

Adds a history record for a single invocation.

Parameters:
  • invocation_id (“InvocationId”) – The ID of the invocation.

  • status_record (InvocationStatusRecord) – The status record.

  • runner_context (RunnerContext) – The runner context.

get_history(invocation_id: pynenc.identifiers.invocation_id.InvocationId) list[pynenc.state_backend.base_state_backend.InvocationHistory][source]

Retrieves the history of an invocation.

Parameters:

invocation_id (“InvocationId”) – The ID of the invocation to retrieve history for.

Returns:

A list of invocation history records.

set_result(invocation_id: pynenc.identifiers.invocation_id.InvocationId, result: pynenc.types.Result) None[source]

Sets the result of an invocation.

Parameters:
  • invocation_id (“InvocationId”) – The ID of the invocation to set the result for.

  • result (Result) – The result of the invocation.

get_result(invocation_id: pynenc.identifiers.invocation_id.InvocationId) pynenc.types.Result[source]

Retrieves the result of an invocation.

Parameters:

invocation_id (“InvocationId”) – The ID of the invocation to retrieve the result for.

Returns:

The result of the invocation.

set_exception(invocation_id: pynenc.identifiers.invocation_id.InvocationId, exception: Exception) None[source]

Sets the exception of an invocation.

Parameters:
  • invocation_id (“InvocationId”) – The ID of the invocation to set the exception for.

  • exception (Exception) – The exception of the invocation.

get_exception(invocation_id: pynenc.identifiers.invocation_id.InvocationId) Exception[source]

Retrieves the exception of an invocation.

Parameters:

invocation_id (“InvocationId”) – The ID of the invocation to retrieve the exception for.

Returns:

The exception of the invocation.
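
The signatures suggest that the public set_result and get_result work with Python objects while the abstract _set_result and _get_result work with serialized strings. A minimal sketch of that split is shown below, assuming JSON as the serializer; the serializer actually used is not stated in this reference, and the class names are hypothetical.

```python
import json
from abc import ABC, abstractmethod
from typing import Any


class ResultBackendSketch(ABC):
    """Public methods handle objects; abstract hooks handle serialized strings."""

    @abstractmethod
    def _set_result(self, invocation_id: str, serialized_result: str) -> None: ...

    @abstractmethod
    def _get_result(self, invocation_id: str) -> str: ...

    def set_result(self, invocation_id: str, result: Any) -> None:
        # Assumption: JSON stands in for the configured serializer.
        self._set_result(invocation_id, json.dumps(result))

    def get_result(self, invocation_id: str) -> Any:
        return json.loads(self._get_result(invocation_id))


class MemoryResultBackend(ResultBackendSketch):
    def __init__(self) -> None:
        self._results = {}

    def _set_result(self, invocation_id: str, serialized_result: str) -> None:
        self._results[invocation_id] = serialized_result

    def _get_result(self, invocation_id: str) -> str:
        return self._results[invocation_id]


backend = MemoryResultBackend()
backend.set_result("inv-1", {"total": 3})
value = backend.get_result("inv-1")
```

With this split, a concrete backend only needs to store and return strings.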

abstractmethod set_workflow_data(workflow_identity: pynenc.workflow.WorkflowIdentity, key: str, value: Any) None[source]

Set a value in workflow data.

Parameters:
  • workflow_identity (“WorkflowIdentity”) – Workflow identity

  • key (str) – Data key to set

  • value (Any) – Value to store

abstractmethod get_workflow_data(workflow_identity: pynenc.workflow.WorkflowIdentity, key: str, default: Any = None) Any[source]

Get a value from workflow data.

Parameters:
  • workflow_identity (“WorkflowIdentity”) – Workflow identity

  • key (str) – Data key to retrieve

  • default (Any) – Default value if key doesn’t exist

Returns:

Stored value or default
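
The key-value behaviour of the two workflow data methods can be sketched in memory as follows. A plain string workflow ID is used in place of WorkflowIdentity, which is an assumption made to keep the example self-contained.

```python
from typing import Any


class MemoryWorkflowData:
    """Per-workflow key-value store backed by nested dictionaries."""

    def __init__(self) -> None:
        self._data = {}

    def set_workflow_data(self, workflow_id: str, key: str, value: Any) -> None:
        self._data.setdefault(workflow_id, {})[key] = value

    def get_workflow_data(self, workflow_id: str, key: str, default: Any = None) -> Any:
        # Returns the default for an unknown workflow as well as an unknown key.
        return self._data.get(workflow_id, {}).get(key, default)


data = MemoryWorkflowData()
data.set_workflow_data("wf-1", "retries", 2)
retries = data.get_workflow_data("wf-1", "retries")
fallback = data.get_workflow_data("wf-1", "missing", default="n/a")
```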

abstractmethod store_app_info(app_info: pynenc.app.AppInfo) None[source]

Register this app’s information in the state backend for discovery.

Parameters:

app_info (“AppInfo”) – The app information to store

Returns:

None

abstractmethod get_app_info() pynenc.app.AppInfo[source]

Retrieve information of the current app.

Returns:

The app information

abstractmethod static discover_app_infos() dict[str, pynenc.app.AppInfo][source]

Attempts to discover all registered apps in the system by connecting to the backend with the default configuration.

Returns:

Dictionary mapping app_id to app information

abstractmethod store_workflow_run(workflow_identity: pynenc.workflow.WorkflowIdentity) None[source]

Store a workflow run for tracking and monitoring.

Maintains workflow type registry and specific workflow run instances. This enables monitoring of workflow types and their execution history.

Parameters:

workflow_identity – The workflow identity to store

abstractmethod get_all_workflow_types() collections.abc.Iterator[pynenc.identifiers.task_id.TaskId][source]

Retrieve all workflow types (workflow_task_ids) stored in this state backend.

Returns:

Iterator of workflow task IDs representing different workflow types

abstractmethod get_all_workflow_runs() collections.abc.Iterator[pynenc.workflow.WorkflowIdentity][source]

Retrieve all workflow run identities from this state backend.

Returns:

Iterator of workflow identities for runs

abstractmethod get_workflow_runs(workflow_type: pynenc.identifiers.task_id.TaskId) collections.abc.Iterator[pynenc.workflow.WorkflowIdentity][source]

Retrieve workflow run identities of a specific workflow type from this state backend.

Parameters:

workflow_type – Filter for specific workflow type

Returns:

Iterator of workflow identities for runs

abstractmethod store_workflow_sub_invocation(parent_workflow_id: pynenc.identifiers.invocation_id.InvocationId, sub_invocation_id: pynenc.identifiers.invocation_id.InvocationId) None[source]

Store a sub-invocation ID that runs inside a parent workflow.

This tracks which invocations (tasks or sub-workflows) are executed within the context of a parent workflow for monitoring and debugging.

Parameters:
  • parent_workflow_id (“InvocationId”) – The workflow ID that contains the sub-invocation

  • sub_invocation_id – The invocation ID of the task/sub-workflow running inside

abstractmethod get_workflow_sub_invocations(workflow_id: pynenc.identifiers.invocation_id.InvocationId) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieve all sub-invocation IDs that run inside a specific workflow.

Parameters:

workflow_id – The workflow ID to get sub-invocations for

Returns:

Iterator of invocation IDs that run inside the workflow

abstractmethod iter_invocations_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, batch_size: int = 100) collections.abc.Iterator[list[pynenc.identifiers.invocation_id.InvocationId]][source]

Iterate over invocation IDs that have history within a time range.

Parameters:
  • start_time (datetime) – Start of the time range

  • end_time (datetime) – End of the time range

  • batch_size (int) – Number of invocation IDs per batch

Returns:

Iterator yielding batches of invocation IDs

abstractmethod iter_history_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, batch_size: int = 100) collections.abc.Iterator[list[pynenc.state_backend.base_state_backend.InvocationHistory]][source]

Iterate over history entries within a time range.

Parameters:
  • start_time (datetime) – Start of the time range

  • end_time (datetime) – End of the time range

  • batch_size (int) – Number of history entries per batch

Returns:

Iterator yielding batches of InvocationHistory
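
Both time-range iterators yield lists of at most batch_size items rather than single items. The sketch below shows that batching pattern over (timestamp, invocation ID) tuples, which stand in for InvocationHistory records. Treating both bounds as inclusive and ordering by timestamp are assumptions; this reference does not specify either.

```python
from collections.abc import Iterable, Iterator
from datetime import datetime, timedelta
from itertools import islice


def iter_history_in_timerange(
    entries: Iterable[tuple],
    start_time: datetime,
    end_time: datetime,
    batch_size: int = 100,
) -> Iterator[list]:
    # Assumption: both bounds are inclusive and batches are ordered by timestamp.
    selected = iter(sorted(e for e in entries if start_time <= e[0] <= end_time))
    # islice pulls up to batch_size items; an empty list ends the loop.
    while batch := list(islice(selected, batch_size)):
        yield batch


base = datetime(2024, 1, 1)
entries = [(base + timedelta(minutes=i), f"inv-{i}") for i in range(5)]
batches = list(
    iter_history_in_timerange(
        entries,
        start_time=base + timedelta(minutes=1),
        end_time=base + timedelta(minutes=4),
        batch_size=2,
    )
)
```

Callers process one batch at a time, which bounds memory use for wide time ranges.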

abstractmethod _store_runner_context(runner_context: pynenc.runner.runner_context.RunnerContext) None[source]

Backend-specific implementation to store a runner context.

Parameters:

runner_context (RunnerContext) – The context to store

abstractmethod get_matching_runner_contexts(partial_id: str) collections.abc.Iterator[pynenc.runner.runner_context.RunnerContext][source]

Search for runner contexts whose runner_id contains the partial string.

Used by the log explorer to find runners from truncated log output.

Parameters:

partial_id (str) – Partial runner ID to match (prefix or substring)

Returns:

Iterator of matching RunnerContext instances
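
A substring match over stored contexts can be sketched as below. RunnerContextSketch is a hypothetical stand-in that carries only a runner_id; a real backend would more likely push the filter into its query layer.

```python
from collections.abc import Iterable, Iterator
from dataclasses import dataclass


@dataclass(frozen=True)
class RunnerContextSketch:
    """Hypothetical stand-in for RunnerContext."""

    runner_id: str


def get_matching_runner_contexts(
    contexts: Iterable[RunnerContextSketch], partial_id: str
) -> Iterator[RunnerContextSketch]:
    for ctx in contexts:
        # Substring containment also covers the prefix case.
        if partial_id in ctx.runner_id:
            yield ctx


stored = [
    RunnerContextSketch("thread-runner-8f3a21"),
    RunnerContextSketch("process-runner-8f3b77"),
    RunnerContextSketch("thread-runner-c0ffee"),
]
matches = [c.runner_id for c in get_matching_runner_contexts(stored, "8f3")]
```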

abstractmethod get_invocation_ids_by_workflow(workflow_id: str | None = None, workflow_type_key: str | None = None) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieve invocation IDs filtered by workflow criteria.

Parameters:
  • workflow_id (str | None) – Exact workflow ID to match

  • workflow_type_key (str | None) – Workflow type key to match

Returns:

Iterator of matching invocation IDs

store_runner_context(runner_context: pynenc.runner.runner_context.RunnerContext) None[source]

Store a runner context.

Updates the local cache and delegates to backend implementation.

Parameters:

runner_context (RunnerContext) – The context to store

abstractmethod _get_runner_context(runner_id: str) RunnerContext | None[source]

Backend-specific implementation to retrieve a single runner context.

Parameters:

runner_id (str) – The runner’s unique identifier

Returns:

The stored RunnerContext or None if not found

abstractmethod _get_runner_contexts(runner_ids: list[str]) list[pynenc.runner.runner_context.RunnerContext][source]

Backend-specific implementation to retrieve multiple runner contexts.

Only called for IDs not found in local cache.

Parameters:

runner_ids (list[str]) – List of runner unique identifiers

Returns:

List of the stored RunnerContext instances

get_runner_context(runner_id: str) RunnerContext | None[source]

Retrieve a runner context by runner_id.

Checks local cache first, then backend if not found.

Parameters:

runner_id (str) – The runner’s unique identifier

Returns:

The stored RunnerContext or None if not found

get_runner_contexts(runner_ids: list[str]) list[pynenc.runner.runner_context.RunnerContext][source]

Retrieve multiple runner contexts by their IDs.

Uses local cache for known contexts, fetches missing ones from backend, and updates cache with newly fetched contexts.

Parameters:

runner_ids (list[str]) – List of runner unique identifiers

Returns:

List of the stored RunnerContext instances
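
The cache-then-backend lookup described for get_runner_contexts can be sketched as follows. CachedRunnerLookup, RunnerContextSketch, and the backend_calls counter are hypothetical; the dictionary passed to the constructor plays the role of the remote backend.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RunnerContextSketch:
    """Hypothetical stand-in for RunnerContext."""

    runner_id: str


class CachedRunnerLookup:
    def __init__(self, stored: dict) -> None:
        self._stored = dict(stored)  # stands in for the remote backend
        self._cache = {}
        self.backend_calls = 0  # only here to make the caching observable

    def _get_runner_contexts(self, runner_ids: list) -> list:
        self.backend_calls += 1
        return [self._stored[r] for r in runner_ids if r in self._stored]

    def get_runner_contexts(self, runner_ids: list) -> list:
        # Only IDs missing from the local cache are requested from the backend.
        missing = [r for r in runner_ids if r not in self._cache]
        if missing:
            for ctx in self._get_runner_contexts(missing):
                self._cache[ctx.runner_id] = ctx
        return [self._cache[r] for r in runner_ids if r in self._cache]


lookup = CachedRunnerLookup(
    {"r1": RunnerContextSketch("r1"), "r2": RunnerContextSketch("r2")}
)
first = lookup.get_runner_contexts(["r1", "r2"])
second = lookup.get_runner_contexts(["r1", "r2"])
```

The second call is served entirely from the cache, so the backend is queried only once.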