pynenc.broker.redis_broker

Module Contents

Classes

RedisQueue

RedisQueue: A helper class for managing message queues in Redis.

RedisBroker

A Redis-backed implementation of the BaseBroker.

API

class pynenc.broker.redis_broker.RedisQueue(app: pynenc.app.Pynenc, client: redis.Redis, name: str, namespace: str = 'queue')

RedisQueue: A helper class for managing message queues in Redis.

This class provides methods for sending, receiving, and purging messages in a Redis queue. It is designed to work specifically with the RedisBroker class to handle message queuing.

Parameters:
  • app (Pynenc) – A reference to the Pynenc application.

  • client (redis.Redis) – A Redis client instance.

  • name (str) – The name of the queue.

  • namespace (str) – The namespace for the queue; defaults to 'queue'.

send_message(message: str) → None

Send a message to the Redis queue.

This method appends a message to the right end of the Redis list, effectively queuing it for processing.

Parameters:

message (str) – The message to be queued.
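The append-to-the-right behaviour can be illustrated without a running Redis server. The sketch below uses a hypothetical in-memory stand-in, `FakeListClient`, whose `rpush` and `lpop` methods mirror the semantics of the Redis commands of the same name. The class and the key name `queue:default` are assumptions made for illustration; neither is part of pynenc or redis-py.

```python
from collections import defaultdict, deque
from typing import Optional


class FakeListClient:
    """Hypothetical in-memory stand-in for the Redis list commands used here."""

    def __init__(self) -> None:
        self._lists = defaultdict(deque)

    def rpush(self, key: str, *values: str) -> int:
        # Like Redis RPUSH: append to the right end, return the new length.
        self._lists[key].extend(values)
        return len(self._lists[key])

    def lpop(self, key: str) -> Optional[str]:
        # Like Redis LPOP: remove from the left end, or None when empty.
        items = self._lists[key]
        return items.popleft() if items else None


client = FakeListClient()
key = "queue:default"  # assumed key name, for illustration only
client.rpush(key, "first")
client.rpush(key, "second")

# Pushing on the right and popping on the left gives first-in, first-out order.
received = [client.lpop(key), client.lpop(key), client.lpop(key)]
```

Because producers push on the right and consumers pop on the left, messages leave the queue in the order they were sent.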

send_messages_batch(messages: list[str]) → None

Send multiple messages to the Redis queue in a single operation.

This method uses a Redis pipeline to send multiple messages to the queue in a single operation, which can improve performance when routing multiple invocations.

Parameters:

messages (list[str]) – The list of messages to be queued.
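To make the pipelining idea concrete, here is a minimal sketch in which commands are buffered locally and applied together on `execute()`. `FakeStore` and `FakePipeline` are hypothetical stand-ins that count simulated round trips; they are not pynenc or redis-py classes. Whether pynenc issues one RPUSH per message or a single multi-value RPUSH is not stated here, so the per-message form is an assumption.

```python
class FakePipeline:
    """Hypothetical stand-in that buffers commands like a Redis pipeline."""

    def __init__(self, store):
        self._store = store
        self._buffered = []

    def rpush(self, key, value):
        # Nothing is applied yet; the command is only queued locally.
        self._buffered.append((key, value))
        return self

    def execute(self):
        # All buffered commands are applied together, in order,
        # at the cost of a single simulated round trip.
        self._store.round_trips += 1
        results = []
        for key, value in self._buffered:
            self._store.lists.setdefault(key, []).append(value)
            results.append(len(self._store.lists[key]))
        self._buffered.clear()
        return results


class FakeStore:
    """Hypothetical in-memory store that hands out pipelines."""

    def __init__(self):
        self.lists = {}
        self.round_trips = 0

    def pipeline(self):
        return FakePipeline(self)


def send_messages_batch(store, key, messages):
    # Queue one push per message, then flush them all at once.
    pipe = store.pipeline()
    for message in messages:
        pipe.rpush(key, message)
    pipe.execute()


store = FakeStore()
send_messages_batch(store, "queue:default", ["a", "b", "c"])
```

In this sketch three messages cost one round trip instead of three, which is the source of the performance gain described above.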

receive_message() → Optional[str]

Receive a message from the Redis queue.

This method blocks for up to a configured timeout while waiting for a message. It uses BLPOP, which waits on the server side, rather than repeatedly polling the queue.

Returns:

The message from the queue, or None if the timeout is reached before a message arrives.
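The blocking-with-timeout behaviour might look roughly like the following sketch. `FakeBlockingClient` is a hypothetical stand-in built on the standard library's `queue.Queue`; its `blpop` imitates the Redis BLPOP reply shape (a key and value pair, or None on timeout) for a single list. The function shown is an illustration under these assumptions, not pynenc's actual implementation.

```python
import queue
from typing import Optional


class FakeBlockingClient:
    """Hypothetical stand-in whose blpop mimics Redis BLPOP on one list."""

    def __init__(self) -> None:
        self._items = queue.Queue()

    def rpush(self, key, value):
        self._items.put(value)

    def blpop(self, key, timeout):
        # Wait up to `timeout` seconds for an item, then give up with None.
        try:
            return key, self._items.get(timeout=timeout)
        except queue.Empty:
            return None


def receive_message(client, key, timeout) -> Optional[str]:
    reply = client.blpop(key, timeout=timeout)
    if reply is None:
        return None  # timed out without a message
    _key, message = reply  # the reply is a (key, value) pair
    return message


client = FakeBlockingClient()
client.rpush("queue:default", "hello")
first = receive_message(client, "queue:default", timeout=0.05)
second = receive_message(client, "queue:default", timeout=0.05)
```

The first call returns immediately because a message is waiting; the second waits for the timeout and then returns None.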

count_invocations() → int

Get the number of messages in the Redis queue.

This method queries the Redis queue for the number of messages currently in the queue.

Returns:

The number of messages in the queue.

purge() → None

Purge all messages from the Redis queue.

This method clears all messages in the queue, effectively resetting it.

class pynenc.broker.redis_broker.RedisBroker(app: pynenc.app.Pynenc)

Bases: pynenc.broker.base_broker.BaseBroker

A Redis-backed implementation of the BaseBroker.

This subclass of BaseBroker implements the abstract methods for routing, retrieving, and purging invocations using Redis as the message broker. It is suitable for production environments where robustness and scalability are required.

Parameters:

app (Pynenc) – A reference to the Pynenc application.

property client: redis.Redis

The Redis client, created lazily on first access.

property queue: pynenc.broker.redis_broker.RedisQueue

conf() → pynenc.conf.config_broker.ConfigBrokerRedis

route_invocation(invocation: pynenc.invocation.dist_invocation.DistributedInvocation) → None

Route an invocation by sending it to the Redis queue.

This method serializes the DistributedInvocation object to JSON and sends it to the Redis queue for processing using the send_message method.

Parameters:

invocation (DistributedInvocation) – The invocation to be queued.

route_invocations(invocations: list[pynenc.invocation.dist_invocation.DistributedInvocation]) → None

Route multiple invocations at once, using a Redis pipeline for better performance.

Parameters:

invocations (list[DistributedInvocation]) – The invocations to be routed.

retrieve_invocation() → Optional[pynenc.invocation.dist_invocation.DistributedInvocation]

Retrieve the next invocation from the Redis queue.

This method receives a message from the Redis queue using the receive_message method. If a message is received, it deserializes the JSON string back into a DistributedInvocation object.

Returns:

The next invocation from the queue, or None if no message arrives before the receive timeout.
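The serialize-on-route and deserialize-on-retrieve round trip can be sketched as follows. `FakeInvocation` and `FakeBroker` are hypothetical stand-ins for DistributedInvocation and RedisBroker, the field names and the `to_json`/`from_json` helpers are assumptions, and a `deque` plays the role of the Redis list so that the example runs without a server.

```python
import json
from collections import deque
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class FakeInvocation:
    """Hypothetical stand-in for DistributedInvocation."""

    invocation_id: str
    task_id: str
    arguments: dict

    def to_json(self) -> str:
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, payload: str) -> "FakeInvocation":
        return cls(**json.loads(payload))


class FakeBroker:
    """Hypothetical broker; a deque plays the role of the Redis list."""

    def __init__(self) -> None:
        self._queue = deque()

    def route_invocation(self, invocation: FakeInvocation) -> None:
        # Only the JSON string is stored, never the Python object itself.
        self._queue.append(invocation.to_json())

    def retrieve_invocation(self) -> Optional[FakeInvocation]:
        if not self._queue:
            return None
        return FakeInvocation.from_json(self._queue.popleft())


broker = FakeBroker()
original = FakeInvocation("inv-1", "tasks.add", {"x": 1, "y": 2})
broker.route_invocation(original)
restored = broker.retrieve_invocation()
leftover = broker.retrieve_invocation()
```

The retrieved object is an equal but distinct copy of the routed one, which is what allows the producer and the consumer to run in separate processes.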

count_invocations() → int

Get the number of invocations in the Redis queue.

This method queries the Redis queue for the number of messages currently in the queue.

Returns:

The number of invocations in the queue.

purge() → None

Purge all invocations from the Redis queue.

This method delegates to the purge method of the RedisQueue to clear all messages.