# Source code for pynenc.trigger.sqlite_trigger

"""
SQLite-based implementation of the Pynenc trigger subsystem.

This module provides a trigger system implementation that stores
all its state in a SQLite database. Suitable for cross-process coordination and testing.
"""

from collections.abc import Iterable
from collections import defaultdict
from datetime import UTC, datetime, timedelta
from functools import cached_property
from typing import TYPE_CHECKING, Optional

from pynenc.conf.config_trigger import ConfigTriggerSQLite
from pynenc.identifiers.task_id import TaskId
from pynenc.models.trigger_definition_dto import TriggerDefinitionDTO
from pynenc.trigger.base_trigger import BaseTrigger
from pynenc.trigger.conditions import (
    ConditionContext,
    TriggerCondition,
    ValidCondition,
    CompositeLogic,
)
from pynenc.util.sqlite_utils import create_sqlite_connection as sqlite_conn
from pynenc.util.sqlite_utils import delete_tables_with_prefix

if TYPE_CHECKING:
    from pynenc.app import Pynenc


class Tables:
    """Names of the SQLite tables used by the trigger subsystem.

    Every name carries the ``trg_`` prefix.
    """

    # Registered trigger conditions (serialized) and their last cron run.
    CONDITIONS = "trg_conditions"
    # Trigger definitions: target task, composite logic, argument provider.
    TRIGGERS = "trg_triggers"
    # Link table between conditions and triggers.
    CONDITION_TRIGGERS = "trg_condition_triggers"
    # Serialized valid conditions that have been recorded.
    VALID_CONDITIONS = "trg_valid_conditions"
    # Link table between source tasks and conditions.
    SOURCE_TASK_CONDITIONS = "trg_source_task_conditions"
    # Expiring claims keyed by "<trigger_id>:<valid_condition_id>".
    EXECUTION_CLAIMS = "trg_execution_claims"
    # Expiring claims keyed by trigger run id.
    TRIGGER_RUN_CLAIMS = "trg_trigger_run_claims"
