Source code for pynenc.trigger.sqlite_trigger

"""
SQLite-based implementation of the Pynenc trigger subsystem.

This module provides a trigger system implementation that stores
all its state in a SQLite database. Suitable for cross-process coordination and testing.
"""

from collections.abc import Iterable
from collections import defaultdict
from datetime import UTC, datetime, timedelta
from functools import cached_property
from typing import TYPE_CHECKING

from pynenc.conf.config_trigger import ConfigTriggerSQLite
from pynenc.identifiers.task_id import TaskId
from pynenc.models.trigger_definition_dto import TriggerDefinitionDTO
from pynenc.trigger.base_trigger import BaseTrigger
from pynenc.trigger.conditions import (
    ConditionContext,
    TriggerCondition,
    ValidCondition,
    CompositeLogic,
)
from pynenc.trigger.monitoring import (
    EventMarker,
    EventMarkerPage,
    EventRecord,
    TriggerRunRecord,
)
from pynenc.util.sqlite_utils import create_sqlite_connection as sqlite_conn
from pynenc.util.sqlite_utils import TableNames, delete_tables_with_prefix

if TYPE_CHECKING:
    from pynenc.app import Pynenc
    from pynenc.util.sqlite_utils import SQLiteConnection


# ---------------------------------------------------------------------- #
# Table-name registry
# ---------------------------------------------------------------------- #
# SQLiteTrigger uses per-app table prefixes; keeping the names in one object
# makes SQL construction local while preserving app-level isolation.


class Tables(TableNames):
    """Names of every table used by the SQLite trigger backend.

    The base class is initialised with ``app_id`` and the ``"trg"`` component;
    each attribute below is ``self.table_prefix`` plus a fixed suffix.
    """

    def __init__(self, app_id: str) -> None:
        super().__init__(app_id, "trg")
        prefix = self.table_prefix

        # Definitions and their reverse indexes.
        self.CONDITIONS = f"{prefix}_conditions"
        self.TRIGGERS = f"{prefix}_triggers"
        self.CONDITION_TRIGGERS = f"{prefix}_condition_triggers"
        self.VALID_CONDITIONS = f"{prefix}_valid_conditions"
        self.SOURCE_TASK_CONDITIONS = f"{prefix}_source_task_conditions"

        # Claim tables.
        self.EXECUTION_CLAIMS = f"{prefix}_execution_claims"
        self.TRIGGER_RUN_CLAIMS = f"{prefix}_trigger_run_claims"

        # Monitoring tables.
        self.EVENTS = f"{prefix}_events"
        self.EVENT_CONDITIONS = f"{prefix}_event_conditions"
        self.EVENT_TRIGGERED_INVOCATIONS = f"{prefix}_event_triggered_invocations"
        self.TRIGGER_RUNS = f"{prefix}_trigger_runs"
        self.TRIGGER_RUN_CONDITIONS = f"{prefix}_trigger_run_conditions"

        # Schema bookkeeping.
        self.SCHEMA_VERSION = f"{prefix}_schema_version"
class SQLiteTrigger(BaseTrigger):
    """
    SQLite-based implementation of the Pynenc trigger system.

    Stores all trigger, condition, and claim data in a SQLite database for cross-process safety.
    """

    def __init__(self, app: "Pynenc") -> None:
        """Set up table names and the database path, then create missing tables.

        :param app: Application whose ``app_id`` scopes the table names.
        """
        super().__init__(app)
        self.tables = Tables(app.app_id)
        self.sqlite_db_path = self.conf.sqlite_db_path
        self._init_tables()

    @cached_property
    def conf(self) -> ConfigTriggerSQLite:
        """Trigger configuration built from the app's config values and file path.

        Computed once per instance because of ``cached_property``.
        """
        return ConfigTriggerSQLite(
            config_values=self.app.config_values,
            config_filepath=self.app.config_filepath,
        )

    # ------------------------------------------------------------------ #
    # Schema initialization
    # ------------------------------------------------------------------ #
    # Table creation stays in this class for now so the large-file refactor
    # remains comment-only; the boundary is explicit for a later extraction.
def _init_tables(self) -> None:
    """Create all trigger tables that do not exist yet and commit.

    The core definition and claim tables are created first, followed by the
    monitoring tables and the schema-version row, all on one connection.
    """
    names = self.tables
    core_schema = (
        f"""
        CREATE TABLE IF NOT EXISTS {names.CONDITIONS} (
            condition_id TEXT PRIMARY KEY,
            condition_json TEXT NOT NULL,
            last_cron_execution TIMESTAMP
        )
        """,
        f"""
        CREATE TABLE IF NOT EXISTS {names.TRIGGERS} (
            trigger_id TEXT PRIMARY KEY,
            task_id_key TEXT NOT NULL,
            logic_value TEXT NOT NULL,
            argument_provider_json TEXT
        )
        """,
        f"""
        CREATE TABLE IF NOT EXISTS {names.CONDITION_TRIGGERS} (
            condition_id TEXT NOT NULL,
            trigger_id TEXT NOT NULL,
            PRIMARY KEY (condition_id, trigger_id)
        )
        """,
        f"""
        CREATE TABLE IF NOT EXISTS {names.VALID_CONDITIONS} (
            valid_condition_id TEXT PRIMARY KEY,
            valid_condition_json TEXT NOT NULL
        )
        """,
        f"""
        CREATE TABLE IF NOT EXISTS {names.SOURCE_TASK_CONDITIONS} (
            task_id_key TEXT NOT NULL,
            condition_id TEXT NOT NULL,
            PRIMARY KEY (task_id_key, condition_id)
        )
        """,
        f"""
        CREATE TABLE IF NOT EXISTS {names.EXECUTION_CLAIMS} (
            claim_key TEXT PRIMARY KEY,
            expiration TIMESTAMP
        )
        """,
        f"""
        CREATE TABLE IF NOT EXISTS {names.TRIGGER_RUN_CLAIMS} (
            trigger_run_id TEXT PRIMARY KEY,
            expiration TIMESTAMP
        )
        """,
    )
    with sqlite_conn(self.sqlite_db_path) as conn:
        for ddl in core_schema:
            conn.execute(ddl)
        self._init_monitoring_tables(conn)
        self._ensure_schema_version(conn)
        conn.commit()
