Source code for pynenc.state_backend.base_state_backend

import json
import threading
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from functools import cached_property
from typing import TYPE_CHECKING, Any, Iterator, Optional

from pynenc.conf.config_state_backend import ConfigStateBackend
from pynenc.exceptions import InvocationNotFoundError
from pynenc.invocation.status import InvocationStatus

if TYPE_CHECKING:
    from pynenc.app import AppInfo, Pynenc
    from pynenc.invocation.dist_invocation import DistributedInvocation
    from pynenc.types import Params, Result
    from pynenc.workflow import WorkflowIdentity


@dataclass
class InvocationHistory:
    """
    Data structure representing one entry in the history of a task invocation.

    Stores the invocation ID, a creation timestamp, the status and an
    execution context, and can be converted to and from a JSON string.

    :ivar invocation_id: Unique identifier of the invocation.
    :ivar _timestamp: Creation time of the record. It is not an ``__init__``
        argument; it defaults to the current UTC time when the record is built.
    :ivar status: Status of the invocation, or ``None`` when not provided.
    :ivar execution_context: Context of the execution, reserved for future use.
    """

    invocation_id: str
    _timestamp: datetime = field(
        init=False, default_factory=lambda: datetime.now(timezone.utc)
    )
    status: Optional["InvocationStatus"] = None
    execution_context: Optional[Any] = None  # Todo on Runners

    @property
    def timestamp(self) -> datetime:
        """Returns the timestamp of the invocation history."""
        return self._timestamp

    def to_json(self) -> str:
        """
        Serializes the invocation history to a JSON string.

        The timestamp is written in ISO 8601 format and the status as its
        ``value`` (``None`` when no status is set). ``execution_context`` is
        passed to :func:`json.dumps` as is, so it must be JSON serializable.

        :return: A JSON representation of the invocation history.
        """
        # Explicit ``None`` check (mirrors ``from_json``): a status that
        # happens to evaluate as falsy must still be serialized.
        status_value = self.status.value if self.status is not None else None
        return json.dumps(
            {
                "invocation_id": self.invocation_id,
                "_timestamp": self._timestamp.isoformat(),
                "status": status_value,
                "execution_context": self.execution_context,  # TODO
            }
        )

    @classmethod
    def from_json(cls, json_str: str) -> "InvocationHistory":
        """
        Deserializes a JSON string into an InvocationHistory object.

        :param str json_str: JSON string to deserialize, with the keys written
            by :meth:`to_json`.
        :return: An instance of InvocationHistory.
        """
        hist_dict = json.loads(json_str)
        history = cls(hist_dict["invocation_id"])
        timestamp = datetime.fromisoformat(hist_dict["_timestamp"])
        if timestamp.tzinfo is None:
            # Timestamps stored without timezone information are taken as UTC.
            timestamp = timestamp.replace(tzinfo=timezone.utc)
        # Overwrite the "now" default assigned by the constructor.
        history._timestamp = timestamp
        if hist_dict["status"] is not None:
            history.status = InvocationStatus(hist_dict["status"])
        history.execution_context = hist_dict["execution_context"]
        return history
