pynenc.state_backend.base_state_backend¶
Module Contents¶
Classes¶
Data structure representing the history of a task invocation. |
|
Abstract base class for state backends in a distributed task system. |
API¶
- class pynenc.state_backend.base_state_backend.InvocationHistory[source]¶
Data structure representing the history of a task invocation.
Stores the invocation ID, timestamp, status, and execution context. Provides methods for serialization and deserialization to and from JSON.
- Variables:
invocation_id (str) – The ID of the invocation this history belongs to
_timestamp – Timestamp of the invocation history creation.
status_record (InvocationStatusRecord) – Current status of the invocation.
owner_context (RunnerContext) – Context of the execution.
- _timestamp: datetime.datetime¶
‘field(…)’
- status_record: pynenc.invocation.status.InvocationStatusRecord¶
None
- property timestamp: datetime.datetime¶
Returns the timestamp of the invocation history.
- to_json() str[source]¶
Serializes the invocation history to a JSON string.
- Returns:
A JSON representation of the invocation history.
- classmethod from_json(json_str: str) pynenc.state_backend.base_state_backend.InvocationHistory[source]¶
Deserializes a JSON string into an InvocationHistory object.
- Parameters:
json_str (str) – JSON string to deserialize.
- Returns:
An instance of InvocationHistory.
- class pynenc.state_backend.base_state_backend.BaseStateBackend(app: pynenc.app.Pynenc)[source]¶
Bases:
abc.ABC,typing.Generic[pynenc.types.Params,pynenc.types.Result]Abstract base class for state backends in a distributed task system.
Manages storage and retrieval of invocation-related data, including execution status, history, results, and exceptions.
Initialization
- wait_for_all_async_operations() None[source]¶
Waits for all asynchronous operations related to invocation status to complete.
- wait_for_invocation_async_operations(invocation_id: str) None[source]¶
Waits for all asynchronous operations for a specific invocation to complete.
- Parameters:
invocation_id (str) – ID of the invocation.
- abstractmethod purge() None[source]¶
Purges all store state backend data for the current application
- abstractmethod _get_invocation(invocation_id: pynenc.identifiers.invocation_id.InvocationId) tuple[pynenc.invocation.dist_invocation.InvocationDTO, pynenc.models.call_dto.CallDTO] | None[source]¶
Retrieve invocation and call DTOs by invocation ID.
- Parameters:
invocation_id (“InvocationId”) – The ID of the invocation to retrieve.
- Returns:
Paired DTOs if found, else None.
- abstractmethod _add_histories(invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], invocation_history: pynenc.state_backend.base_state_backend.InvocationHistory) None[source]¶
Adds a history record for a list of invocations.
- Parameters:
invocation_ids (list[“InvocationId”]) – The IDs of the invocations.
invocation_history (InvocationHistory) – The history record to add.
- abstractmethod _get_history(invocation_id: pynenc.identifiers.invocation_id.InvocationId) list[pynenc.state_backend.base_state_backend.InvocationHistory][source]¶
Retrieves the history of an invocation ordered by timestamp.
- Parameters:
invocation_id (“InvocationId”) – The ID of the invocation to get the history from
- Returns:
List of InvocationHistory records
- abstractmethod _get_result(invocation_id: pynenc.identifiers.invocation_id.InvocationId) str[source]¶
Retrieves the result of an invocation.
- Parameters:
invocation_id (“InvocationId”) – The ID of the invocation to get the result from
- Returns:
The serialized result string
- abstractmethod _set_result(invocation_id: pynenc.identifiers.invocation_id.InvocationId, serialized_result: str) None[source]¶
Sets the result of an invocation.
- Parameters:
invocation_id (“InvocationId”) – The ID of the invocation to set
serialized_result (str) – The serialized result to set
- serialize_exception(exception: Exception) str[source]¶
Serializes an exception into a string representation.
This method provides a default implementation for serializing exceptions using the
serializemethod. It can be overridden by subclasses if specific handling for exceptions is required.- Parameters:
exception (Exception) – The exception to be serialized.
- Returns:
A string representation of the serialized exception.
- deserialize_exception(serialized: str) Exception[source]¶
Deserializes a string representation of an exception back into an Exception object.
This method provides a default implementation for deserializing exceptions using the
deserializemethod. It can be overridden by subclasses if specific handling for exceptions is required.- Parameters:
serialized (str) – The string representation of the serialized exception.
- Returns:
The deserialized Exception object.
- abstractmethod _get_exception(invocation_id: pynenc.identifiers.invocation_id.InvocationId) str[source]¶
Retrieves the serialized exception of an invocation.
- Parameters:
invocation_id (“InvocationId”) – The ID of the invocation to get the exception from
- Returns:
The serialized exception string
- abstractmethod _set_exception(invocation_id: pynenc.identifiers.invocation_id.InvocationId, serialized_exception: str) None[source]¶
Sets the raised exception by an invocation ran.
- Parameters:
invocation_id (“InvocationId”) – The ID of the invocation to set
exception (“Exception”) – The exception raised
- upsert_invocations(invocations: list[pynenc.invocation.dist_invocation.DistributedInvocation]) None[source]¶
Persist invocations by converting to DTOs and storing via backend.
Converts each invocation into an
(InvocationDTO, CallDTO)pair and passes them to the abstract_upsert_invocations. Argument serialization and external-storage decisions are handled transparently byCall.serialized_arguments(which delegates toClientDataStore.serialize_arguments).- Parameters:
invocations (list[DistributedInvocation]) – The invocations to upsert.
- abstractmethod _upsert_invocations(entries: list[tuple[pynenc.invocation.dist_invocation.InvocationDTO, pynenc.models.call_dto.CallDTO]]) None[source]¶
Backend-specific storage of invocation and call DTOs.
Each entry is a paired
(InvocationDTO, CallDTO)ready for persistence. Backends can store them together or in separate tables/collections.- Parameters:
entries (list[tuple[InvocationDTO, CallDTO]]) – Paired DTOs to persist.
- get_invocation(invocation_id: pynenc.identifiers.invocation_id.InvocationId) pynenc.invocation.dist_invocation.DistributedInvocation[source]¶
Retrieve an invocation by ID, reconstructing from DTOs.
Loads the
InvocationDTOandCallDTOfrom the backend, constructs aLazyCall(arguments deserialized only when accessed), and reassembles theDistributedInvocationviafrom_dto.- Parameters:
invocation_id (“InvocationId”) – ID of the invocation.
- Returns:
The retrieved invocation with a LazyCall.
- Raises:
InvocationNotFoundError – If the invocation wasn’t found.
- abstractmethod get_child_invocations(parent_invocation_id: pynenc.identifiers.invocation_id.InvocationId) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]¶
Return IDs of all invocations directly spawned by the given parent.
Used for family-tree traversal: given a parent invocation ID, find all invocations that recorded it as their
parent_invocation_id.- Parameters:
parent_invocation_id – The invocation ID to find children for.
- Returns:
List of child invocation IDs (may be empty).
- add_histories(invocations: list[DistributedInvocation[Params, Result]], status_record: pynenc.invocation.status.InvocationStatusRecord, runner_context: pynenc.runner.runner_context.RunnerContext) None[source]¶
Adds a history record for invocations.
- Parameters:
invocations (list[DistributedInvocation]) – The invocations to add history for.
status_record (InvocationStatusRecord) – The status record of the invocation.
owner_context (RunnerContext) – The owner context of the execution.
- add_history(invocation_id: pynenc.identifiers.invocation_id.InvocationId, status_record: pynenc.invocation.status.InvocationStatusRecord, runner_context: pynenc.runner.runner_context.RunnerContext) None[source]¶
Adds a history record for a single invocation.
- Parameters:
invocation_id (“InvocationId”) – The ID of the invocation.
status_record (InvocationStatusRecord) – The status record.
runner_context (RunnerContext) – The runner context.
- get_history(invocation_id: pynenc.identifiers.invocation_id.InvocationId) list[pynenc.state_backend.base_state_backend.InvocationHistory][source]¶
Retrieves the history of an invocation.
- Parameters:
invocation_id (“InvocationId”) – The ID of the invocation to retrieve history for.
- Returns:
A list of invocation history records.
- set_result(invocation_id: pynenc.identifiers.invocation_id.InvocationId, result: pynenc.types.Result) None[source]¶
Sets the result of an invocation.
- Parameters:
invocation_id (“InvocationId”) – The ID of the invocation to set the result for.
result (Result) – The result of the invocation.
- get_result(invocation_id: pynenc.identifiers.invocation_id.InvocationId) pynenc.types.Result[source]¶
Retrieves the result of an invocation.
- Parameters:
invocation_id (“InvocationId”) – The ID of the invocation to retrieve the result for.
- Returns:
The result of the invocation.
- set_exception(invocation_id: pynenc.identifiers.invocation_id.InvocationId, exception: Exception) None[source]¶
Sets the exception of an invocation.
- Parameters:
invocation_id (“InvocationId”) – The ID of the invocation to set the exception for.
exception (Exception) – The exception of the invocation.
- get_exception(invocation_id: pynenc.identifiers.invocation_id.InvocationId) Exception[source]¶
Retrieves the exception of an invocation.
- Parameters:
invocation_id (“InvocationId”) – The ID of the invocation to retrieve the exception for.
- Returns:
The exception of the invocation.
- abstractmethod set_workflow_data(workflow_identity: pynenc.workflow.WorkflowIdentity, key: str, value: Any) None[source]¶
Set a value in workflow data.
- Parameters:
workflow_identity (“WorkflowIdentity”) – Workflow identity
key (str) – Data key to set
value (Any) – Value to store
- abstractmethod get_workflow_data(workflow_identity: pynenc.workflow.WorkflowIdentity, key: str, default: Any = None) Any[source]¶
Get a value from workflow data.
- Parameters:
workflow_identity (“WorkflowIdentity”) – Workflow identity
key (str) – Data key to retrieve
default (Any) – Default value if key doesn’t exist
- Returns:
Stored value or default
- abstractmethod store_app_info(app_info: pynenc.app.AppInfo) None[source]¶
Register this app’s information in the state backend for discovery.
- Parameters:
app_info (“AppInfo”) – The app information to store
- Returns:
None
- abstractmethod get_app_info() pynenc.app.AppInfo[source]¶
Retrieve information of the current app.
- Returns:
The app information
- abstractmethod static discover_app_infos() dict[str, pynenc.app.AppInfo][source]¶
This static method tries to discover all registered apps in the system. We will try to connect to the backend with the default configuration.
- Returns:
Dictionary mapping app_id to app information
- abstractmethod store_workflow_run(workflow_identity: pynenc.workflow.WorkflowIdentity) None[source]¶
Store a workflow run for tracking and monitoring.
Maintains workflow type registry and specific workflow run instances. This enables monitoring of workflow types and their execution history.
- Parameters:
workflow_identity – The workflow identity to store
- abstractmethod get_all_workflow_types() collections.abc.Iterator[pynenc.identifiers.task_id.TaskId][source]¶
Retrieve all workflow types (workflow_task_ids) stored in this state backend.
- Returns:
Iterator of workflow task IDs representing different workflow types
- abstractmethod get_all_workflow_runs() collections.abc.Iterator[pynenc.workflow.WorkflowIdentity][source]¶
Retrieve workflow run identities from this state backend.
- Returns:
Iterator of workflow identities for runs
- abstractmethod get_workflow_runs(workflow_type: pynenc.identifiers.task_id.TaskId) collections.abc.Iterator[pynenc.workflow.WorkflowIdentity][source]¶
Retrieve workflow run identities from this state backend.
- Parameters:
workflow_type – Filter for specific workflow type
- Returns:
Iterator of workflow identities for runs
- abstractmethod store_workflow_sub_invocation(parent_workflow_id: pynenc.identifiers.invocation_id.InvocationId, sub_invocation_id: pynenc.identifiers.invocation_id.InvocationId) None[source]¶
Store a sub-invocation ID that runs inside a parent workflow.
This tracks which invocations (tasks or sub-workflows) are executed within the context of a parent workflow for monitoring and debugging.
- Parameters:
parent_workflow_id (“InvocationId”) – The workflow ID that contains the sub-invocation
sub_invocation_id – The invocation ID of the task/sub-workflow running inside
- abstractmethod get_workflow_sub_invocations(workflow_id: pynenc.identifiers.invocation_id.InvocationId) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]¶
Retrieve all sub-invocation IDs that run inside a specific workflow.
- Parameters:
workflow_id – The workflow ID to get sub-invocations for
- Returns:
Iterator of invocation IDs that run inside the workflow
- abstractmethod iter_invocations_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, batch_size: int = 100) collections.abc.Iterator[list[pynenc.identifiers.invocation_id.InvocationId]][source]¶
Iterate over invocation IDs that have history within a time range.
- Parameters:
start_time (datetime) – Start of the time range
end_time (datetime) – End of the time range
batch_size (int) – Number of invocation IDs per batch
- Returns:
Iterator yielding batches of invocation IDs
- abstractmethod iter_history_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, batch_size: int = 100) collections.abc.Iterator[list[pynenc.state_backend.base_state_backend.InvocationHistory]][source]¶
Iterate over history entries within a time range.
- Parameters:
start_time (datetime) – Start of the time range
end_time (datetime) – End of the time range
batch_size (int) – Number of history entries per batch
- Returns:
Iterator yielding batches of InvocationHistory
- abstractmethod _store_runner_context(runner_context: pynenc.runner.runner_context.RunnerContext) None[source]¶
Backend-specific implementation to store a runner context.
- Parameters:
runner_context (RunnerContext) – The context to store
- abstractmethod get_matching_runner_contexts(partial_id: str) collections.abc.Iterator[pynenc.runner.runner_context.RunnerContext][source]¶
Search for runner contexts whose runner_id contains the partial string.
Used by the log explorer to find runners from truncated log output.
- Parameters:
partial_id (str) – Partial runner ID to match (prefix or substring)
- Returns:
Iterator of matching RunnerContext instances
- abstractmethod get_invocation_ids_by_workflow(workflow_id: str | None = None, workflow_type_key: str | None = None) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]¶
Retrieve invocation IDs filtered by workflow criteria.
- store_runner_context(runner_context: pynenc.runner.runner_context.RunnerContext) None[source]¶
Store a runner context.
Updates the local cache and delegates to backend implementation.
- Parameters:
runner_context (RunnerContext) – The context to store
- abstractmethod _get_runner_context(runner_id: str) RunnerContext | None[source]¶
Backend-specific implementation to retrieve a single runner context.
- Parameters:
runner_id (str) – The runner’s unique identifier
- Returns:
The stored RunnerContext or None if not found
- abstractmethod _get_runner_contexts(runner_ids: list[str]) list[pynenc.runner.runner_context.RunnerContext][source]¶
Backend-specific implementation to retrieve multiple runner contexts.
Only called for IDs not found in local cache.
- get_runner_context(runner_id: str) RunnerContext | None[source]¶
Retrieve a runner context by runner_id.
Checks local cache first, then backend if not found.
- Parameters:
runner_id (str) – The runner’s unique identifier
- Returns:
The stored RunnerContext or None if not found