# ------------------------------------------------------------------ # # Schema versioning # ------------------------------------------------------------------ # SCHEMA_VERSION_CURRENT: int = 3
[docs] def _ensure_schema_version(self, conn: "SQLiteConnection") -> None: """Create the schema-version table and record the current version. Used to detect future column or index migrations. A missing version means the schema predates this introduction; ``store_event`` and related operations remain backward compatible at version 1. """ conn.execute( f""" CREATE TABLE IF NOT EXISTS {self.tables.SCHEMA_VERSION} ( key TEXT PRIMARY KEY, value INTEGER NOT NULL ) """ ) conn.execute( f""" INSERT INTO {self.tables.SCHEMA_VERSION} (key, value) VALUES ('version', ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value """, (self.SCHEMA_VERSION_CURRENT,), )
[docs] def _init_monitoring_tables(self, conn: "SQLiteConnection") -> None: """Create event/trigger-run monitoring tables and indexes.""" self._init_event_tables(conn) self._init_event_invocation_index(conn) self._init_trigger_run_tables(conn)
[docs] def _init_event_tables(self, conn: "SQLiteConnection") -> None: """Create the events table plus its core indexes.""" conn.execute( f""" CREATE TABLE IF NOT EXISTS {self.tables.EVENTS} ( event_id TEXT PRIMARY KEY, event_code TEXT NOT NULL, event_timestamp REAL NOT NULL, event_json TEXT NOT NULL, matched_condition_count INTEGER NOT NULL DEFAULT 0, triggered_invocation_count INTEGER NOT NULL DEFAULT 0, emitted_by_invocation_id TEXT, emitted_by_task_id TEXT, emitted_by_runner_context_id TEXT ) """ ) columns = { row[1] for row in conn.execute( f"PRAGMA table_info({self.tables.EVENTS})" ).fetchall() } if "emitted_by_runner_context_id" not in columns: conn.execute( f"ALTER TABLE {self.tables.EVENTS} " "ADD COLUMN emitted_by_runner_context_id TEXT" ) conn.execute( f"CREATE INDEX IF NOT EXISTS idx_{self.tables.EVENTS}_code_time " f"ON {self.tables.EVENTS}(event_code, event_timestamp)" ) conn.execute( f"CREATE INDEX IF NOT EXISTS idx_{self.tables.EVENTS}_time " f"ON {self.tables.EVENTS}(event_timestamp)" ) conn.execute( f"CREATE INDEX IF NOT EXISTS idx_{self.tables.EVENTS}_emit_inv " f"ON {self.tables.EVENTS}(emitted_by_invocation_id) " f"WHERE emitted_by_invocation_id IS NOT NULL" ) conn.execute( f""" CREATE TABLE IF NOT EXISTS {self.tables.EVENT_CONDITIONS} ( event_id TEXT NOT NULL, condition_id TEXT NOT NULL, valid_condition_id TEXT, matched INTEGER NOT NULL, PRIMARY KEY (event_id, condition_id) ) """ ) conn.execute( f"CREATE INDEX IF NOT EXISTS idx_{self.tables.EVENT_CONDITIONS}_cond " f"ON {self.tables.EVENT_CONDITIONS}(condition_id)" )
[docs] def _init_event_invocation_index(self, conn: "SQLiteConnection") -> None: """Create the relation table linking events to triggered invocations.""" conn.execute( f""" CREATE TABLE IF NOT EXISTS {self.tables.EVENT_TRIGGERED_INVOCATIONS} ( event_id TEXT NOT NULL, invocation_id TEXT NOT NULL, trigger_run_id TEXT, linked_at REAL NOT NULL, PRIMARY KEY (event_id, invocation_id) ) """ ) conn.execute( f"CREATE INDEX IF NOT EXISTS idx_{self.tables.EVENT_TRIGGERED_INVOCATIONS}_inv " f"ON {self.tables.EVENT_TRIGGERED_INVOCATIONS}(invocation_id)" )
[docs] def _init_trigger_run_tables(self, conn: "SQLiteConnection") -> None: """Create trigger-run tables, indexes, and participant rows.""" conn.execute( f""" CREATE TABLE IF NOT EXISTS {self.tables.TRIGGER_RUNS} ( trigger_run_id TEXT PRIMARY KEY, trigger_id TEXT NOT NULL, task_id_key TEXT NOT NULL, logic_value TEXT NOT NULL, claimed_at REAL, executed_at REAL, triggered_invocation_id TEXT, trigger_run_json TEXT NOT NULL ) """ ) conn.execute( f"CREATE INDEX IF NOT EXISTS idx_{self.tables.TRIGGER_RUNS}_inv " f"ON {self.tables.TRIGGER_RUNS}(triggered_invocation_id)" ) conn.execute( f"CREATE INDEX IF NOT EXISTS idx_{self.tables.TRIGGER_RUNS}_task_time " f"ON {self.tables.TRIGGER_RUNS}(task_id_key, executed_at)" ) conn.execute( f""" CREATE TABLE IF NOT EXISTS {self.tables.TRIGGER_RUN_CONDITIONS} ( trigger_run_id TEXT NOT NULL, context_type TEXT, condition_id TEXT, valid_condition_id TEXT, event_id TEXT, source_invocation_id TEXT ) """ ) conn.execute( f"CREATE INDEX IF NOT EXISTS idx_{self.tables.TRIGGER_RUN_CONDITIONS}_run " f"ON {self.tables.TRIGGER_RUN_CONDITIONS}(trigger_run_id)" ) conn.execute( f"CREATE INDEX IF NOT EXISTS idx_{self.tables.TRIGGER_RUN_CONDITIONS}_evt " f"ON {self.tables.TRIGGER_RUN_CONDITIONS}(event_id)" ) conn.execute( f"CREATE INDEX IF NOT EXISTS idx_{self.tables.TRIGGER_RUN_CONDITIONS}_src_inv " f"ON {self.tables.TRIGGER_RUN_CONDITIONS}(source_invocation_id)" ) conn.execute( f"CREATE INDEX IF NOT EXISTS idx_{self.tables.TRIGGER_RUN_CONDITIONS}_valid " f"ON {self.tables.TRIGGER_RUN_CONDITIONS}(valid_condition_id)" )
# ------------------------------------------------------------------ # # Registration storage # ------------------------------------------------------------------ # # These methods persist trigger definitions, condition definitions, and # reverse indexes used by the BaseTrigger lifecycle-report hooks.
def _register_condition(self, condition: TriggerCondition) -> None:
    """Persist a condition definition without losing its cron bookkeeping.

    A new row is inserted for ``condition.condition_id``. When that id is
    already stored, only ``condition_json`` is refreshed.

    ``INSERT OR REPLACE`` is deliberately not used here: REPLACE deletes the
    conflicting row before inserting, so ``last_cron_execution`` (which is not
    part of this insert) would be reset to NULL on every re-registration.

    :param condition: Condition to store, serialized with ``to_json(self.app)``.
    """
    upsert_sql = (
        f"INSERT INTO {self.tables.CONDITIONS} (condition_id, condition_json) "
        "VALUES (?, ?) "
        "ON CONFLICT(condition_id) DO UPDATE SET "
        "condition_json = excluded.condition_json"
    )
    with sqlite_conn(self.sqlite_db_path) as conn:
        conn.execute(
            upsert_sql,
            (condition.condition_id, condition.to_json(self.app)),
        )
        conn.commit()
def get_condition(self, condition_id: str) -> TriggerCondition | None:
    """Load one condition by id.

    :return: The condition rebuilt from its stored JSON, or ``None`` when no
        row matches ``condition_id``.
    """
    query = (
        f"SELECT condition_json FROM {self.tables.CONDITIONS} WHERE condition_id = ?"
    )
    with sqlite_conn(self.sqlite_db_path) as conn:
        cursor = conn.execute(query, (condition_id,))
        found = cursor.fetchone()
        cursor.close()
    if not found:
        return None
    return TriggerCondition.from_json(found[0], self.app)
def register_trigger(self, trigger: "TriggerDefinitionDTO") -> None:
    """Store a trigger definition and link it to each of its conditions.

    Both the trigger row and the condition-to-trigger rows are written with
    ``INSERT OR REPLACE`` and committed together.
    """
    trigger_row = (
        trigger.trigger_id,
        trigger.task_id.key,
        trigger.logic.value,
        trigger.argument_provider_json,
    )
    with sqlite_conn(self.sqlite_db_path) as conn:
        conn.execute(
            f"INSERT OR REPLACE INTO {self.tables.TRIGGERS} (trigger_id, task_id_key, logic_value, argument_provider_json) VALUES (?, ?, ?, ?)",
            trigger_row,
        )
        link_sql = f"INSERT OR REPLACE INTO {self.tables.CONDITION_TRIGGERS} (condition_id, trigger_id) VALUES (?, ?)"
        for condition_id in trigger.condition_ids:
            conn.execute(link_sql, (condition_id, trigger.trigger_id))
        conn.commit()
