Source code for pynenc.state_backend.base_state_backend

import json
from abc import ABC, abstractmethod
from collections.abc import Iterator
from dataclasses import dataclass, field
from datetime import UTC, datetime
from functools import cached_property
from typing import TYPE_CHECKING, Any, Generic

from pynenc.conf.config_state_backend import ConfigStateBackend
from pynenc.exceptions import InvocationNotFoundError, PynencError
from pynenc.invocation.dist_invocation import DistributedInvocation
from pynenc.invocation.status import InvocationStatus, InvocationStatusRecord
from pynenc.types import Params, Result

if TYPE_CHECKING:
    from pynenc.app import AppInfo, Pynenc
    from pynenc.call import Call
    from pynenc.identifiers.invocation_id import InvocationId
    from pynenc.invocation.dist_invocation import InvocationDTO
    from pynenc.models.call_dto import CallDTO
    from pynenc.runner.runner_context import RunnerContext
    from pynenc.identifiers.task_id import TaskId
    from pynenc.workflow import WorkflowIdentity


[docs] @dataclass class InvocationHistory: """ Data structure representing the history of a task invocation. Stores the invocation ID, timestamp, status, and execution context. Provides methods for serialization and deserialization to and from JSON. :ivar str invocation_id: The ID of the invocation this history belongs to :ivar _timestamp: Timestamp of the invocation history creation. :ivar InvocationStatusRecord status_record: Current status of the invocation. :ivar RunnerContext owner_context: Context of the execution. """ invocation_id: str _timestamp: datetime = field(init=False, default_factory=lambda: datetime.now(UTC)) status_record: InvocationStatusRecord runner_context_id: str registered_by_inv_id: str | None = None @property def timestamp(self) -> datetime: """Returns the timestamp of the invocation history.""" return self._timestamp
[docs] def to_json(self) -> str: """ Serializes the invocation history to a JSON string. :return: A JSON representation of the invocation history. """ return json.dumps( { "invocation_id": self.invocation_id, "_timestamp": self._timestamp.isoformat(), "status_record": self.status_record.to_json(), "runner_context_id": self.runner_context_id, "registered_by_inv_id": self.registered_by_inv_id, } )
[docs] @classmethod def from_json(cls, json_str: str) -> "InvocationHistory": """ Deserializes a JSON string into an InvocationHistory object. :param str json_str: JSON string to deserialize. :return: An instance of InvocationHistory. """ hist_dict = json.loads(json_str) history = cls( invocation_id=hist_dict["invocation_id"], status_record=InvocationStatusRecord.from_json(hist_dict["status_record"]), runner_context_id=hist_dict["runner_context_id"], registered_by_inv_id=hist_dict.get("registered_by_inv_id"), ) timestamp = datetime.fromisoformat(hist_dict["_timestamp"]) if timestamp.tzinfo is None: timestamp = timestamp.replace(tzinfo=UTC) history._timestamp = timestamp return history
class BaseStateBackend(ABC, Generic[Params, Result]):
    """
    Abstract contract for state backends of the distributed task system.

    A state backend stores and retrieves invocation-related data: the
    invocations themselves, their status history, results, exceptions,
    workflow data and runner contexts. Concrete backends implement the
    abstract hooks; the public helpers defined here take care of
    (de)serialization, DTO conversion and a per-instance runner-context cache.
    """

    def __init__(self, app: "Pynenc") -> None:
        """
        Bind the backend to its application.

        :param Pynenc app: The application that owns this state backend.
        """
        self.app = app
        # Runner contexts already stored or fetched through this instance,
        # keyed by runner_id.
        self._runner_context_cache: dict[str, RunnerContext] = {}

    @cached_property
    def conf(self) -> ConfigStateBackend:
        """State backend configuration built from the app's config values and file path."""
        app = self.app
        return ConfigStateBackend(
            config_values=app.config_values,
            config_filepath=app.config_filepath,
        )

    @abstractmethod
    def purge(self) -> None:
        """Purge all state backend data stored for the current application."""

    @abstractmethod
    def _get_invocation(
        self, invocation_id: "InvocationId"
    ) -> tuple["InvocationDTO", "CallDTO"] | None:
        """
        Backend hook: load the stored DTO pair of an invocation.

        :param "InvocationId" invocation_id: ID of the invocation to load.
        :return: The ``(InvocationDTO, CallDTO)`` pair, or None when not found.
        """

    @abstractmethod
    def _add_histories(
        self,
        invocation_ids: list["InvocationId"],
        invocation_history: "InvocationHistory",
    ) -> None:
        """
        Backend hook: attach one history record to the given invocations.

        :param list["InvocationId"] invocation_ids: IDs of the invocations.
        :param InvocationHistory invocation_history: The record to add.
        """

    @abstractmethod
    def _get_history(self, invocation_id: "InvocationId") -> list["InvocationHistory"]:
        """
        Backend hook: load the history of an invocation ordered by timestamp.

        :param "InvocationId" invocation_id: ID of the invocation.
        :return: List of InvocationHistory records.
        """

    @abstractmethod
    def _get_result(self, invocation_id: "InvocationId") -> str:
        """
        Backend hook: load the serialized result of an invocation.

        :param "InvocationId" invocation_id: ID of the invocation.
        :return: The serialized result string.
        """

    @abstractmethod
    def _set_result(
        self, invocation_id: "InvocationId", serialized_result: str
    ) -> None:
        """
        Backend hook: store the serialized result of an invocation.

        :param "InvocationId" invocation_id: ID of the invocation.
        :param str serialized_result: The already-serialized result.
        """

    def serialize_exception(self, exception: Exception) -> str:
        """
        Turn an exception into a JSON string.

        ``PynencError`` instances are encoded with their own ``to_json``; any
        other exception goes through the app's client data store serializer.
        Subclasses may override this for backend-specific handling.

        :param Exception exception: The exception to serialize.
        :return: JSON string with ``error_name``, ``pynenc_error`` and ``error_data``.
        """
        error_data: str
        if isinstance(exception, PynencError):
            is_pynenc_error = True
            error_data = exception.to_json()
        else:
            is_pynenc_error = False
            error_data = self.app.client_data_store.serialize(exception)
        payload: dict[str, str | bool] = {
            "error_name": exception.__class__.__name__,
            "pynenc_error": is_pynenc_error,
            "error_data": error_data,
        }
        return json.dumps(payload)

    def deserialize_exception(self, serialized: str) -> Exception:
        """
        Rebuild an exception from the output of :meth:`serialize_exception`.

        Subclasses may override this for backend-specific handling.

        :param str serialized: The JSON string describing the exception.
        :return: The reconstructed Exception object.
        """
        payload = json.loads(serialized)
        if not payload["pynenc_error"]:
            # Non-pynenc exceptions were stored via the client data store.
            return self.app.client_data_store.deserialize(payload["error_data"])
        return PynencError.from_json(payload["error_name"], payload["error_data"])

    @abstractmethod
    def _get_exception(self, invocation_id: "InvocationId") -> str:
        """
        Backend hook: load the serialized exception of an invocation.

        :param "InvocationId" invocation_id: ID of the invocation.
        :return: The serialized exception string.
        """

    @abstractmethod
    def _set_exception(
        self, invocation_id: "InvocationId", serialized_exception: str
    ) -> None:
        """
        Backend hook: store the serialized exception raised by an invocation.

        :param "InvocationId" invocation_id: ID of the invocation.
        :param str serialized_exception: The already-serialized exception.
        """

    def upsert_invocations(self, invocations: list["DistributedInvocation"]) -> None:
        """
        Persist invocations by converting them to DTOs.

        Every invocation is turned into an ``(InvocationDTO, CallDTO)`` pair
        through ``invocation.to_dto()`` and ``invocation.call.to_dto()``, and
        the whole batch is handed to ``_upsert_invocations`` in a single call.

        :param list[DistributedInvocation] invocations: The invocations to upsert.
        """
        dto_pairs: list[tuple[InvocationDTO, CallDTO]] = [
            (inv.to_dto(), inv.call.to_dto()) for inv in invocations
        ]
        self._upsert_invocations(dto_pairs)

    @abstractmethod
    def _upsert_invocations(
        self, entries: list[tuple["InvocationDTO", "CallDTO"]]
    ) -> None:
        """
        Backend hook: store invocation and call DTOs.

        Each entry is an ``(InvocationDTO, CallDTO)`` pair ready for
        persistence; a backend may keep them together or in separate
        tables/collections.

        :param list[tuple[InvocationDTO, CallDTO]] entries: Paired DTOs to persist.
        """

    def get_invocation(self, invocation_id: "InvocationId") -> "DistributedInvocation":
        """
        Load an invocation by ID and rebuild it from its stored DTOs.

        The call is rebuilt with ``LazyCall.from_dto`` and passed to
        ``DistributedInvocation.from_dto``.

        :param "InvocationId" invocation_id: ID of the invocation.
        :return: The reconstructed invocation.
        :raises InvocationNotFoundError: If no invocation is stored under that ID.
        """
        from pynenc.call import LazyCall
        from pynenc.invocation.dist_invocation import DistributedInvocation

        stored = self._get_invocation(invocation_id)
        if stored is None:
            raise InvocationNotFoundError(invocation_id, "The invocation wasn't stored")
        inv_dto, call_dto = stored
        call: Call = LazyCall.from_dto(self.app, call_dto)
        return DistributedInvocation.from_dto(inv_dto, call=call)

    @abstractmethod
    def get_child_invocations(
        self, parent_invocation_id: "InvocationId"
    ) -> Iterator["InvocationId"]:
        """
        Return the IDs of all invocations directly spawned by a parent.

        Used for family-tree traversal: finds the invocations that recorded
        the given ID as their ``parent_invocation_id``.

        :param parent_invocation_id: The invocation ID to find children for.
        :return: Iterator of child invocation IDs (may be empty).
        """

    @abstractmethod
    def get_invocations_by_parent_event(
        self, parent_event_id: str
    ) -> Iterator["InvocationId"]:
        """
        Return the IDs of invocations created in response to an event.

        :param parent_event_id: The event ID whose triggered children are sought.
        :return: Iterator of invocation IDs (may be empty).
        """

    def add_histories(
        self,
        invocations: list["DistributedInvocation[Params, Result]"],
        status_record: "InvocationStatusRecord",
        runner_context: "RunnerContext",
    ) -> None:
        """
        Persist one history record per invocation before returning.

        The runner context is stored first. For ``REGISTERED`` status records
        the invocation's ``parent_invocation_id`` is kept in the record as
        ``registered_by_inv_id``; otherwise that field is ``None``.

        :param list[DistributedInvocation] invocations: Invocations to add history for.
        :param InvocationStatusRecord status_record: Status record shared by all entries.
        :param RunnerContext runner_context: Runner context recording the entries.
        """
        self.store_runner_context(runner_context)
        for inv in invocations:
            if status_record.status == InvocationStatus.REGISTERED:
                registered_by = inv.parent_invocation_id
            else:
                registered_by = None
            entry = InvocationHistory(
                invocation_id=inv.invocation_id,
                status_record=status_record,
                runner_context_id=runner_context.runner_id,
                registered_by_inv_id=registered_by,
            )
            # One backend call per invocation: each record carries its own ID.
            self._add_histories([inv.invocation_id], entry)

    def add_history(
        self,
        invocation_id: "InvocationId",
        status_record: "InvocationStatusRecord",
        runner_context: "RunnerContext",
    ) -> None:
        """
        Persist a history record for a single invocation before returning.

        The runner context is stored first; ``registered_by_inv_id`` is left
        at its default.

        :param "InvocationId" invocation_id: ID of the invocation.
        :param InvocationStatusRecord status_record: The status record.
        :param RunnerContext runner_context: Runner context recording the entry.
        """
        self.store_runner_context(runner_context)
        entry = InvocationHistory(
            invocation_id=invocation_id,
            status_record=status_record,
            runner_context_id=runner_context.runner_id,
        )
        self._add_histories([invocation_id], entry)

    def get_history(self, invocation_id: "InvocationId") -> list[InvocationHistory]:
        """
        Return the history of an invocation as provided by the backend.

        :param "InvocationId" invocation_id: ID of the invocation.
        :return: A list of invocation history records.
        """
        records = self._get_history(invocation_id)
        return records

    def set_result(self, invocation_id: "InvocationId", result: Result) -> None:
        """
        Serialize and store the result of an invocation.

        :param "InvocationId" invocation_id: ID of the invocation.
        :param Result result: The result to store.
        """
        self._set_result(invocation_id, self.app.client_data_store.serialize(result))

    def get_result(self, invocation_id: "InvocationId") -> Result:
        """
        Load and deserialize the result of an invocation.

        :param "InvocationId" invocation_id: ID of the invocation.
        :return: The deserialized result.
        """
        # NOTE(review): the original comment states that inserting a result is
        # blocking, so no thread coordination is needed here - confirm.
        raw_result = self._get_result(invocation_id)
        return self.app.client_data_store.deserialize(raw_result)

    def set_exception(
        self, invocation_id: "InvocationId", exception: "Exception"
    ) -> None:
        """
        Serialize and store the exception raised by an invocation.

        :param "InvocationId" invocation_id: ID of the invocation.
        :param Exception exception: The exception to store.
        """
        self._set_exception(invocation_id, self.serialize_exception(exception))

    def get_exception(self, invocation_id: "InvocationId") -> Exception:
        """
        Load and rebuild the exception stored for an invocation.

        :param "InvocationId" invocation_id: ID of the invocation.
        :return: The deserialized exception.
        """
        raw_exception = self._get_exception(invocation_id)
        return self.deserialize_exception(raw_exception)

    @abstractmethod
    def set_workflow_data(
        self, workflow_identity: "WorkflowIdentity", key: str, value: Any
    ) -> None:
        """
        Store a value in the data of a workflow.

        :param "WorkflowIdentity" workflow_identity: Workflow identity.
        :param str key: Data key to set.
        :param Any value: Value to store.
        """

    @abstractmethod
    def get_workflow_data(
        self, workflow_identity: "WorkflowIdentity", key: str, default: Any = None
    ) -> Any:
        """
        Read a value from the data of a workflow.

        :param "WorkflowIdentity" workflow_identity: Workflow identity.
        :param str key: Data key to retrieve.
        :param Any default: Value returned when the key doesn't exist.
        :return: Stored value or default.
        """

    @abstractmethod
    def store_app_info(self, app_info: "AppInfo") -> None:
        """
        Register this app's information in the state backend for discovery.

        :param "AppInfo" app_info: The app information to store.
        """

    @abstractmethod
    def get_app_info(self) -> "AppInfo":
        """
        Retrieve the information of the current app.

        :return: The app information.
        """

    @staticmethod
    @abstractmethod
    def discover_app_infos() -> dict[str, "AppInfo"]:
        """
        Try to discover all apps registered in the system.

        Being static, it has no app instance to read settings from; the
        connection to the backend is attempted with the default configuration.

        :return: Dictionary mapping app_id to app information.
        """

    @abstractmethod
    def store_workflow_run(self, workflow_identity: "WorkflowIdentity") -> None:
        """
        Store a workflow run for tracking and monitoring.

        Maintains the registry of workflow types and of specific workflow run
        instances, enabling monitoring of workflow types and their execution
        history.

        :param workflow_identity: The workflow identity to store.
        """

    @abstractmethod
    def get_all_workflow_types(self) -> Iterator["TaskId"]:
        """
        Retrieve all workflow types (workflow task IDs) stored in this backend.

        :return: Iterator of workflow task IDs, one per workflow type.
        """

    @abstractmethod
    def get_all_workflow_runs(self) -> Iterator["WorkflowIdentity"]:
        """
        Retrieve the workflow run identities stored in this backend.

        :return: Iterator of workflow identities for runs.
        """

    @abstractmethod
    def get_workflow_runs(
        self, workflow_type: "TaskId"
    ) -> Iterator["WorkflowIdentity"]:
        """
        Retrieve the workflow run identities of one workflow type.

        :param workflow_type: The workflow type to filter by.
        :return: Iterator of workflow identities for runs.
        """

    @abstractmethod
    def store_workflow_sub_invocation(
        self, parent_workflow_id: "InvocationId", sub_invocation_id: "InvocationId"
    ) -> None:
        """
        Record a sub-invocation that runs inside a parent workflow.

        Tracks which invocations (tasks or sub-workflows) execute within the
        context of a parent workflow, for monitoring and debugging.

        :param "InvocationId" parent_workflow_id: Workflow ID containing the sub-invocation.
        :param sub_invocation_id: Invocation ID of the task/sub-workflow running inside.
        """

    @abstractmethod
    def get_workflow_sub_invocations(
        self, workflow_id: "InvocationId"
    ) -> Iterator["InvocationId"]:
        """
        Retrieve all sub-invocation IDs that run inside a workflow.

        :param workflow_id: The workflow ID to get sub-invocations for.
        :return: Iterator of invocation IDs that run inside the workflow.
        """

    @abstractmethod
    def iter_invocations_in_timerange(
        self,
        start_time: datetime,
        end_time: datetime,
        batch_size: int = 100,
    ) -> Iterator[list["InvocationId"]]:
        """
        Iterate over the IDs of invocations with history inside a time range.

        :param datetime start_time: Start of the time range.
        :param datetime end_time: End of the time range.
        :param int batch_size: Number of invocation IDs per batch.
        :return: Iterator yielding batches of invocation IDs.
        """

    @abstractmethod
    def iter_history_in_timerange(
        self,
        start_time: datetime,
        end_time: datetime,
        batch_size: int = 100,
    ) -> Iterator[list["InvocationHistory"]]:
        """
        Iterate over the history entries inside a time range.

        :param datetime start_time: Start of the time range.
        :param datetime end_time: End of the time range.
        :param int batch_size: Number of history entries per batch.
        :return: Iterator yielding batches of InvocationHistory.
        """

    @abstractmethod
    def _store_runner_context(self, runner_context: "RunnerContext") -> None:
        """
        Backend hook: persist a runner context.

        :param RunnerContext runner_context: The context to store.
        """

    @abstractmethod
    def get_matching_runner_contexts(
        self, partial_id: str
    ) -> Iterator["RunnerContext"]:
        """
        Search runner contexts whose runner_id contains a partial string.

        Used by the log explorer to find runners from truncated log output.

        :param str partial_id: Partial runner ID to match (prefix or substring).
        :return: Iterator of matching RunnerContext instances.
        """

    @abstractmethod
    def get_invocation_ids_by_workflow(
        self,
        workflow_id: str | None = None,
        workflow_type_key: str | None = None,
    ) -> Iterator["InvocationId"]:
        """
        Retrieve invocation IDs filtered by workflow criteria.

        :param str | None workflow_id: Exact workflow ID to match.
        :param str | None workflow_type_key: Workflow type key to match.
        :return: Iterator of matching invocation IDs.
        """

    def store_runner_context(self, runner_context: "RunnerContext") -> None:
        """
        Store a runner context and, recursively, its parent contexts.

        A context whose ``runner_id`` is already in the local cache is not
        sent to the backend again; its parent chain is still visited.

        :param RunnerContext runner_context: The context to store.
        """
        runner_id = runner_context.runner_id
        if runner_id not in self._runner_context_cache:
            self._store_runner_context(runner_context)
            self._runner_context_cache[runner_id] = runner_context
        parent = runner_context.parent_ctx
        if parent:
            self.store_runner_context(parent)

    @abstractmethod
    def _get_runner_context(self, runner_id: str) -> "RunnerContext | None":
        """
        Backend hook: load a single runner context.

        :param str runner_id: The runner's unique identifier.
        :return: The stored RunnerContext or None if not found.
        """

    @abstractmethod
    def _get_runner_contexts(self, runner_ids: list[str]) -> list["RunnerContext"]:
        """
        Backend hook: load several runner contexts.

        Only called for IDs that are not in the local cache.

        :param list[str] runner_ids: List of runner unique identifiers.
        :return: list["RunnerContext"] of the stored RunnerContexts.
        """

    def get_runner_context(self, runner_id: str) -> "RunnerContext | None":
        """
        Retrieve a runner context by ID, consulting the local cache first.

        On a cache miss the backend is queried through
        ``_get_runner_contexts`` and the first returned context is cached.

        :param str runner_id: The runner's unique identifier.
        :return: The stored RunnerContext or None if not found.
        """
        try:
            return self._runner_context_cache[runner_id]
        except KeyError:
            pass  # Not cached yet: fall through to the backend.
        fetched = self._get_runner_contexts([runner_id])
        if not fetched:
            return None
        found = fetched[0]
        self._runner_context_cache[runner_id] = found
        return found

    def get_runner_contexts(self, runner_ids: list[str]) -> list["RunnerContext"]:
        """
        Retrieve several runner contexts by ID.

        Cached contexts are served locally; the remaining IDs are fetched from
        the backend in one call and added to the cache. The result lists the
        cached contexts first, followed by the fetched ones.

        :param list[str] runner_ids: List of runner unique identifiers.
        :return: list["RunnerContext"] of the contexts that were found.
        """
        if not runner_ids:
            return []

        cache = self._runner_context_cache
        hits = []
        misses = []
        # Single pass: split the request into cache hits and IDs to fetch.
        for rid in runner_ids:
            try:
                hits.append(cache[rid])
            except KeyError:
                misses.append(rid)

        if not misses:
            return hits

        fetched = self._get_runner_contexts(misses)
        for ctx in fetched:
            cache[ctx.runner_id] = ctx
        return hits + fetched