Source code for pynenc.broker.sqlite_broker

"""
SQLite-based broker for cross-process testing.

This module provides a SQLite-based broker implementation that enables
true cross-process coordination for testing process runners.
"""

from functools import cached_property
from typing import TYPE_CHECKING

from pynenc.broker.base_broker import BaseBroker
from pynenc.conf.config_broker import ConfigBrokerSQLite
from pynenc.identifiers.invocation_id import InvocationId
from pynenc.util.sqlite_utils import TableNames
from pynenc.util.sqlite_utils import create_sqlite_connection as sqlite_conn
from pynenc.util.sqlite_utils import (
    delete_tables_with_prefix,
    get_sqlite_sqlite_db_path,
)

if TYPE_CHECKING:
    from pynenc.app import Pynenc


class Tables(TableNames):
    """Table names for broker, scoped by app_id."""

    def __init__(self, app_id: str) -> None:
        """
        Build the broker table names for a single application.

        :param app_id: Application identifier, forwarded to ``TableNames``
            together with the literal ``"broker"``.
        """
        super().__init__(app_id, "broker")
        # ``table_prefix`` is read from self; presumably it is set by
        # TableNames.__init__ from the two arguments above (verify there).
        queue_table = f"{self.table_prefix}_message_queue"
        # Name of the table that holds the pending invocation messages.
        self.QUEUE = queue_table
class SQLiteBroker(BaseBroker):
    """
    A SQLite-based implementation of the broker for cross-process testing.

    Uses SQLite for cross-process message queue coordination and implements
    all required abstract methods from BaseBroker. It's designed specifically
    for testing process runners.

    ```{warning}
    The `SQLiteBroker` class is designed for testing purposes only and should
    not be used in production systems. It uses temporary SQLite files for state.
    ```
    """

    def __init__(self, app: "Pynenc") -> None:
        """
        Create the broker and make sure its queue table exists.

        :param app: The application this broker belongs to; its ``app_id``
            scopes the table names.
        """
        super().__init__(app)
        self.tables = Tables(app.app_id)
        # Use database path from configuration with validation
        self.sqlite_db_path = get_sqlite_sqlite_db_path(self.conf.sqlite_db_path)
        # Initialize database tables
        self._init_tables()

    def _init_tables(self) -> None:
        """
        Initialize SQLite tables for broker.

        Both statements use ``IF NOT EXISTS``, so calling this repeatedly
        (e.g. from several processes, or again after :meth:`purge`) is safe.
        """
        with sqlite_conn(self.sqlite_db_path) as conn:
            conn.execute(
                f"""
                CREATE TABLE IF NOT EXISTS {self.tables.QUEUE} (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    invocation_id TEXT NOT NULL,
                    created_at REAL NOT NULL DEFAULT (julianday('now'))
                )
                """
            )
            # Index the column used to pick the oldest message.
            conn.execute(
                f"""
                CREATE INDEX IF NOT EXISTS idx_{self.tables.QUEUE}_created_at
                ON {self.tables.QUEUE}(created_at)
                """
            )
            conn.commit()

    @cached_property
    def conf(self) -> ConfigBrokerSQLite:
        """Broker configuration built from the app's config values and config file path (cached)."""
        return ConfigBrokerSQLite(
            config_values=self.app.config_values,
            config_filepath=self.app.config_filepath,
        )

    def send_message(self, invocation_id: "InvocationId") -> None:
        """
        Send a message (invocation ID) to the queue.

        :param invocation_id: The invocation ID to append to the queue table.
        """
        with sqlite_conn(self.sqlite_db_path) as conn:
            conn.execute(
                f"INSERT INTO {self.tables.QUEUE} (invocation_id, created_at) VALUES (?, julianday('now'))",
                (invocation_id,),
            )
            conn.commit()

    def route_invocation(self, invocation_id: "InvocationId") -> None:
        """
        Route a single invocation ID by sending it to the message queue.

        :param invocation_id: The invocation ID to enqueue.
        """
        self.send_message(invocation_id)

    def route_invocations(self, invocation_ids: list["InvocationId"]) -> None:
        """
        Route multiple invocation IDs by sending them to the message queue.

        Each ID is routed through :meth:`route_invocation`, in list order.

        :param invocation_ids: The invocation IDs to enqueue.
        """
        for invocation_id in invocation_ids:
            self.route_invocation(invocation_id)

    def retrieve_invocation(self) -> "InvocationId | None":
        """
        Atomically retrieve and remove a single invocation from the queue.

        Ensures that no two processes can retrieve the same invocation.

        :return: The next invocation ID in the queue, or None if empty.
        """
        with sqlite_conn(self.sqlite_db_path) as conn:
            # BEGIN IMMEDIATE takes the write lock up front, so the SELECT and
            # the DELETE below cannot be interleaved with another writer.
            conn.execute("BEGIN IMMEDIATE")
            # created_at values can tie (e.g. inserts landing on the same clock
            # tick); the AUTOINCREMENT id breaks ties in insertion order so the
            # queue stays FIFO instead of relying on an unspecified row order.
            cursor = conn.execute(
                f"SELECT id, invocation_id FROM {self.tables.QUEUE} "
                "ORDER BY created_at ASC, id ASC LIMIT 1"
            )
            row = cursor.fetchone()
            cursor.close()
            if row is None:
                # Nothing to dequeue: end the transaction explicitly so the
                # write lock is released here, not at connection teardown.
                conn.rollback()
                return None
            message_id, invocation_id = row
            conn.execute(f"DELETE FROM {self.tables.QUEUE} WHERE id = ?", (message_id,))
            conn.commit()
            return InvocationId(invocation_id)

    def count_invocations(self) -> int:
        """
        Count the number of invocations in the queue.

        :return: Number of rows currently stored in the queue table.
        """
        with sqlite_conn(self.sqlite_db_path) as conn:
            cursor = conn.execute(f"SELECT COUNT(*) FROM {self.tables.QUEUE}")
            return cursor.fetchone()[0]

    def purge(self) -> None:
        """
        Clear all broker messages.

        Delegates removal to ``delete_tables_with_prefix`` using this broker's
        table prefix, then recreates the (empty) queue table and its index.
        """
        delete_tables_with_prefix(self.sqlite_db_path, self.tables.table_prefix)
        self._init_tables()