[docs] def _get_trigger(self, trigger_id: str) -> "TriggerDefinitionDTO | None": results = self._get_triggers([trigger_id]) return results[0] if results else None
[docs] def _get_triggers(self, trigger_ids: list[str]) -> list["TriggerDefinitionDTO"]: """Fetch multiple triggers in batch using two queries. Returns a list of TriggerDefinitionDTO for the provided trigger_ids preserving any that exist in the DB. """ if not trigger_ids: return [] trg_ids_query = ",".join(["?" for _ in trigger_ids]) with sqlite_conn(self.sqlite_db_path) as conn: cursor = conn.execute( f"SELECT trigger_id, task_id_key, logic_value, argument_provider_json FROM {self.tables.TRIGGERS} WHERE trigger_id IN ({trg_ids_query})", tuple(trigger_ids), ) triggers_rows = cursor.fetchall() cursor.close() if not triggers_rows: return [] # Fetch condition ids for all triggers in one query cursor = conn.execute( f"SELECT condition_id, trigger_id FROM {self.tables.CONDITION_TRIGGERS} WHERE trigger_id IN ({trg_ids_query})", tuple(trigger_ids), ) condition_rows = cursor.fetchall() cursor.close() # Build mapping trigger_id -> list[condition_id] condition_map: dict[str, list[str]] = defaultdict(list) for condition_id, tid in condition_rows: condition_map[tid].append(condition_id) # Construct DTOs preserving only found triggers dto_list: list[TriggerDefinitionDTO] = [] for tid, task_id_key, logic_value, argument_provider_json in triggers_rows: dto_list.append( TriggerDefinitionDTO( trigger_id=tid, task_id=TaskId.from_key(task_id_key), condition_ids=condition_map[tid], logic=CompositeLogic(logic_value), argument_provider_json=argument_provider_json, ) ) return dto_list
def get_triggers_for_condition(
    self, condition_id: str
) -> list["TriggerDefinitionDTO"]:
    """Return every trigger linked to ``condition_id``.

    The linked trigger ids are read first, then resolved in one batch via
    ``_get_triggers``.
    """
    query = f"SELECT trigger_id FROM {self.tables.CONDITION_TRIGGERS} WHERE condition_id = ?"
    with sqlite_conn(self.sqlite_db_path) as conn:
        cursor = conn.execute(query, (condition_id,))
        linked_ids = [record[0] for record in cursor.fetchall()]
        cursor.close()
    return self._get_triggers(linked_ids)
# ------------------------------------------------------------------ # # Valid-condition storage # ------------------------------------------------------------------ # # Valid conditions are transient loop inputs. SQLite persists them so # multiple runners can coordinate trigger execution across processes.
def record_valid_condition(self, valid_condition: ValidCondition) -> None:
    """Insert or overwrite one valid condition, keyed by its id, and commit."""
    payload = (
        valid_condition.valid_condition_id,
        valid_condition.to_json(self.app),
    )
    statement = f"INSERT OR REPLACE INTO {self.tables.VALID_CONDITIONS} (valid_condition_id, valid_condition_json) VALUES (?, ?)"
    with sqlite_conn(self.sqlite_db_path) as conn:
        conn.execute(statement, payload)
        conn.commit()
def record_valid_conditions(self, valid_conditions: list[ValidCondition]) -> None:
    """Insert or overwrite several valid conditions in one commit."""
    statement = f"INSERT OR REPLACE INTO {self.tables.VALID_CONDITIONS} (valid_condition_id, valid_condition_json) VALUES (?, ?)"
    with sqlite_conn(self.sqlite_db_path) as conn:
        for item in valid_conditions:
            payload = (item.valid_condition_id, item.to_json(self.app))
            conn.execute(statement, payload)
        conn.commit()
def get_valid_conditions(self) -> dict[str, ValidCondition]:
    """Return all stored valid conditions, keyed by ``valid_condition_id``."""
    query = f"SELECT valid_condition_id, valid_condition_json FROM {self.tables.VALID_CONDITIONS}"
    with sqlite_conn(self.sqlite_db_path) as conn:
        cursor = conn.execute(query)
        stored = cursor.fetchall()
        cursor.close()

    by_id: dict[str, ValidCondition] = {}
    for valid_condition_id, payload in stored:
        by_id[valid_condition_id] = ValidCondition.from_json(payload, self.app)
    return by_id
def clear_valid_conditions(self, conditions: Iterable[ValidCondition]) -> None:
    """Delete the given valid conditions by id, committing once at the end."""
    delete_sql = (
        f"DELETE FROM {self.tables.VALID_CONDITIONS} WHERE valid_condition_id = ?"
    )
    with sqlite_conn(self.sqlite_db_path) as conn:
        for stale in conditions:
            conn.execute(delete_sql, (stale.valid_condition_id,))
        conn.commit()
def _get_all_conditions(self) -> list[TriggerCondition]:
    """Return every stored condition, rebuilt from its JSON payload."""
    with sqlite_conn(self.sqlite_db_path) as conn:
        cursor = conn.execute(f"SELECT condition_json FROM {self.tables.CONDITIONS}")
        payloads = [record[0] for record in cursor.fetchall()]
        cursor.close()

    loaded: list[TriggerCondition] = []
    for payload in payloads:
        loaded.append(TriggerCondition.from_json(payload, self.app))
    return loaded
# ------------------------------------------------------------------ # # Cron scheduling state # ------------------------------------------------------------------ # # The last-execution column doubles as an optimistic claim guard so cron # conditions do not fire twice when multiple runners poll at once.
def get_last_cron_execution(self, condition_id: str) -> datetime | None:
    """Return the stored last cron execution time for a condition.

    :return: The ISO-format value parsed into a ``datetime``, or ``None`` when
        the condition is unknown or no execution has been stored.
    """
    query = f"SELECT last_cron_execution FROM {self.tables.CONDITIONS} WHERE condition_id = ?"
    with sqlite_conn(self.sqlite_db_path) as conn:
        cursor = conn.execute(query, (condition_id,))
        found = cursor.fetchone()
        cursor.close()

    if not found or not found[0]:
        return None
    return datetime.fromisoformat(found[0])
def store_last_cron_execution(
    self,
    condition_id: str,
    execution_time: datetime,
    expected_last_execution: datetime | None = None,
) -> bool:
    """Record a cron execution time, optionally guarded by the expected old value.

    :param condition_id: Condition whose ``last_cron_execution`` is updated.
    :param execution_time: New value, stored in ISO format.
    :param expected_last_execution: When given, the update is skipped unless
        the currently stored value equals it.
    :return: ``False`` when the guard fails, ``True`` once the update has
        been issued and committed.
    """
    # NOTE(review): the check and the write are two separate statements;
    # whether they are atomic across processes depends on how sqlite_conn
    # opens its transaction - confirm.
    table = self.tables.CONDITIONS
    with sqlite_conn(self.sqlite_db_path) as conn:
        cursor = conn.execute(
            f"SELECT last_cron_execution FROM {table} WHERE condition_id = ?",
            (condition_id,),
        )
        stored = cursor.fetchone()
        cursor.close()

        previous = None
        if stored and stored[0]:
            previous = datetime.fromisoformat(stored[0])

        guard_requested = expected_last_execution is not None
        if guard_requested and previous != expected_last_execution:
            return False

        conn.execute(
            f"UPDATE {table} SET last_cron_execution = ? WHERE condition_id = ?",
            (execution_time.isoformat(), condition_id),
        )
        conn.commit()
        return True