class BaseStateBackend(ABC):
    """
    Abstract base class for state backends in a distributed task system.

    Manages storage and retrieval of invocation-related data, including
    execution status, history, results, and exceptions.

    Concrete backends implement the abstract ``_``-prefixed storage primitives
    and the workflow / app-info methods; the public invocation methods defined
    here delegate to those primitives.
    """

    def __init__(self, app: "Pynenc") -> None:
        """
        :param Pynenc app: The application this state backend belongs to.
        """
        self.app = app
        # Background threads started by ``add_history``, grouped by invocation ID.
        self.invocation_threads: dict[str, list[threading.Thread]] = defaultdict(list)

    @cached_property
    def conf(self) -> "ConfigStateBackend":
        """State backend configuration built from the app's config values and file."""
        return ConfigStateBackend(
            config_values=self.app.config_values,
            config_filepath=self.app.config_filepath,
        )

    def wait_for_all_async_operations(self) -> None:
        """
        Waits for all asynchronous operations related to invocation status to complete.
        """
        # Iterate over a snapshot of the keys: ``add_history`` may register a
        # new invocation from another thread while we are waiting, and a dict
        # that changes size during iteration raises ``RuntimeError``.
        for invocation_id in list(self.invocation_threads):
            self.wait_for_invocation_async_operations(invocation_id)

    def wait_for_invocation_async_operations(self, invocation_id: str) -> None:
        """
        Waits for all asynchronous operations for a specific invocation to complete.

        Returns immediately when no operation was registered for the invocation.

        :param str invocation_id: ID of the invocation.
        """
        # ``.get`` instead of indexing: looking up an unknown ID must not insert
        # an empty entry into the defaultdict. The tuple is a snapshot, so only
        # the threads registered at call time are joined.
        for thread in tuple(self.invocation_threads.get(invocation_id, ())):
            thread.join()

    @abstractmethod
    def purge(self) -> None:
        """Purges all stored state backend data for the current application"""

    @abstractmethod
    def _upsert_invocation(
        self, invocation: "DistributedInvocation[Params, Result]"
    ) -> None:
        """
        Updates or inserts an invocation.

        :param DistributedInvocation invocation: The invocation to upsert.
        """

    @abstractmethod
    def _get_invocation(self, invocation_id: str) -> Optional["DistributedInvocation"]:
        """
        Retrieves an invocation by its ID.

        :param str invocation_id: ID of the invocation.
        :return: The invocation; ``get_invocation`` treats ``None`` as "not found".
        """

    @abstractmethod
    def _add_history(
        self,
        invocation: "DistributedInvocation[Params, Result]",
        invocation_history: "InvocationHistory",
    ) -> None:
        """
        Adds a history record for an invocation.

        Called from a background thread started by ``add_history``.

        :param DistributedInvocation invocation: The invocation the record belongs to.
        :param InvocationHistory invocation_history: The record to store.
        """

    @abstractmethod
    def _get_history(
        self, invocation: "DistributedInvocation[Params, Result]"
    ) -> "list[InvocationHistory]":
        """
        Retrieves the history of an invocation.

        :param DistributedInvocation invocation: The invocation to get the history from
        """

    @abstractmethod
    def _get_result(
        self, invocation: "DistributedInvocation[Params, Result]"
    ) -> "Result":
        """
        Retrieves the result of an invocation.

        :param DistributedInvocation invocation: The invocation to get the result from
        """

    @abstractmethod
    def _set_result(
        self, invocation: "DistributedInvocation[Params, Result]", result: "Result"
    ) -> None:
        """
        Sets the result of an invocation.

        :param DistributedInvocation invocation: The invocation to set
        :param Result result: The result to set
        """

    @abstractmethod
    def _get_exception(
        self, invocation: "DistributedInvocation[Params, Result]"
    ) -> "Exception":
        """
        Retrieves the exception of an invocation.

        :param DistributedInvocation invocation: The invocation to get the exception from
        """

    @abstractmethod
    def _set_exception(
        self,
        invocation: "DistributedInvocation[Params, Result]",
        exception: "Exception",
    ) -> None:
        """
        Sets the exception raised while running an invocation.

        :param DistributedInvocation invocation: The invocation to set
        :param Exception exception: The exception raised
        """

    def upsert_invocation(self, invocation: "DistributedInvocation") -> None:
        """
        Updates or inserts an invocation.

        The call is delegated to ``_upsert_invocation`` in the calling thread.

        :param DistributedInvocation invocation: The invocation to upsert.
        """
        self._upsert_invocation(invocation)

    def get_invocation(self, invocation_id: str) -> "DistributedInvocation":
        """
        Retrieves an invocation by its ID, raising an error if not found.

        :param str invocation_id: ID of the invocation.
        :return: The retrieved invocation.
        :raises InvocationNotFoundError: If the invocation wasn't found.
        """
        invocation = self._get_invocation(invocation_id)
        # Only ``None`` means "not stored"; do not rely on the truthiness of
        # the invocation object itself.
        if invocation is None:
            raise InvocationNotFoundError(invocation_id, "The invocation wasn't stored")
        return invocation

    def add_history(
        self,
        invocation: "DistributedInvocation[Params, Result]",
        status: Optional["InvocationStatus"] = None,
        execution_context: Optional["Any"] = None,
    ) -> None:
        """
        Adds a history record for an invocation.

        The record is created immediately (so its timestamp reflects the time
        of this call) and stored by a background thread running
        ``_add_history``. The thread is registered in ``invocation_threads``
        so that the ``wait_for_*`` methods can join it.

        :param DistributedInvocation invocation: The invocation to add history for.
        :param Optional[InvocationStatus] status: The status of the invocation.
        :param Optional[Any] execution_context: The execution context of the invocation.
        """
        invocation_history = InvocationHistory(
            invocation.invocation_id, status, execution_context
        )
        thread = threading.Thread(
            target=self._add_history, args=(invocation, invocation_history)
        )
        self.invocation_threads[invocation.invocation_id].append(thread)
        thread.start()

    def get_history(
        self, invocation: "DistributedInvocation[Params, Result]"
    ) -> "list[InvocationHistory]":
        """
        Retrieves the history of an invocation.

        This method does not join the background threads started by
        ``add_history``; call ``wait_for_invocation_async_operations`` first
        when pending records must be included.

        :param DistributedInvocation invocation: The invocation to retrieve history for.
        :return: A list of invocation history records.
        """
        # todo fork open threads
        # NOTE(review): presumably the todo above means joining the pending
        # ``add_history`` threads before reading - confirm before implementing.
        return self._get_history(invocation)

    def set_result(self, invocation: "DistributedInvocation", result: "Result") -> None:
        """
        Sets the result of an invocation.

        :param DistributedInvocation invocation: The invocation to set the result for.
        :param Result result: The result of the invocation.
        """
        self._set_result(invocation, result)

    def get_result(
        self, invocation: "DistributedInvocation[Params, Result]"
    ) -> "Result":
        """
        Retrieves the result of an invocation.

        :param DistributedInvocation invocation: The invocation to retrieve the result for.
        :return: The result of the invocation.
        """
        # ``set_result`` stores the result in the calling thread, so there are
        # no background threads to wait for before reading.
        return self._get_result(invocation)

    def set_exception(
        self, invocation: "DistributedInvocation", exception: "Exception"
    ) -> None:
        """
        Sets the exception of an invocation.

        :param DistributedInvocation invocation: The invocation to set the exception for.
        :param Exception exception: The exception of the invocation.
        """
        self._set_exception(invocation, exception)

    def get_exception(
        self, invocation: "DistributedInvocation[Params, Result]"
    ) -> Exception:
        """
        Retrieves the exception of an invocation.

        :param DistributedInvocation invocation: The invocation to retrieve the exception for.
        :return: The exception of the invocation.
        """
        return self._get_exception(invocation)

    @abstractmethod
    def set_workflow_data(
        self, workflow_identity: "WorkflowIdentity", key: str, value: Any
    ) -> None:
        """
        Set a value in workflow data.

        :param workflow_identity: Workflow identity
        :param key: Data key to set
        :param value: Value to store
        """

    @abstractmethod
    def get_workflow_data(
        self, workflow_identity: "WorkflowIdentity", key: str, default: Any = None
    ) -> Any:
        """
        Get a value from workflow data.

        :param workflow_identity: Workflow identity
        :param key: Data key to retrieve
        :param default: Default value if key doesn't exist
        :return: Stored value or default
        """

    @abstractmethod
    def get_workflow_deterministic_value(
        self, workflow: "WorkflowIdentity", key: str
    ) -> Any:
        """
        Get a stored deterministic value for a workflow.

        :param workflow: Workflow identity
        :param key: Value key
        :return: Stored value or None if not found
        """

    @abstractmethod
    def set_workflow_deterministic_value(
        self, workflow: "WorkflowIdentity", key: str, value: Any
    ) -> None:
        """
        Store a deterministic value for a workflow.

        :param workflow: Workflow identity
        :param key: Value key
        :param value: Value to store
        """

    @abstractmethod
    def store_app_info(self, app_info: "AppInfo") -> None:
        """
        Register this app's information in the state backend for discovery.

        :param app_info: The app information to store
        :return: None
        """

    @abstractmethod
    def get_app_info(self) -> "AppInfo":
        """
        Retrieve information of the current app.

        :return: The app information
        """

    @staticmethod
    @abstractmethod
    def get_all_app_infos() -> dict[str, "AppInfo"]:
        """
        Retrieve all app information registered in this state backend.

        :return: Dictionary mapping app_id to app information
        """

    @abstractmethod
    def store_workflow_run(self, workflow_identity: "WorkflowIdentity") -> None:
        """
        Store a workflow run for tracking and monitoring.

        Maintains workflow type registry and specific workflow run instances.
        This enables monitoring of workflow types and their execution history.

        :param workflow_identity: The workflow identity to store
        """

    @abstractmethod
    def get_all_workflows(self) -> Iterator[str]:
        """
        Retrieve all workflow types (workflow_task_ids) stored in this state backend.

        :return: Iterator of workflow task IDs representing different workflow types
        """

    @abstractmethod
    def get_all_workflows_runs(self) -> Iterator["WorkflowIdentity"]:
        """
        Retrieve workflow run identities from this state backend.

        :return: Iterator of workflow identities for runs
        """

    @abstractmethod
    def get_workflow_runs(self, workflow_task_id: str) -> Iterator["WorkflowIdentity"]:
        """
        Retrieve workflow run identities from this state backend.

        :param workflow_task_id: Filter for specific workflow type
        :return: Iterator of workflow identities for runs
        """

    @abstractmethod
    def store_workflow_sub_invocation(
        self, parent_workflow_id: str, sub_invocation_id: str
    ) -> None:
        """
        Store a sub-invocation ID that runs inside a parent workflow.

        This tracks which invocations (tasks or sub-workflows) are executed
        within the context of a parent workflow for monitoring and debugging.

        :param parent_workflow_id: The workflow ID that contains the sub-invocation
        :param sub_invocation_id: The invocation ID of the task/sub-workflow running inside
        """

    @abstractmethod
    def get_workflow_sub_invocations(self, workflow_id: str) -> Iterator[str]:
        """
        Retrieve all sub-invocation IDs that run inside a specific workflow.

        :param workflow_id: The workflow ID to get sub-invocations for
        :return: Iterator of invocation IDs that run inside the workflow
        """