pynenc.broker.sqlite_broker

SQLite-based broker for cross-process testing.

This module provides a SQLite-based broker implementation that enables true cross-process coordination for testing process runners.

Module Contents

Classes

Tables

SQLiteBroker

A SQLite-based implementation of the broker for cross-process testing.

API

class pynenc.broker.sqlite_broker.Tables
QUEUE

'broker_message_queue'

class pynenc.broker.sqlite_broker.SQLiteBroker(app: pynenc.app.Pynenc)

Bases: pynenc.broker.base_broker.BaseBroker

A SQLite-based implementation of the broker for cross-process testing.

Uses SQLite for cross-process message queue coordination and implements all required abstract methods from BaseBroker. It’s designed specifically for testing process runners.

Warning

The SQLiteBroker class is designed for testing purposes only and should not be used in production systems. It uses temporary SQLite files for state.

Initialization

_init_tables() -> None

Initialize SQLite tables for broker.

conf() -> pynenc.conf.config_broker.ConfigBrokerSQLite

send_message(invocation_id: pynenc.identifiers.invocation_id.InvocationId) -> None

Send a message (invocation ID) to the queue.

route_invocation(invocation_id: pynenc.identifiers.invocation_id.InvocationId) -> None

Route a single invocation ID by sending it to the message queue.

route_invocations(invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId]) -> None

Route multiple invocation IDs by sending them to the message queue.
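
As an illustration of batch routing, the sketch below enqueues several IDs in one transaction using only the standard-library sqlite3 module. It is not the pynenc implementation: the column layout, the helper names init_queue and route_many, and the use of plain strings in place of InvocationId are all assumptions made for the example. Only the table name comes from Tables.QUEUE.

```python
import sqlite3

QUEUE = "broker_message_queue"  # table name taken from Tables.QUEUE


def init_queue(conn: sqlite3.Connection) -> None:
    # Hypothetical schema: the autoincrement id preserves insertion order.
    conn.execute(
        f"CREATE TABLE IF NOT EXISTS {QUEUE} ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "invocation_id TEXT NOT NULL)"
    )


def route_many(conn: sqlite3.Connection, invocation_ids: list[str]) -> None:
    # One transaction for the whole batch: either every row lands or none do.
    with conn:
        conn.executemany(
            f"INSERT INTO {QUEUE} (invocation_id) VALUES (?)",
            [(inv_id,) for inv_id in invocation_ids],
        )


conn = sqlite3.connect(":memory:")
init_queue(conn)
route_many(conn, ["inv-1", "inv-2", "inv-3"])

# Read the queue back in insertion order.
queued = [
    row[0]
    for row in conn.execute(f"SELECT invocation_id FROM {QUEUE} ORDER BY id")
]
```

Wrapping the batch in a single transaction means a concurrent reader sees either none of the new rows or all of them.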

retrieve_invocation() -> InvocationId | None

Atomically retrieve and remove a single invocation from the queue. Ensures that no two processes can retrieve the same invocation.

Returns:

The next invocation ID in the queue, or None if empty.
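
One common way to get this guarantee with SQLite is to take the write lock before reading, so that the select and the delete happen in a single transaction. The sketch below shows that pattern with the standard-library sqlite3 module. It is an assumption about how such a retrieval could work, not a copy of the pynenc code: the schema, the helpers connect and pop_invocation, and the string IDs are hypothetical.

```python
import os
import sqlite3
import tempfile

QUEUE = "broker_message_queue"  # table name taken from Tables.QUEUE


def connect(path: str) -> sqlite3.Connection:
    # isolation_level=None gives autocommit mode, so transactions are explicit.
    conn = sqlite3.connect(path, timeout=5.0, isolation_level=None)
    conn.execute(
        f"CREATE TABLE IF NOT EXISTS {QUEUE} ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "invocation_id TEXT NOT NULL)"
    )
    return conn


def pop_invocation(conn: sqlite3.Connection) -> "str | None":
    # BEGIN IMMEDIATE acquires the write lock up front, so no other
    # connection can select the same row before this one deletes it.
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            f"SELECT id, invocation_id FROM {QUEUE} ORDER BY id LIMIT 1"
        ).fetchone()
        if row is not None:
            conn.execute(f"DELETE FROM {QUEUE} WHERE id = ?", (row[0],))
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    return row[1] if row is not None else None


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "broker.db")
    # Two connections to the same file stand in for two worker processes.
    worker_a, worker_b = connect(path), connect(path)
    for inv_id in ("inv-1", "inv-2"):
        worker_a.execute(
            f"INSERT INTO {QUEUE} (invocation_id) VALUES (?)", (inv_id,)
        )
    results = [
        pop_invocation(worker_a),
        pop_invocation(worker_b),
        pop_invocation(worker_a),  # queue is empty by now
    ]
    worker_a.close()
    worker_b.close()
```

Each ID is handed out exactly once, and the third call returns None because the queue has been drained. The timeout argument makes a connection wait for the lock instead of failing immediately when another worker holds it.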

count_invocations() -> int

Count the number of invocations in the queue.

purge() -> None

Clear all broker messages.
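
For orientation, counting and purging a SQLite-backed queue can be as simple as the sketch below. It uses only the standard-library sqlite3 module. The schema and the helper names count_queue and purge_queue are hypothetical and are not taken from the pynenc source.

```python
import sqlite3

QUEUE = "broker_message_queue"  # table name taken from Tables.QUEUE


def count_queue(conn: sqlite3.Connection) -> int:
    # COUNT(*) always yields exactly one row, even for an empty table.
    return conn.execute(f"SELECT COUNT(*) FROM {QUEUE}").fetchone()[0]


def purge_queue(conn: sqlite3.Connection) -> None:
    # Delete every queued message in a single committed transaction.
    with conn:
        conn.execute(f"DELETE FROM {QUEUE}")


conn = sqlite3.connect(":memory:")
# Hypothetical schema, assumed for this example only.
conn.execute(
    f"CREATE TABLE {QUEUE} ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "invocation_id TEXT NOT NULL)"
)
with conn:
    conn.executemany(
        f"INSERT INTO {QUEUE} (invocation_id) VALUES (?)",
        [("inv-1",), ("inv-2",)],
    )

before = count_queue(conn)
purge_queue(conn)
after = count_queue(conn)
```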