# ------------------------------------------------------------------ # # Source-task indexes and trigger-run claims # ------------------------------------------------------------------ # # Source-task indexes answer lifecycle-report queries; claim tables are # short-lived locks that protect trigger execution across processes.
def _register_source_task_condition(
    self, task_id: "TaskId", condition_id: str
) -> None:
    """Record that ``condition_id`` is sourced from the task ``task_id``."""
    statement = f"INSERT OR REPLACE INTO {self.tables.SOURCE_TASK_CONDITIONS} (task_id_key, condition_id) VALUES (?, ?)"
    link = (task_id.key, condition_id)
    with sqlite_conn(self.sqlite_db_path) as conn:
        conn.execute(statement, link)
        conn.commit()
def get_conditions_sourced_from_task(
    self, task_id: "TaskId", context_type: type[ConditionContext] | None = None
) -> list[TriggerCondition]:
    """Return the conditions registered as sourced from ``task_id``.

    :param task_id: Task whose source index is consulted.
    :param context_type: When given, only conditions whose ``context_type``
        equals it are kept.
    :return: The resolved conditions; ids that no longer resolve through
        ``get_condition`` are dropped.
    """
    query = f"SELECT condition_id FROM {self.tables.SOURCE_TASK_CONDITIONS} WHERE task_id_key = ?"
    with sqlite_conn(self.sqlite_db_path) as conn:
        cursor = conn.execute(query, (task_id.key,))
        condition_ids = [record[0] for record in cursor.fetchall()]
        cursor.close()

    resolved = [self.get_condition(condition_id) for condition_id in condition_ids]
    selected: list[TriggerCondition] = []
    for condition in resolved:
        if not condition:
            continue
        if context_type is None or condition.context_type == context_type:
            selected.append(condition)
    return selected
def claim_trigger_execution(
    self, trigger_id: str, valid_condition_id: str, expiration_seconds: int = 60
) -> bool:
    """Try to claim the execution of a trigger for one valid condition.

    The claim is keyed by ``"{trigger_id}:{valid_condition_id}"``.

    :return: ``False`` when a stored claim for the key has not expired yet;
        otherwise the claim is written with a new expiration and ``True`` is
        returned.
    """
    claim_key = f"{trigger_id}:{valid_condition_id}"
    claimed_at = datetime.now(UTC)
    expires_at = claimed_at + timedelta(seconds=expiration_seconds)
    table = self.tables.EXECUTION_CLAIMS

    with sqlite_conn(self.sqlite_db_path) as conn:
        cursor = conn.execute(
            f"SELECT expiration FROM {table} WHERE claim_key = ?",
            (claim_key,),
        )
        held = cursor.fetchone()
        cursor.close()

        # A live claim belongs to someone else; leave it untouched.
        if held and held[0] and datetime.fromisoformat(held[0]) > claimed_at:
            return False

        conn.execute(
            f"INSERT OR REPLACE INTO {table} (claim_key, expiration) VALUES (?, ?)",
            (claim_key, expires_at.isoformat()),
        )
        conn.commit()
        return True
def claim_trigger_run(
    self, trigger_run_id: str, expiration_seconds: int = 60
) -> bool:
    """Try to claim a trigger run by id.

    :return: ``False`` when a stored claim for ``trigger_run_id`` has not
        expired yet; otherwise the claim is written with a new expiration and
        ``True`` is returned.
    """
    claimed_at = datetime.now(UTC)
    expires_at = claimed_at + timedelta(seconds=expiration_seconds)
    table = self.tables.TRIGGER_RUN_CLAIMS

    with sqlite_conn(self.sqlite_db_path) as conn:
        cursor = conn.execute(
            f"SELECT expiration FROM {table} WHERE trigger_run_id = ?",
            (trigger_run_id,),
        )
        held = cursor.fetchone()
        cursor.close()

        # A live claim belongs to someone else; leave it untouched.
        if held and held[0] and datetime.fromisoformat(held[0]) > claimed_at:
            return False

        conn.execute(
            f"INSERT OR REPLACE INTO {table} (trigger_run_id, expiration) VALUES (?, ?)",
            (trigger_run_id, expires_at.isoformat()),
        )
        conn.commit()
        return True
# ------------------------------------------------------------------ # # Task trigger cleanup and full purge # ------------------------------------------------------------------ # # Task cleanup removes definitions for one task. Full purge drops all # trigger tables for this app prefix and recreates the schema.
def clean_task_trigger_definitions(self, task_id: "TaskId") -> None:
    """Remove every trigger defined for ``task_id`` and its condition links.

    Trigger ids are looked up by ``task_id.key``; for each one the trigger
    row and its condition-to-trigger rows are deleted, then committed once.
    """
    triggers = self.tables.TRIGGERS
    links = self.tables.CONDITION_TRIGGERS
    per_trigger_deletes = (
        f"DELETE FROM {triggers} WHERE trigger_id = ?",
        f"DELETE FROM {links} WHERE trigger_id = ?",
    )
    with sqlite_conn(self.sqlite_db_path) as conn:
        cursor = conn.execute(
            f"SELECT trigger_id FROM {triggers} WHERE task_id_key = ?",
            (task_id.key,),
        )
        doomed_ids = [record[0] for record in cursor.fetchall()]
        cursor.close()

        for doomed_id in doomed_ids:
            for delete_sql in per_trigger_deletes:
                conn.execute(delete_sql, (doomed_id,))
        conn.commit()
def _purge(self) -> None:
    """Drop all tables carrying this app's trigger prefix, then recreate them."""
    prefix = self.tables.table_prefix
    delete_tables_with_prefix(self.sqlite_db_path, prefix)
    # Leave the backend usable: rebuild the empty schema right away.
    self._init_tables()
