Source code for pynenc.broker.mem_broker

from collections import deque
from typing import TYPE_CHECKING

from pynenc.broker.base_broker import BaseBroker

if TYPE_CHECKING:
    from ..app import Pynenc
    from pynenc.identifiers.invocation_id import InvocationId


[docs] class MemBroker(BaseBroker): """ An in-memory implementation of the BaseBroker. This subclass of BaseBroker implements the abstract methods for routing, retrieving, and purging invocations using an in-memory deque. It's primarily intended for testing and demonstration purposes. ```{warning} The `MemBroker` class generates the queue in the process's memory and is not suitable for production systems. Its use should be limited to testing or demonstration purposes only. ``` :param Pynenc app: A reference to the Pynenc application. """ def __init__(self, app: "Pynenc") -> None: self._queue: deque = deque() super().__init__(app)
[docs] def route_invocation(self, invocation_id: "InvocationId") -> None: """ Route an invocation id by adding it to the in-memory queue. This method appends the invocation ID to the deque, effectively queuing it for processing. :param InvocationId invocation_id: The ID of the invocation to be queued. """ self._queue.append(invocation_id)
[docs] def route_invocations(self, invocation_ids: list["InvocationId"]) -> None: """ Routes multiple invocation IDs at once. :param list[InvocationId] invocation_ids: The invocation IDs to be routed. """ for invocation_id in invocation_ids: self.route_invocation(invocation_id) if invocation_ids: self.app.logger.debug(f"Batch routed {len(invocation_ids)} invocations")
[docs] def retrieve_invocation(self) -> "InvocationId | None": """ Retrieve the next invocation id from the queue. This method pops the next item from the deque and returns the invocation ID. If the queue is empty, it returns None. :return: The next invocation id from the queue, or None if the queue is empty. :rtype: InvocationId | None """ if self._queue: return self._queue.popleft() return None
[docs] def count_invocations(self) -> int: """ Get the number of invocations in the in-memory queue. """ return len(self._queue)
[docs] def purge(self) -> None: """ Clear all invocations from the in-memory queue. This method empties the deque, removing all pending invocations. """ return self._queue.clear()