pynenc.trigger.sqlite_trigger¶
SQLite-based implementation of the Pynenc trigger subsystem.
This module provides a trigger system implementation that stores all its state in a SQLite database. Suitable for cross-process coordination and testing.
Module Contents¶
Classes¶
Table names for trigger, scoped by app_id. |
|
SQLite-based implementation of the Pynenc trigger system. |
Functions¶
Delete rows from |
API¶
- class pynenc.trigger.sqlite_trigger.Tables(app_id: str)[source]¶
Bases:
pynenc.util.sqlite_utils.TableNamesTable names for trigger, scoped by app_id.
Initialization
- class pynenc.trigger.sqlite_trigger.SQLiteTrigger(app: pynenc.app.Pynenc)[source]¶
Bases:
pynenc.trigger.base_trigger.BaseTriggerSQLite-based implementation of the Pynenc trigger system.
Stores all trigger, condition, and claim data in a SQLite database for cross-process safety.
Initialization
Initialize the trigger component with the parent application.
- Parameters:
app – The Pynenc application instance
- _ensure_schema_version(conn: pynenc.util.sqlite_utils.SQLiteConnection) None[source]¶
Create the schema-version table and record the current version.
Used to detect future column or index migrations. A missing version means the schema predates this introduction;
store_eventand related operations remain backward compatible at version 1.
- _init_monitoring_tables(conn: pynenc.util.sqlite_utils.SQLiteConnection) None[source]¶
Create event/trigger-run monitoring tables and indexes.
- _init_event_tables(conn: pynenc.util.sqlite_utils.SQLiteConnection) None[source]¶
Create the events table plus its core indexes.
- _init_event_invocation_index(conn: pynenc.util.sqlite_utils.SQLiteConnection) None[source]¶
Create the relation table linking events to triggered invocations.
- _init_trigger_run_tables(conn: pynenc.util.sqlite_utils.SQLiteConnection) None[source]¶
Create trigger-run tables, indexes, and participant rows.
- register_trigger(trigger: pynenc.models.trigger_definition_dto.TriggerDefinitionDTO) None[source]¶
- _get_trigger(trigger_id: str) TriggerDefinitionDTO | None[source]¶
- _get_triggers(trigger_ids: list[str]) list[pynenc.models.trigger_definition_dto.TriggerDefinitionDTO][source]¶
Fetch multiple triggers in batch using two queries.
Returns a list of TriggerDefinitionDTO for the provided trigger_ids preserving any that exist in the DB.
- get_triggers_for_condition(condition_id: str) list[pynenc.models.trigger_definition_dto.TriggerDefinitionDTO][source]¶
- record_valid_conditions(valid_conditions: list[pynenc.trigger.conditions.ValidCondition]) None[source]¶
- clear_valid_conditions(conditions: collections.abc.Iterable[pynenc.trigger.conditions.ValidCondition]) None[source]¶
- get_last_cron_execution(condition_id: str) datetime.datetime | None[source]¶
- store_last_cron_execution(condition_id: str, execution_time: datetime.datetime, expected_last_execution: datetime.datetime | None = None) bool[source]¶
- _register_source_task_condition(task_id: pynenc.identifiers.task_id.TaskId, condition_id: str) None[source]¶
- get_conditions_sourced_from_task(task_id: pynenc.identifiers.task_id.TaskId, context_type: type[pynenc.trigger.conditions.ConditionContext] | None = None) list[pynenc.trigger.conditions.TriggerCondition][source]¶
- claim_trigger_execution(trigger_id: str, valid_condition_id: str, expiration_seconds: int = 60) bool[source]¶
- clean_task_trigger_definitions(task_id: pynenc.identifiers.task_id.TaskId) None[source]¶
- store_event(event: pynenc.trigger.monitoring.EventRecord) None[source]¶
Upsert an event row plus its per-condition match links.
Legacy callers may populate
event.triggered_invocation_idsdirectly; in that case the relation table is also seeded so that the indexed read APIs see the same data.
- get_event(event_id: str) pynenc.trigger.monitoring.EventRecord | None[source]¶
- get_events(*, event_code: str | None = None, start_time: datetime.datetime | None = None, end_time: datetime.datetime | None = None, matched: bool | None = None, triggered: bool | None = None, emitted_by_invocation_id: str | None = None, emitted_by_task_id: str | None = None, limit: int = 100, offset: int = 0) list[pynenc.trigger.monitoring.EventRecord][source]¶
- count_events(*, event_code: str | None = None, start_time: datetime.datetime | None = None, end_time: datetime.datetime | None = None, matched: bool | None = None, triggered: bool | None = None, emitted_by_invocation_id: str | None = None, emitted_by_task_id: str | None = None) int[source]¶
- get_events_emitted_by_invocation(invocation_id: str, *, limit: int = 200, offset: int = 0) list[pynenc.trigger.monitoring.EventRecord][source]¶
Override: direct indexed SELECT on
emitted_by_invocation_id.
- get_event_markers_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, *, event_code: str | None = None, state: str = 'all', limit: int = 1000, offset: int = 0) pynenc.trigger.monitoring.EventMarkerPage[source]¶
- link_trigger_run_to_events(event_ids: list[str], invocation_id: str, *, trigger_run_id: str) None[source]¶
- _load_invocations_for_events(conn: pynenc.util.sqlite_utils.SQLiteConnection, event_ids: list[str]) dict[str, list[str]][source]¶
Bulk-load triggered invocation ids for a batch of event ids.
- _build_event_where(event_code: str | None, start_time: datetime.datetime | None, end_time: datetime.datetime | None, matched: bool | None, triggered: bool | None, emitted_by_invocation_id: str | None = None, emitted_by_task_id: str | None = None) tuple[str, tuple][source]¶
Build a parameterized WHERE clause for event filters.
- store_trigger_run(run: pynenc.trigger.monitoring.TriggerRunRecord) None[source]¶
Upsert a trigger run plus its per-condition link rows.
- _insert_trigger_run_conditions(conn: pynenc.util.sqlite_utils.SQLiteConnection, run: pynenc.trigger.monitoring.TriggerRunRecord) None[source]¶
Write one row per participant (or per id when participants are absent).
- get_trigger_run(trigger_run_id: str) pynenc.trigger.monitoring.TriggerRunRecord | None[source]¶
- get_trigger_runs_for_event(event_id: str) list[pynenc.trigger.monitoring.TriggerRunRecord][source]¶
- get_trigger_runs_for_invocation(invocation_id: str) list[pynenc.trigger.monitoring.TriggerRunRecord][source]¶
- get_trigger_runs_sourced_by_invocation(invocation_id: str) list[pynenc.trigger.monitoring.TriggerRunRecord][source]¶
- get_trigger_runs_for_valid_condition(valid_condition_id: str) list[pynenc.trigger.monitoring.TriggerRunRecord][source]¶
- get_trigger_runs_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, *, event_code: str | None = None, task_id_key: str | None = None, limit: int | None = None) list[pynenc.trigger.monitoring.TriggerRunRecord][source]¶
- _age_purge_events(threshold: datetime.datetime) list[str][source]¶
- _age_purge_trigger_runs(threshold: datetime.datetime) int[source]¶
- _cascade_delete_runs_for_events_in_conn(conn: pynenc.util.sqlite_utils.SQLiteConnection, event_ids: list[str]) int[source]¶
Delete trigger runs that reference any of the given event ids.
- _purge_events_age(conn: pynenc.util.sqlite_utils.SQLiteConnection, threshold: float) list[str][source]¶
Delete events older than
threshold; return removed event ids.
- _purge_trigger_runs_age(conn: pynenc.util.sqlite_utils.SQLiteConnection, threshold: float) int[source]¶
Delete trigger runs whose
executed_atis older thanthreshold.
- _enforce_event_capacity(conn: pynenc.util.sqlite_utils.SQLiteConnection) list[str][source]¶
Drop oldest events beyond
event_max_records; return removed ids.
- _enforce_trigger_run_capacity(conn: pynenc.util.sqlite_utils.SQLiteConnection) int[source]¶
Drop oldest trigger runs beyond
trigger_run_max_records.