Source code for pynenc.broker.base_broker

from abc import ABC, abstractmethod
from functools import cached_property
from typing import TYPE_CHECKING, Optional

from pynenc import context
from pynenc.call import Call
from pynenc.conf.config_broker import ConfigBroker
from pynenc.invocation.dist_invocation import DistributedInvocation
from pynenc.types import Params, Result

if TYPE_CHECKING:
    from ..app import Pynenc


[docs] class BaseBroker(ABC): """ An Abstract Base Class for Message Brokers in Pynenc** This class serves as the foundational structure for implementing various message brokers. It currently supports a simple FIFO queue and is extendable for more complex functionalities like priority queues and integration with different databases and message queues. :param Pynenc app: A reference to the Pynenc application. ```{note} The `BaseBroker` is currently implemented with an in-memory queue (`MemBroker`) for testing and demonstration, and a Redis-backed queue (`RedisBroker`) for production use. ``` ```{attention} The implementation is currently limited to a FIFO queue. Future enhancements will include support for priority queues and compatibility with other databases and message queues like RabbitMQ. ``` ```{hint} The class is designed to be flexible and expandable, allowing for easy integration of additional features and message brokers in the future. ``` ```{seealso} For more advanced or production-ready features, refer to the specific implementations like `RedisBroker`. ``` The `route_call` method creates a new invocation and routes it, demonstrating a basic usage of the broker. ### Examples ```{code-block} python # Assuming `app` is an instance of Pynenc and `call` is a valid Call object broker = BaseBroker(app) invocation = broker.route_call(call) ``` """ def __init__(self, app: "Pynenc") -> None: self.app = app @cached_property def conf(self) -> ConfigBroker: return ConfigBroker( config_values=self.app.config_values, config_filepath=self.app.config_filepath, )
[docs] @abstractmethod def route_invocation(self, invocation: DistributedInvocation) -> None: """ Abstract method for routing a given invocation. This method should define the process of handling and dispatching a given DistributedInvocation within the broker system. Implementations might involve sending the invocation to a queue or handling it internally. :param DistributedInvocation invocation: The invocation to be routed. """
[docs] @abstractmethod def retrieve_invocation(self) -> Optional[DistributedInvocation]: """ Method to retrieve a distributed invocation. Implementations of this method should detail how to retrieve the next available invocation from the broker's queue or storage system. It is expected to return a DistributedInvocation if one is available, or None if the queue is empty. :return: The next invocation to be processed, or None. """
[docs] @abstractmethod def purge(self) -> None: """ Method to purge the message queue. This method is intended to clear or reset the state of the broker's queue, removing all pending invocations. It's crucial for error handling and managing the queue in specific situations like maintenance or reset. """
# @abstractmethod # def _acknowledge_invocation(self, invocation: DistributedInvocation) -> None: # ... # @abstractmethod # def _requeue_invocation(self, invocation: DistributedInvocation) -> None: # ...
[docs] def route_call( self, call: "Call[Params, Result]" ) -> DistributedInvocation[Params, Result]: """ Creates and routes a new DistributedInvocation based on the given call. This method instantiates a DistributedInvocation with the provided call and the current invocation context. It then routes this invocation using the `route_invocation` method. This demonstrates the basic use of the broker's functionality. :param Call[Params, Result] call: The call object to be transformed into an invocation. :return: The routed invocation. ```{note} The method also logs the routing process for debugging purposes. ``` """ parent_invocation = context.get_dist_invocation_context(self.app.app_id) self.route_invocation( invocation := DistributedInvocation( call, parent_invocation=parent_invocation ) ) self.app.logger.debug( f"Routed {call=} on invocation {invocation.invocation_id}" ) return invocation