# ------------------------------------------------------------------ # # Monitoring API # ------------------------------------------------------------------ # # Event records and event indexes. Event JSON is stored as the canonical # payload while relation tables provide indexed reads for Pynmon.
def store_event(self, event: EventRecord) -> None:
    """Write an event row and rebuild its per-condition match links.

    The event row is inserted or replaced. Its event-condition rows are
    deleted and re-inserted from ``event.matched_condition_ids``, pairing
    each with the valid-condition id at the same position when one exists.
    Any ``event.triggered_invocation_ids`` are also added to the relation
    table (existing links are left alone) so indexed reads see them.
    """
    event_row = (
        event.event_id,
        event.event_code,
        event.timestamp.timestamp(),
        event.to_json(self.app),
        len(event.matched_condition_ids),
        len(event.triggered_invocation_ids),
        event.emitted_by_invocation_id,
        event.emitted_by_task_id,
        event.emitted_by_runner_context_id,
    )
    with sqlite_conn(self.sqlite_db_path) as conn:
        conn.execute(
            f"""
            INSERT OR REPLACE INTO {self.tables.EVENTS} (
                event_id, event_code, event_timestamp, event_json,
                matched_condition_count, triggered_invocation_count,
                emitted_by_invocation_id, emitted_by_task_id,
                emitted_by_runner_context_id
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            event_row,
        )

        # Start from a clean slate so removed matches do not linger.
        conn.execute(
            f"DELETE FROM {self.tables.EVENT_CONDITIONS} WHERE event_id = ?",
            (event.event_id,),
        )
        valid_ids = list(event.valid_condition_ids)
        for position, condition_id in enumerate(event.matched_condition_ids):
            try:
                valid_condition_id = valid_ids[position]
            except IndexError:
                # Fewer valid-condition ids than matched conditions.
                valid_condition_id = None
            conn.execute(
                f"""
                INSERT OR REPLACE INTO {self.tables.EVENT_CONDITIONS} (
                    event_id, condition_id, valid_condition_id, matched
                ) VALUES (?, ?, ?, 1)
                """,
                (event.event_id, condition_id, valid_condition_id),
            )

        if event.triggered_invocation_ids:
            linked_at = datetime.now(UTC).timestamp()
            for invocation_id in event.triggered_invocation_ids:
                conn.execute(
                    f"""
                    INSERT OR IGNORE INTO {self.tables.EVENT_TRIGGERED_INVOCATIONS}
                    (event_id, invocation_id, trigger_run_id, linked_at)
                    VALUES (?, ?, NULL, ?)
                    """,
                    (event.event_id, invocation_id, linked_at),
                )
        conn.commit()
def get_event(self, event_id: str) -> EventRecord | None:
    """Load one event by id.

    :return: The event rebuilt from its stored JSON, with
        ``triggered_invocation_ids`` replaced by the relation-table rows
        ordered by ``linked_at``; ``None`` when the event is not stored.
    """
    with sqlite_conn(self.sqlite_db_path) as conn:
        event_row = conn.execute(
            f"SELECT event_json FROM {self.tables.EVENTS} WHERE event_id = ?",
            (event_id,),
        ).fetchone()
        if not event_row:
            return None
        link_rows = conn.execute(
            f"SELECT invocation_id FROM {self.tables.EVENT_TRIGGERED_INVOCATIONS} "
            f"WHERE event_id = ? ORDER BY linked_at",
            (event_id,),
        ).fetchall()

    event = EventRecord.from_json(event_row[0], self.app)
    event.triggered_invocation_ids = [link[0] for link in link_rows]
    return event
def get_events(
    self,
    *,
    event_code: str | None = None,
    start_time: datetime | None = None,
    end_time: datetime | None = None,
    matched: bool | None = None,
    triggered: bool | None = None,
    emitted_by_invocation_id: str | None = None,
    emitted_by_task_id: str | None = None,
    limit: int = 100,
    offset: int = 0,
) -> list[EventRecord]:
    """Return one page of stored events, newest first.

    All filters are optional and combined with AND by
    ``_build_event_where``. Each returned record has its
    ``triggered_invocation_ids`` replaced by the relation-table rows for
    that event.

    :param limit: Maximum number of events to return.
    :param offset: Number of matching events to skip.
    :return: Events ordered by descending timestamp, ties broken by
        ``event_id``.
    """
    where, params = self._build_event_where(
        event_code,
        start_time,
        end_time,
        matched,
        triggered,
        emitted_by_invocation_id,
        emitted_by_task_id,
    )
    # event_id is a tie-breaker: without it, events sharing a timestamp have
    # no defined order, so LIMIT/OFFSET pages could repeat or skip rows.
    sql = (
        f"SELECT event_id, event_json FROM {self.tables.EVENTS} {where} "
        f"ORDER BY event_timestamp DESC, event_id LIMIT ? OFFSET ?"
    )
    with sqlite_conn(self.sqlite_db_path) as conn:
        rows = conn.execute(sql, (*params, limit, offset)).fetchall()
        event_ids = [row[0] for row in rows]
        invocations = self._load_invocations_for_events(conn, event_ids)

    records = []
    for row in rows:
        rec = EventRecord.from_json(row[1], self.app)
        rec.triggered_invocation_ids = invocations.get(rec.event_id, [])
        records.append(rec)
    return records
def count_events(
    self,
    *,
    event_code: str | None = None,
    start_time: datetime | None = None,
    end_time: datetime | None = None,
    matched: bool | None = None,
    triggered: bool | None = None,
    emitted_by_invocation_id: str | None = None,
    emitted_by_task_id: str | None = None,
) -> int:
    """Count stored events matching the given optional filters.

    The filters are turned into a WHERE clause by ``_build_event_where``.
    """
    filters = (
        event_code,
        start_time,
        end_time,
        matched,
        triggered,
        emitted_by_invocation_id,
        emitted_by_task_id,
    )
    where, params = self._build_event_where(*filters)
    count_sql = f"SELECT COUNT(*) FROM {self.tables.EVENTS} {where}"
    with sqlite_conn(self.sqlite_db_path) as conn:
        counted = conn.execute(count_sql, params).fetchone()
    if not counted:
        return 0
    return int(counted[0])
def get_events_emitted_by_invocation(
    self,
    invocation_id: str,
    *,
    limit: int = 200,
    offset: int = 0,
) -> list[EventRecord]:
    """Return events whose ``emitted_by_invocation_id`` is ``invocation_id``.

    Delegates to ``get_events`` with that single filter plus paging.
    """
    paging = {"limit": limit, "offset": offset}
    return self.get_events(emitted_by_invocation_id=invocation_id, **paging)
def get_event_markers_in_timerange(
    self,
    start_time: datetime,
    end_time: datetime,
    *,
    event_code: str | None = None,
    state: str = "all",
    limit: int = 1000,
    offset: int = 0,
) -> EventMarkerPage:
    """Return one page of lightweight event markers within a time range.

    :param start_time: Inclusive lower bound on the event timestamp.
    :param end_time: Inclusive upper bound on the event timestamp.
    :param event_code: When given, only events with this code are included.
    :param state: ``"matched"``, ``"unmatched"``, ``"triggered"`` or
        ``"untriggered"`` adds the matching counter filter; any other value
        (including the default ``"all"``) adds none.
    :param limit: Maximum number of markers to return.
    :param offset: Number of matching events to skip.
    :return: The markers in ascending timestamp order (ties broken by
        ``event_id``), the total number of matching events, and
        ``truncated`` set when events remain beyond this page.
    """
    state_filters = {
        "matched": "matched_condition_count > 0",
        "unmatched": "matched_condition_count = 0",
        "triggered": "triggered_invocation_count > 0",
        "untriggered": "triggered_invocation_count = 0",
    }
    clauses = ["event_timestamp >= ?", "event_timestamp <= ?"]
    params: list = [start_time.timestamp(), end_time.timestamp()]
    if event_code is not None:
        clauses.append("event_code = ?")
        params.append(event_code)
    state_clause = state_filters.get(state)
    if state_clause is not None:
        clauses.append(state_clause)
    where = " AND ".join(clauses)

    with sqlite_conn(self.sqlite_db_path) as conn:
        total_row = conn.execute(
            f"SELECT COUNT(*) FROM {self.tables.EVENTS} WHERE {where}",
            tuple(params),
        ).fetchone()
        total = int(total_row[0]) if total_row else 0
        # event_id is a tie-breaker: without it, events sharing a timestamp
        # have no defined order, so pages could repeat or skip markers.
        rows = conn.execute(
            f"""
            SELECT event_id, event_code, event_timestamp,
                   matched_condition_count, triggered_invocation_count,
                   emitted_by_invocation_id, emitted_by_runner_context_id
            FROM {self.tables.EVENTS}
            WHERE {where}
            ORDER BY event_timestamp, event_id
            LIMIT ? OFFSET ?
            """,
            (*params, limit, offset),
        ).fetchall()

    markers = [
        EventMarker(
            event_id=row[0],
            event_code=row[1],
            timestamp=datetime.fromtimestamp(row[2], tz=UTC),
            matched=row[3] > 0,
            triggered=row[4] > 0,
            emitted_by_invocation_id=row[5],
            emitted_by_runner_context_id=row[6],
        )
        for row in rows
    ]
    return EventMarkerPage(
        markers=markers,
        total=total,
        truncated=offset + len(markers) < total,
    )
def get_invocations_triggered_by_event(self, event_id: str) -> list[str]:
    """Return the invocation ids linked to ``event_id``, ordered by ``linked_at``."""
    query = (
        f"SELECT invocation_id FROM {self.tables.EVENT_TRIGGERED_INVOCATIONS} "
        f"WHERE event_id = ? ORDER BY linked_at"
    )
    with sqlite_conn(self.sqlite_db_path) as conn:
        linked = conn.execute(query, (event_id,)).fetchall()
    invocation_ids: list[str] = []
    for record in linked:
        invocation_ids.append(record[0])
    return invocation_ids
[docs] def _load_invocations_for_events( self, conn: "SQLiteConnection", event_ids: list[str] ) -> dict[str, list[str]]: """Bulk-load triggered invocation ids for a batch of event ids.""" if not event_ids: return {} out: dict[str, list[str]] = {eid: [] for eid in event_ids} batch = 500 for i in range(0, len(event_ids), batch): chunk = event_ids[i : i + batch] placeholders = ",".join("?" * len(chunk)) rows = conn.execute( f"SELECT event_id, invocation_id " f"FROM {self.tables.EVENT_TRIGGERED_INVOCATIONS} " f"WHERE event_id IN ({placeholders}) ORDER BY linked_at", tuple(chunk), ).fetchall() for event_id, invocation_id in rows: out.setdefault(event_id, []).append(invocation_id) return out
def list_event_codes(self) -> list[str]:
    """Return the distinct event codes present in the events table, sorted."""
    query = (
        f"SELECT DISTINCT event_code FROM {self.tables.EVENTS} "
        f"ORDER BY event_code"
    )
    with sqlite_conn(self.sqlite_db_path) as conn:
        found = conn.execute(query).fetchall()
    codes: list[str] = []
    for record in found:
        codes.append(record[0])
    return codes
[docs] def _build_event_where( self, event_code: str | None, start_time: datetime | None, end_time: datetime | None, matched: bool | None, triggered: bool | None, emitted_by_invocation_id: str | None = None, emitted_by_task_id: str | None = None, ) -> tuple[str, tuple]: """Build a parameterized WHERE clause for event filters.""" clauses: list[str] = [] params: list = [] if event_code is not None: clauses.append("event_code = ?") params.append(event_code) if start_time is not None: clauses.append("event_timestamp >= ?") params.append(start_time.timestamp()) if end_time is not None: clauses.append("event_timestamp <= ?") params.append(end_time.timestamp()) if matched is True: clauses.append("matched_condition_count > 0") elif matched is False: clauses.append("matched_condition_count = 0") if triggered is True: clauses.append("triggered_invocation_count > 0") elif triggered is False: clauses.append("triggered_invocation_count = 0") if emitted_by_invocation_id is not None: clauses.append("emitted_by_invocation_id = ?") params.append(emitted_by_invocation_id) if emitted_by_task_id is not None: clauses.append("emitted_by_task_id = ?") params.append(emitted_by_task_id) where = f"WHERE {' AND '.join(clauses)}" if clauses else "" return where, tuple(params)
# Trigger-run records and participant indexes. Participant rows keep the # event/source-invocation/valid-condition relations queryable without # deserializing every trigger-run JSON blob.
def store_trigger_run(self, run: TriggerRunRecord) -> None:
    """Write a trigger run and rebuild its participant rows.

    The run row is inserted or replaced; ``claimed_at`` and ``executed_at``
    are stored as POSIX timestamps or NULL. Existing participant rows for
    the run are deleted and re-created by ``_insert_trigger_run_conditions``.
    """
    claimed_ts = None
    if run.claimed_at:
        claimed_ts = run.claimed_at.timestamp()
    executed_ts = None
    if run.executed_at:
        executed_ts = run.executed_at.timestamp()

    with sqlite_conn(self.sqlite_db_path) as conn:
        run_row = (
            run.trigger_run_id,
            run.trigger_id,
            run.task_id_key,
            run.logic_value,
            claimed_ts,
            executed_ts,
            run.triggered_invocation_id,
            run.to_json(),
        )
        conn.execute(
            f"""
            INSERT OR REPLACE INTO {self.tables.TRIGGER_RUNS} (
                trigger_run_id, trigger_id, task_id_key, logic_value,
                claimed_at, executed_at, triggered_invocation_id,
                trigger_run_json
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            run_row,
        )
        # Participant rows have no key of their own, so replace them wholesale.
        conn.execute(
            f"DELETE FROM {self.tables.TRIGGER_RUN_CONDITIONS} "
            f"WHERE trigger_run_id = ?",
            (run.trigger_run_id,),
        )
        self._insert_trigger_run_conditions(conn, run)
        conn.commit()
def _insert_trigger_run_conditions(
    self, conn: "SQLiteConnection", run: TriggerRunRecord
) -> None:
    """Insert the participant rows for a trigger run.

    When ``run.participants`` is non-empty, one row is written per
    participant. Otherwise one row is written per id in ``run.event_ids``
    followed by one per id in ``run.source_invocation_ids``, with the other
    columns left NULL.
    """
    insert_sql = (
        f"INSERT INTO {self.tables.TRIGGER_RUN_CONDITIONS} ("
        "trigger_run_id, context_type, condition_id, "
        "valid_condition_id, event_id, source_invocation_id"
        ") VALUES (?, ?, ?, ?, ?, ?)"
    )

    if not run.participants:
        # No participant detail recorded: fall back to the bare id lists.
        for event_id in run.event_ids:
            conn.execute(
                insert_sql, (run.trigger_run_id, None, None, None, event_id, None)
            )
        for invocation_id in run.source_invocation_ids:
            conn.execute(
                insert_sql,
                (run.trigger_run_id, None, None, None, None, invocation_id),
            )
        return

    for participant in run.participants:
        participant_row = (
            run.trigger_run_id,
            participant.context_type,
            participant.condition_id,
            participant.valid_condition_id,
            participant.event_id,
            participant.source_invocation_id,
        )
        conn.execute(insert_sql, participant_row)
def get_trigger_run(self, trigger_run_id: str) -> TriggerRunRecord | None:
    """Load one trigger run by id, or ``None`` when it is not stored."""
    query = (
        f"SELECT trigger_run_json FROM {self.tables.TRIGGER_RUNS} "
        f"WHERE trigger_run_id = ?"
    )
    with sqlite_conn(self.sqlite_db_path) as conn:
        found = conn.execute(query, (trigger_run_id,)).fetchone()
    if found:
        return TriggerRunRecord.from_json(found[0])
    return None
def get_trigger_runs_for_event(self, event_id: str) -> list[TriggerRunRecord]:
    """Return the trigger runs that have a participant row for ``event_id``.

    Runs are de-duplicated and ordered by ``executed_at``.
    """
    query = f"""
        SELECT DISTINCT t.trigger_run_json
        FROM {self.tables.TRIGGER_RUNS} t
        JOIN {self.tables.TRIGGER_RUN_CONDITIONS} c
          ON c.trigger_run_id = t.trigger_run_id
        WHERE c.event_id = ?
        ORDER BY t.executed_at
        """
    with sqlite_conn(self.sqlite_db_path) as conn:
        found = conn.execute(query, (event_id,)).fetchall()
    runs: list[TriggerRunRecord] = []
    for record in found:
        runs.append(TriggerRunRecord.from_json(record[0]))
    return runs
def get_trigger_runs_for_invocation(
    self, invocation_id: str
) -> list[TriggerRunRecord]:
    """Return the trigger runs whose triggered invocation is ``invocation_id``.

    :param invocation_id: Value matched against ``triggered_invocation_id``.
    :return: Matching runs ordered by ``executed_at``, the same ordering the
        other trigger-run readers in this class use.
    """
    # Without ORDER BY the row order is undefined; sort like the sibling
    # readers so callers get a stable sequence.
    query = (
        f"SELECT trigger_run_json FROM {self.tables.TRIGGER_RUNS} "
        f"WHERE triggered_invocation_id = ? ORDER BY executed_at"
    )
    with sqlite_conn(self.sqlite_db_path) as conn:
        rows = conn.execute(query, (invocation_id,)).fetchall()
    return [TriggerRunRecord.from_json(row[0]) for row in rows]
def get_trigger_runs_sourced_by_invocation(
    self, invocation_id: str
) -> list[TriggerRunRecord]:
    """Return the trigger runs with a participant row sourced by ``invocation_id``.

    Runs are de-duplicated and ordered by ``executed_at``.
    """
    query = f"""
        SELECT DISTINCT t.trigger_run_json
        FROM {self.tables.TRIGGER_RUNS} t
        JOIN {self.tables.TRIGGER_RUN_CONDITIONS} c
          ON c.trigger_run_id = t.trigger_run_id
        WHERE c.source_invocation_id = ?
        ORDER BY t.executed_at
        """
    with sqlite_conn(self.sqlite_db_path) as conn:
        found = conn.execute(query, (invocation_id,)).fetchall()
    runs: list[TriggerRunRecord] = []
    for record in found:
        runs.append(TriggerRunRecord.from_json(record[0]))
    return runs
def get_trigger_runs_for_valid_condition(
    self, valid_condition_id: str
) -> list[TriggerRunRecord]:
    """
    Return the trigger runs linked to a valid condition.

    Runs are matched on ``valid_condition_id`` in the
    trigger-run-conditions table, de-duplicated with ``DISTINCT`` and ordered
    by ``executed_at``.

    :param valid_condition_id: Valid condition to search for.
    :return: Matching records rebuilt from their stored JSON.
    """
    runs_table = self.tables.TRIGGER_RUNS
    links_table = self.tables.TRIGGER_RUN_CONDITIONS
    query = f"""
        SELECT DISTINCT t.trigger_run_json
        FROM {runs_table} t
        JOIN {links_table} c
          ON c.trigger_run_id = t.trigger_run_id
        WHERE c.valid_condition_id = ?
        ORDER BY t.executed_at
        """
    with sqlite_conn(self.sqlite_db_path) as conn:
        fetched = conn.execute(query, (valid_condition_id,)).fetchall()
    return [TriggerRunRecord.from_json(record[0]) for record in fetched]
def get_trigger_runs_in_timerange(
    self,
    start_time: datetime,
    end_time: datetime,
    *,
    event_code: str | None = None,
    task_id_key: str | None = None,
    limit: int | None = None,
) -> list[TriggerRunRecord]:
    """
    Return trigger runs executed within an inclusive time window.

    Both bounds are converted with ``datetime.timestamp()`` and compared
    against ``executed_at``; runs without an ``executed_at`` are excluded.
    Results are ordered by ``executed_at``.

    :param start_time: Lower bound of the window (inclusive).
    :param end_time: Upper bound of the window (inclusive).
    :param event_code: When given, keep only runs linked (through the
        trigger-run-conditions table) to an event with this code.
    :param task_id_key: When given, keep only runs with this ``task_id_key``.
    :param limit: When given, appended as a ``LIMIT`` to the query.
    :return: Matching records rebuilt from their stored JSON.
    """
    where_parts = [
        "t.executed_at IS NOT NULL",
        "t.executed_at >= ? AND t.executed_at <= ?",
    ]
    bound_args: list = [start_time.timestamp(), end_time.timestamp()]
    event_join = ""

    if task_id_key is not None:
        where_parts.append("t.task_id_key = ?")
        bound_args.append(task_id_key)

    if event_code is not None:
        # The event code lives on the events table, reached via the links.
        event_join = (
            f" JOIN {self.tables.TRIGGER_RUN_CONDITIONS} c "
            f"ON c.trigger_run_id = t.trigger_run_id "
            f"JOIN {self.tables.EVENTS} e ON e.event_id = c.event_id"
        )
        where_parts.append("e.event_code = ?")
        bound_args.append(event_code)

    where_sql = " AND ".join(where_parts)
    query = (
        f"SELECT DISTINCT t.trigger_run_json FROM {self.tables.TRIGGER_RUNS} t"
        f"{event_join} WHERE {where_sql} ORDER BY t.executed_at"
    )
    if limit is not None:
        query = query + " LIMIT ?"
        bound_args.append(limit)

    with sqlite_conn(self.sqlite_db_path) as conn:
        fetched = conn.execute(query, tuple(bound_args)).fetchall()
    return [TriggerRunRecord.from_json(record[0]) for record in fetched]
# ------------------------------------------------------------------ #
# Monitoring retention
# ------------------------------------------------------------------ #
# Retention applies age and capacity limits to events and trigger runs,
# cascading event deletes into trigger-run tables to avoid stale edges.
# The driving algorithm lives in BaseTrigger._auto_purge_events; this
# class supplies the SQLite primitives.
def _age_purge_events(self, threshold: datetime) -> list[str]:
    """
    Remove events older than ``threshold`` and commit the deletion.

    :param threshold: Cut-off instant; converted with ``timestamp()`` before
        being handed to ``_purge_events_age``.
    :return: Ids of the events that were removed.
    """
    cutoff = threshold.timestamp()
    with sqlite_conn(self.sqlite_db_path) as conn:
        purged_ids = self._purge_events_age(conn, cutoff)
        conn.commit()
    return purged_ids
def _age_purge_trigger_runs(self, threshold: datetime) -> int:
    """
    Remove trigger runs older than ``threshold`` and commit the deletion.

    :param threshold: Cut-off instant; converted with ``timestamp()`` before
        being handed to ``_purge_trigger_runs_age``.
    :return: Number of trigger runs reported as removed.
    """
    cutoff = threshold.timestamp()
    with sqlite_conn(self.sqlite_db_path) as conn:
        purged_count = self._purge_trigger_runs_age(conn, cutoff)
        conn.commit()
    return purged_count
def _cap_purge_events(self) -> list[str]:
    """
    Apply the event capacity limit and commit the deletion.

    :return: Ids of the events removed by ``_enforce_event_capacity``.
    """
    with sqlite_conn(self.sqlite_db_path) as conn:
        evicted_ids = self._enforce_event_capacity(conn)
        conn.commit()
    return evicted_ids
def _cap_purge_trigger_runs(self) -> int:
    """
    Apply the trigger-run capacity limit and commit the deletion.

    :return: Count reported by ``_enforce_trigger_run_capacity``.
    """
    with sqlite_conn(self.sqlite_db_path) as conn:
        evicted_count = self._enforce_trigger_run_capacity(conn)
        conn.commit()
    return evicted_count
def _cascade_delete_runs_for_events(self, event_ids: list[str]) -> int:
    """
    Delete trigger runs referencing the given events and commit.

    Opens its own connection and delegates the work to
    ``_cascade_delete_runs_for_events_in_conn``.

    :param event_ids: Events whose dependent trigger runs must go.
    :return: Count reported by the in-connection helper.
    """
    with sqlite_conn(self.sqlite_db_path) as conn:
        deleted_count = self._cascade_delete_runs_for_events_in_conn(
            conn, event_ids
        )
        conn.commit()
    return deleted_count
def _cascade_delete_runs_for_events_in_conn(
    self, conn: "SQLiteConnection", event_ids: list[str]
) -> int:
    """
    Delete trigger runs that reference any of the given event ids.

    Event ids are de-duplicated and looked up in chunks of 500. The run ids
    found are de-duplicated across chunks as well: ``SELECT DISTINCT`` only
    removes repeats inside a single chunk, so a run linked to events in two
    different chunks would otherwise be counted (and deleted) twice.

    :param conn: Open connection; no commit is issued here.
    :param event_ids: Events whose dependent trigger runs must be removed.
    :return: Number of distinct trigger run ids selected for deletion
        (``0`` when nothing references the events).
    """
    unique_ids = list(dict.fromkeys(event_ids))
    # A dict keeps first-seen order while dropping ids repeated across chunks.
    seen_run_ids: dict[str, None] = {}
    batch = 500
    for start in range(0, len(unique_ids), batch):
        chunk = unique_ids[start : start + batch]
        placeholders = ",".join("?" * len(chunk))
        rows = conn.execute(
            f"SELECT DISTINCT trigger_run_id "
            f"FROM {self.tables.TRIGGER_RUN_CONDITIONS} "
            f"WHERE event_id IN ({placeholders})",
            tuple(chunk),
        ).fetchall()
        for row in rows:
            seen_run_ids[row[0]] = None
    if not seen_run_ids:
        return 0
    run_ids = list(seen_run_ids)
    # Remove the link rows first, then the runs themselves.
    _delete_by_ids(
        conn, self.tables.TRIGGER_RUN_CONDITIONS, "trigger_run_id", run_ids
    )
    _delete_by_ids(conn, self.tables.TRIGGER_RUNS, "trigger_run_id", run_ids)
    return len(run_ids)
def _purge_events_age(
    self, conn: "SQLiteConnection", threshold: float
) -> list[str]:
    """
    Delete events older than ``threshold``; return removed event ids.

    Rows are removed from the event-conditions and
    event-triggered-invocations tables before the events table itself.

    :param conn: Open connection; no commit is issued here.
    :param threshold: Events with ``event_timestamp`` strictly below this
        value are removed.
    :return: Ids of the removed events (empty when none qualify).
    """
    cursor = conn.execute(
        f"SELECT event_id FROM {self.tables.EVENTS} WHERE event_timestamp < ?",
        (threshold,),
    )
    stale_ids = [record[0] for record in cursor.fetchall()]
    if stale_ids:
        for table in (
            self.tables.EVENT_CONDITIONS,
            self.tables.EVENT_TRIGGERED_INVOCATIONS,
            self.tables.EVENTS,
        ):
            _delete_by_ids(conn, table, "event_id", stale_ids)
    return stale_ids
def _purge_trigger_runs_age(
    self, conn: "SQLiteConnection", threshold: float
) -> int:
    """
    Delete trigger runs whose ``executed_at`` is older than ``threshold``.

    Runs with a NULL ``executed_at`` are never selected. Link rows in the
    trigger-run-conditions table are removed before the runs themselves.

    :param conn: Open connection; no commit is issued here.
    :param threshold: Runs with ``executed_at`` strictly below this value
        are removed.
    :return: Number of trigger run ids selected for deletion.
    """
    query = (
        f"SELECT trigger_run_id FROM {self.tables.TRIGGER_RUNS} "
        f"WHERE executed_at IS NOT NULL AND executed_at < ?"
    )
    expired_ids = [
        record[0] for record in conn.execute(query, (threshold,)).fetchall()
    ]
    if expired_ids:
        for table in (self.tables.TRIGGER_RUN_CONDITIONS, self.tables.TRIGGER_RUNS):
            _delete_by_ids(conn, table, "trigger_run_id", expired_ids)
    return len(expired_ids)
def _enforce_event_capacity(self, conn: "SQLiteConnection") -> list[str]:
    """
    Drop oldest events beyond ``event_max_records``; return removed ids.

    A non-positive ``event_max_records`` disables the check. Otherwise events
    are sorted newest first by ``event_timestamp``, the first
    ``event_max_records`` rows are skipped, and every remaining row is
    deleted together with its rows in the event-conditions and
    event-triggered-invocations tables.

    :param conn: Open connection; no commit is issued here.
    :return: Ids of the removed events (empty when within capacity).
    """
    max_records = self.conf.event_max_records
    if max_records <= 0:
        return []
    # LIMIT -1 means "no limit" in SQLite, so OFFSET skips the rows to keep.
    cursor = conn.execute(
        f"SELECT event_id FROM {self.tables.EVENTS} "
        f"ORDER BY event_timestamp DESC LIMIT -1 OFFSET ?",
        (max_records,),
    )
    overflow_ids = [record[0] for record in cursor.fetchall()]
    if overflow_ids:
        for table in (
            self.tables.EVENT_CONDITIONS,
            self.tables.EVENT_TRIGGERED_INVOCATIONS,
            self.tables.EVENTS,
        ):
            _delete_by_ids(conn, table, "event_id", overflow_ids)
    return overflow_ids
def _enforce_trigger_run_capacity(self, conn: "SQLiteConnection") -> int:
    """
    Drop oldest trigger runs beyond ``trigger_run_max_records``.

    A non-positive ``trigger_run_max_records`` disables the check. Runs are
    sorted newest first by ``executed_at``, falling back to ``claimed_at``
    and then ``0``; the first ``trigger_run_max_records`` rows are skipped
    and every remaining run is deleted along with its link rows.

    :param conn: Open connection; no commit is issued here.
    :return: Number of trigger run ids selected for deletion.
    """
    max_records = self.conf.trigger_run_max_records
    if max_records <= 0:
        return 0
    # LIMIT -1 means "no limit" in SQLite, so OFFSET skips the rows to keep.
    cursor = conn.execute(
        f"SELECT trigger_run_id FROM {self.tables.TRIGGER_RUNS} "
        f"ORDER BY COALESCE(executed_at, claimed_at, 0) DESC "
        f"LIMIT -1 OFFSET ?",
        (max_records,),
    )
    overflow_ids = [record[0] for record in cursor.fetchall()]
    if overflow_ids:
        for table in (self.tables.TRIGGER_RUN_CONDITIONS, self.tables.TRIGGER_RUNS):
            _delete_by_ids(conn, table, "trigger_run_id", overflow_ids)
    return len(overflow_ids)
# ---------------------------------------------------------------------- #
# SQLite helper functions
# ---------------------------------------------------------------------- #
# Small stateless helpers stay outside SQLiteTrigger so table-specific methods
# can share batching behavior without adding inheritance layers.
[docs] def _delete_by_ids( conn: "SQLiteConnection", table: str, column: str, ids: list[str] ) -> None: """Delete rows from ``table`` where ``column`` is in ``ids`` in batches.""" batch = 500 for i in range(0, len(ids), batch): chunk = ids[i : i + batch] placeholders = ",".join("?" * len(chunk)) conn.execute( f"DELETE FROM {table} WHERE {column} IN ({placeholders})", tuple(chunk) )