Source code for pynenc.state_backend.mem_state_backend

from collections import defaultdict
from typing import TYPE_CHECKING, Any

from pynenc.state_backend.base_state_backend import BaseStateBackend

if TYPE_CHECKING:
    from pynenc.app import Pynenc
    from pynenc.invocation.dist_invocation import DistributedInvocation
    from pynenc.state_backend.base_state_backend import InvocationHistory
    from pynenc.types import Params, Result


[docs] class MemStateBackend(BaseStateBackend): """ A memory-based implementation of the state backend. Stores invocation data, history, results, and exceptions in in-memory dictionaries. Useful for environments where persistence is not required or for testing purposes. ```{warning} The `MemStateBackend` class stores all data in the process's memory and is not suitable for production systems. Its use should be limited to testing or demonstration purposes only. ``` """ def __init__(self, app: "Pynenc") -> None: self._cache: dict[str, "DistributedInvocation"] = {} self._history: dict[str, list] = defaultdict(list) self._results: dict[str, Any] = {} self._exceptions: dict[str, Exception] = {} super().__init__(app)
[docs] def purge(self) -> None: """Clears all stored data""" self._cache.clear() self._history.clear() self._results.clear() self._exceptions.clear()
[docs] def _upsert_invocation(self, invocation: "DistributedInvocation") -> None: """ Inserts or updates an invocation in the memory cache. :param DistributedInvocation invocation: The invocation object to upsert. """ self._cache[invocation.invocation_id] = invocation
[docs] def _get_invocation( self, invocation_id: str ) -> "DistributedInvocation[Params, Result]": """ Retrieves an invocation from the memory cache by its ID. :param str invocation_id: The ID of the invocation to retrieve. :return: The retrieved invocation object. """ return self._cache[invocation_id]
[docs] def _add_history( self, invocation: "DistributedInvocation", invocation_history: "InvocationHistory", ) -> None: """ Adds a history record to an invocation. :param DistributedInvocation invocation: The invocation to add history for. :param InvocationHistory invocation_history: The history record to add. """ self._history[invocation.invocation_id].append(invocation_history)
[docs] def _get_history( self, invocation: "DistributedInvocation[Params, Result]" ) -> list["InvocationHistory"]: """ Retrieves the history of an invocation. :param DistributedInvocation invocation: The invocation to get the history for. :return: A list of invocation history records. """ return self._history[invocation.invocation_id]
[docs] def _set_result( self, invocation: "DistributedInvocation[Params, Result]", result: "Result" ) -> None: """ Sets the result for an invocation. :param DistributedInvocation invocation: The invocation to set the result for. :param Result result: The result of the invocation. """ self._results[invocation.invocation_id] = result
[docs] def _set_exception( self, invocation: "DistributedInvocation[Params, Result]", exception: "Exception", ) -> None: """ Sets the exception for an invocation. :param DistributedInvocation invocation: The invocation to set the exception for. :param Exception exception: The exception to set. """ self._exceptions[invocation.invocation_id] = exception
[docs] def _get_result( self, invocation: "DistributedInvocation[Params, Result]" ) -> "Result": """ Retrieves the result of an invocation. :param DistributedInvocation invocation: The invocation to get the result for. :return: The result of the invocation. """ return self._results[invocation.invocation_id]
[docs] def _get_exception( self, invocation: "DistributedInvocation[Params, Result]" ) -> Exception: """ Retrieves the exception of an invocation. :param DistributedInvocation invocation: The invocation to get the exception for. :return: The exception of the invocation. """ return self._exceptions[invocation.invocation_id]