class SQLiteTrigger(BaseTrigger):
    """
    SQLite-based implementation of the Pynenc trigger system.

    Stores all trigger, condition, and claim data in a SQLite database for
    cross-process safety. Every public operation opens its own connection to
    ``self.sqlite_db_path``.
    """

    def __init__(self, app: "Pynenc") -> None:
        """Bind to *app*, resolve the database path and create the tables."""
        super().__init__(app)
        self.sqlite_db_path = self.conf.sqlite_db_path
        self._init_tables()

    @cached_property
    def conf(self) -> ConfigTriggerSQLite:
        """Trigger configuration built from the app's config values and file."""
        return ConfigTriggerSQLite(
            config_values=self.app.config_values,
            config_filepath=self.app.config_filepath,
        )

    def _init_tables(self) -> None:
        """Initialize SQLite tables for trigger state."""
        schemas = {
            Tables.CONDITIONS: """
                condition_id TEXT PRIMARY KEY,
                condition_json TEXT NOT NULL,
                last_cron_execution TIMESTAMP
            """,
            Tables.TRIGGERS: """
                trigger_id TEXT PRIMARY KEY,
                task_id_key TEXT NOT NULL,
                logic_value TEXT NOT NULL,
                argument_provider_json TEXT
            """,
            Tables.CONDITION_TRIGGERS: """
                condition_id TEXT NOT NULL,
                trigger_id TEXT NOT NULL,
                PRIMARY KEY (condition_id, trigger_id)
            """,
            Tables.VALID_CONDITIONS: """
                valid_condition_id TEXT PRIMARY KEY,
                valid_condition_json TEXT NOT NULL
            """,
            Tables.SOURCE_TASK_CONDITIONS: """
                task_id_key TEXT NOT NULL,
                condition_id TEXT NOT NULL,
                PRIMARY KEY (task_id_key, condition_id)
            """,
            Tables.EXECUTION_CLAIMS: """
                claim_key TEXT PRIMARY KEY,
                expiration TIMESTAMP
            """,
            Tables.TRIGGER_RUN_CLAIMS: """
                trigger_run_id TEXT PRIMARY KEY,
                expiration TIMESTAMP
            """,
        }
        with sqlite_conn(self.sqlite_db_path) as conn:
            for table, columns in schemas.items():
                conn.execute(f"CREATE TABLE IF NOT EXISTS {table} ({columns})")
            conn.commit()

    def _register_condition(self, condition: TriggerCondition) -> None:
        """Store *condition*, keyed by its ``condition_id``.

        Re-registering an existing condition only refreshes its serialized
        form. An ``INSERT OR REPLACE`` would delete and re-create the row and
        thereby reset ``last_cron_execution`` to NULL, so an UPSERT is used.
        """
        with sqlite_conn(self.sqlite_db_path) as conn:
            conn.execute(
                f"INSERT INTO {Tables.CONDITIONS} (condition_id, condition_json) "
                "VALUES (?, ?) "
                "ON CONFLICT(condition_id) DO UPDATE "
                "SET condition_json = excluded.condition_json",
                (condition.condition_id, condition.to_json(self.app)),
            )
            conn.commit()

    def get_condition(self, condition_id: str) -> TriggerCondition | None:
        """Return the condition stored under *condition_id*, or ``None``."""
        with sqlite_conn(self.sqlite_db_path) as conn:
            cursor = conn.execute(
                f"SELECT condition_json FROM {Tables.CONDITIONS} WHERE condition_id = ?",
                (condition_id,),
            )
            row = cursor.fetchone()
            cursor.close()
        if row:
            return TriggerCondition.from_json(row[0], self.app)
        return None

    def register_trigger(self, trigger: "TriggerDefinitionDTO") -> None:
        """Store *trigger* and link it to each of its condition ids."""
        with sqlite_conn(self.sqlite_db_path) as conn:
            conn.execute(
                f"INSERT OR REPLACE INTO {Tables.TRIGGERS} (trigger_id, task_id_key, logic_value, argument_provider_json) VALUES (?, ?, ?, ?)",
                (
                    trigger.trigger_id,
                    trigger.task_id.key,
                    trigger.logic.value,
                    trigger.argument_provider_json,
                ),
            )
            conn.executemany(
                f"INSERT OR REPLACE INTO {Tables.CONDITION_TRIGGERS} (condition_id, trigger_id) VALUES (?, ?)",
                [
                    (condition_id, trigger.trigger_id)
                    for condition_id in trigger.condition_ids
                ],
            )
            conn.commit()

    def _get_trigger(self, trigger_id: str) -> Optional["TriggerDefinitionDTO"]:
        """Return the trigger stored under *trigger_id*, or ``None``."""
        results = self._get_triggers([trigger_id])
        return results[0] if results else None

    def _get_triggers(self, trigger_ids: list[str]) -> list["TriggerDefinitionDTO"]:
        """Fetch multiple triggers in batch using two queries.

        Returns a list of TriggerDefinitionDTO for the provided trigger_ids,
        containing only those that exist in the DB.
        """
        if not trigger_ids:
            return []

        placeholders = ",".join("?" for _ in trigger_ids)
        params = tuple(trigger_ids)

        with sqlite_conn(self.sqlite_db_path) as conn:
            cursor = conn.execute(
                f"SELECT trigger_id, task_id_key, logic_value, argument_provider_json FROM {Tables.TRIGGERS} WHERE trigger_id IN ({placeholders})",
                params,
            )
            trigger_rows = cursor.fetchall()
            cursor.close()
            if not trigger_rows:
                return []

            # Fetch condition ids for all triggers in one query.
            cursor = conn.execute(
                f"SELECT condition_id, trigger_id FROM {Tables.CONDITION_TRIGGERS} WHERE trigger_id IN ({placeholders})",
                params,
            )
            link_rows = cursor.fetchall()
            cursor.close()

        # trigger_id -> list of condition ids linked to it.
        condition_map: dict[str, list[str]] = defaultdict(list)
        for condition_id, tid in link_rows:
            condition_map[tid].append(condition_id)

        return [
            TriggerDefinitionDTO(
                trigger_id=tid,
                task_id=TaskId.from_key(task_id_key),
                condition_ids=condition_map[tid],
                logic=CompositeLogic(logic_value),
                argument_provider_json=argument_provider_json,
            )
            for tid, task_id_key, logic_value, argument_provider_json in trigger_rows
        ]

    def get_triggers_for_condition(
        self, condition_id: str
    ) -> list["TriggerDefinitionDTO"]:
        """Return every trigger linked to *condition_id*."""
        with sqlite_conn(self.sqlite_db_path) as conn:
            cursor = conn.execute(
                f"SELECT trigger_id FROM {Tables.CONDITION_TRIGGERS} WHERE condition_id = ?",
                (condition_id,),
            )
            trigger_ids = [row[0] for row in cursor.fetchall()]
            cursor.close()
        # Fetch all triggers in batch.
        return self._get_triggers(trigger_ids)

    def record_valid_condition(self, valid_condition: ValidCondition) -> None:
        """Store a single valid condition, replacing any row with the same id."""
        with sqlite_conn(self.sqlite_db_path) as conn:
            conn.execute(
                f"INSERT OR REPLACE INTO {Tables.VALID_CONDITIONS} (valid_condition_id, valid_condition_json) VALUES (?, ?)",
                (valid_condition.valid_condition_id, valid_condition.to_json(self.app)),
            )
            conn.commit()

    def record_valid_conditions(self, valid_conditions: list[ValidCondition]) -> None:
        """Store several valid conditions using one connection and one commit."""
        with sqlite_conn(self.sqlite_db_path) as conn:
            conn.executemany(
                f"INSERT OR REPLACE INTO {Tables.VALID_CONDITIONS} (valid_condition_id, valid_condition_json) VALUES (?, ?)",
                [
                    (vc.valid_condition_id, vc.to_json(self.app))
                    for vc in valid_conditions
                ],
            )
            conn.commit()

    def get_valid_conditions(self) -> dict[str, ValidCondition]:
        """Return all recorded valid conditions keyed by ``valid_condition_id``."""
        with sqlite_conn(self.sqlite_db_path) as conn:
            cursor = conn.execute(
                f"SELECT valid_condition_id, valid_condition_json FROM {Tables.VALID_CONDITIONS}"
            )
            rows = cursor.fetchall()
            cursor.close()
        return {
            valid_condition_id: ValidCondition.from_json(payload, self.app)
            for valid_condition_id, payload in rows
        }

    def clear_valid_conditions(self, conditions: Iterable[ValidCondition]) -> None:
        """Delete the stored rows of the given valid conditions."""
        with sqlite_conn(self.sqlite_db_path) as conn:
            conn.executemany(
                f"DELETE FROM {Tables.VALID_CONDITIONS} WHERE valid_condition_id = ?",
                [(condition.valid_condition_id,) for condition in conditions],
            )
            conn.commit()

    def _get_all_conditions(self) -> list[TriggerCondition]:
        """Return every registered condition."""
        with sqlite_conn(self.sqlite_db_path) as conn:
            cursor = conn.execute(f"SELECT condition_json FROM {Tables.CONDITIONS}")
            rows = cursor.fetchall()
            cursor.close()
        return [TriggerCondition.from_json(row[0], self.app) for row in rows]

    def get_last_cron_execution(self, condition_id: str) -> datetime | None:
        """Return the stored last cron execution time of *condition_id*.

        Returns ``None`` when the condition is unknown or has no stored time.
        """
        with sqlite_conn(self.sqlite_db_path) as conn:
            cursor = conn.execute(
                f"SELECT last_cron_execution FROM {Tables.CONDITIONS} WHERE condition_id = ?",
                (condition_id,),
            )
            row = cursor.fetchone()
            cursor.close()
        if row and row[0]:
            return datetime.fromisoformat(row[0])
        return None

    def store_last_cron_execution(
        self,
        condition_id: str,
        execution_time: datetime,
        expected_last_execution: datetime | None = None,
    ) -> bool:
        """Store *execution_time* as the last cron execution of a condition.

        :param condition_id: Condition whose timestamp is updated.
        :param execution_time: New timestamp, stored in ISO format.
        :param expected_last_execution: When given, the update only happens if
            the currently stored timestamp equals this value.
        :return: ``True`` if the update was issued (unconditional case) or
            applied (conditional case); ``False`` if the stored value did not
            match *expected_last_execution* or changed concurrently.
        """
        new_value = execution_time.isoformat()
        with sqlite_conn(self.sqlite_db_path) as conn:
            if expected_last_execution is None:
                # No optimistic check requested: overwrite unconditionally.
                conn.execute(
                    f"UPDATE {Tables.CONDITIONS} SET last_cron_execution = ? WHERE condition_id = ?",
                    (new_value, condition_id),
                )
                conn.commit()
                return True

            cursor = conn.execute(
                f"SELECT last_cron_execution FROM {Tables.CONDITIONS} WHERE condition_id = ?",
                (condition_id,),
            )
            row = cursor.fetchone()
            cursor.close()
            stored = row[0] if row else None
            if not stored or datetime.fromisoformat(stored) != expected_last_execution:
                return False

            # Compare-and-swap: only write if the row still holds the value we
            # just validated, so two processes with the same expectation
            # cannot both succeed.
            cursor = conn.execute(
                f"UPDATE {Tables.CONDITIONS} SET last_cron_execution = ? "
                "WHERE condition_id = ? AND last_cron_execution = ?",
                (new_value, condition_id, stored),
            )
            updated = cursor.rowcount > 0
            cursor.close()
            conn.commit()
            return updated

    def _register_source_task_condition(
        self, task_id: "TaskId", condition_id: str
    ) -> None:
        """Link *condition_id* to the task it is sourced from."""
        with sqlite_conn(self.sqlite_db_path) as conn:
            conn.execute(
                f"INSERT OR REPLACE INTO {Tables.SOURCE_TASK_CONDITIONS} (task_id_key, condition_id) VALUES (?, ?)",
                (task_id.key, condition_id),
            )
            conn.commit()

    def get_conditions_sourced_from_task(
        self, task_id: "TaskId", context_type: type[ConditionContext] | None = None
    ) -> list[TriggerCondition]:
        """Return the conditions linked to *task_id* as their source task.

        :param task_id: Source task whose conditions are requested.
        :param context_type: When given, only conditions whose
            ``context_type`` equals it are returned.
        """
        # A single JOIN replaces one connection per linked condition; links
        # pointing at a missing condition row are dropped by the inner join.
        with sqlite_conn(self.sqlite_db_path) as conn:
            cursor = conn.execute(
                f"SELECT c.condition_json FROM {Tables.SOURCE_TASK_CONDITIONS} AS s "
                f"JOIN {Tables.CONDITIONS} AS c ON c.condition_id = s.condition_id "
                "WHERE s.task_id_key = ?",
                (task_id.key,),
            )
            rows = cursor.fetchall()
            cursor.close()
        conditions = [TriggerCondition.from_json(row[0], self.app) for row in rows]
        if context_type is not None:
            conditions = [
                cond for cond in conditions if cond.context_type == context_type
            ]
        return conditions

    def _claim(
        self, table: str, key_column: str, key: str, expiration_seconds: int
    ) -> bool:
        """Try to take an expiring claim on *key* in *table*.

        The write is a compare-and-swap: a new row is added with
        ``INSERT OR IGNORE`` and an expired row is only overwritten if it
        still holds the expiration that was read. When several processes race
        for the same key, at most one sees ``rowcount > 0``.

        *table* and *key_column* are interpolated into the SQL and must be
        internal constants, never user input.

        :return: ``True`` if this call obtained the claim, ``False`` if an
            unexpired claim exists or another process won the race.
        """
        now = datetime.now(UTC)
        new_expiration = (now + timedelta(seconds=expiration_seconds)).isoformat()
        with sqlite_conn(self.sqlite_db_path) as conn:
            cursor = conn.execute(
                f"SELECT expiration FROM {table} WHERE {key_column} = ?",
                (key,),
            )
            row = cursor.fetchone()
            cursor.close()

            if row is None:
                # No claim yet: the primary key lets only one insert succeed.
                cursor = conn.execute(
                    f"INSERT OR IGNORE INTO {table} ({key_column}, expiration) VALUES (?, ?)",
                    (key, new_expiration),
                )
            else:
                stored = row[0]
                if stored and datetime.fromisoformat(stored) > now:
                    return False
                # Expired (or empty) claim: take it over only if unchanged.
                # `IS` also matches a NULL expiration.
                cursor = conn.execute(
                    f"UPDATE {table} SET expiration = ? "
                    f"WHERE {key_column} = ? AND expiration IS ?",
                    (new_expiration, key, stored),
                )
            claimed = cursor.rowcount > 0
            cursor.close()
            conn.commit()
            return claimed

    def claim_trigger_execution(
        self, trigger_id: str, valid_condition_id: str, expiration_seconds: int = 60
    ) -> bool:
        """Claim the execution of a trigger for one valid condition.

        :param trigger_id: Trigger to claim.
        :param valid_condition_id: Valid condition the claim applies to.
        :param expiration_seconds: Lifetime of the claim in seconds.
        :return: ``True`` if the claim was obtained, ``False`` if an unexpired
            claim for the same pair already exists.
        """
        claim_key = f"{trigger_id}:{valid_condition_id}"
        return self._claim(
            Tables.EXECUTION_CLAIMS, "claim_key", claim_key, expiration_seconds
        )

    def claim_trigger_run(
        self, trigger_run_id: str, expiration_seconds: int = 60
    ) -> bool:
        """Claim a trigger run.

        :param trigger_run_id: Identifier of the run to claim.
        :param expiration_seconds: Lifetime of the claim in seconds.
        :return: ``True`` if the claim was obtained, ``False`` if an unexpired
            claim for the same run already exists.
        """
        return self._claim(
            Tables.TRIGGER_RUN_CLAIMS,
            "trigger_run_id",
            trigger_run_id,
            expiration_seconds,
        )

    def clean_task_trigger_definitions(self, task_id: "TaskId") -> None:
        """Delete all triggers targeting *task_id* and their condition links."""
        with sqlite_conn(self.sqlite_db_path) as conn:
            # Remove the links first: the subquery needs the trigger rows.
            conn.execute(
                f"DELETE FROM {Tables.CONDITION_TRIGGERS} WHERE trigger_id IN "
                f"(SELECT trigger_id FROM {Tables.TRIGGERS} WHERE task_id_key = ?)",
                (task_id.key,),
            )
            conn.execute(
                f"DELETE FROM {Tables.TRIGGERS} WHERE task_id_key = ?",
                (task_id.key,),
            )
            conn.commit()

    def _purge(self) -> None:
        """Drop every ``trg_``-prefixed table and recreate the empty schema."""
        delete_tables_with_prefix(self.sqlite_db_path, "trg_")
        self._init_tables()