pynenc.trigger.sqlite_trigger

SQLite-based implementation of the Pynenc trigger subsystem.

This module provides a trigger system implementation that stores all its state in a SQLite database, which makes it suitable for cross-process coordination and testing.

Module Contents

Classes

Tables

Table names for the trigger subsystem, scoped by app_id.

SQLiteTrigger

SQLite-based implementation of the Pynenc trigger system.

Functions

_delete_by_ids

Delete, in batches, the rows of table whose column value is in ids.

API

class pynenc.trigger.sqlite_trigger.Tables(app_id: str)

Bases: pynenc.util.sqlite_utils.TableNames

Table names for the trigger subsystem, scoped by app_id.
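As an illustration of what scoping table names by app_id can look like, here is a minimal sketch. The class ScopedTableNames, its properties, and the naming pattern are hypothetical; they are not the actual Tables or TableNames implementation, which this page does not show.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ScopedTableNames:
    """Hypothetical stand-in for an app-scoped table-name holder."""

    app_id: str

    def _scoped(self, base: str) -> str:
        # Replace characters that are unsafe inside an SQL identifier.
        safe = "".join(c if c.isalnum() else "_" for c in self.app_id)
        return f"{safe}_{base}"

    @property
    def triggers(self) -> str:
        return self._scoped("triggers")

    @property
    def conditions(self) -> str:
        return self._scoped("conditions")


names = ScopedTableNames("my-app")
print(names.triggers)    # my_app_triggers
print(names.conditions)  # my_app_conditions
```

Prefixing every table with the application id lets several applications share one database file without seeing each other's rows.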

class pynenc.trigger.sqlite_trigger.SQLiteTrigger(app: pynenc.app.Pynenc)

Bases: pynenc.trigger.base_trigger.BaseTrigger

SQLite-based implementation of the Pynenc trigger system.

Stores all trigger, condition, and claim data in a SQLite database for cross-process safety.

Initialization

Initialize the trigger component with the parent application.

Parameters:

app – The Pynenc application instance

conf() → pynenc.conf.config_trigger.ConfigTriggerSQLite
_init_tables() → None

Initialize SQLite tables for trigger state.

SCHEMA_VERSION_CURRENT: int = 3

_ensure_schema_version(conn: pynenc.util.sqlite_utils.SQLiteConnection) → None

Create the schema-version table and record the current version.

Used to detect future column or index migrations. A missing version means the schema predates the introduction of version tracking; store_event and related operations remain backward compatible at version 1.
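The version-tracking idea described above can be sketched with the standard sqlite3 module. The table name schema_version, its single-row layout, and the function ensure_schema_version are assumptions made for illustration; only the constant value 3 comes from this page.

```python
import sqlite3
from typing import Optional

SCHEMA_VERSION_CURRENT = 3  # value of the attribute documented above


def ensure_schema_version(conn: sqlite3.Connection) -> Optional[int]:
    """Record the current version and return the version found beforehand."""
    # Hypothetical single-row table; the CHECK keeps it to exactly one row.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS schema_version ("
        "id INTEGER PRIMARY KEY CHECK (id = 1), version INTEGER NOT NULL)"
    )
    row = conn.execute("SELECT version FROM schema_version WHERE id = 1").fetchone()
    previous = row[0] if row else None  # None: the schema predates versioning
    conn.execute(
        "INSERT OR REPLACE INTO schema_version (id, version) VALUES (1, ?)",
        (SCHEMA_VERSION_CURRENT,),
    )
    conn.commit()
    return previous


conn = sqlite3.connect(":memory:")
first = ensure_schema_version(conn)   # no version recorded yet
second = ensure_schema_version(conn)  # version recorded by the first call
print(first, second)
```

A caller could compare the returned value with SCHEMA_VERSION_CURRENT to decide whether a migration step is needed.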

_init_monitoring_tables(conn: pynenc.util.sqlite_utils.SQLiteConnection) → None

Create event/trigger-run monitoring tables and indexes.

_init_event_tables(conn: pynenc.util.sqlite_utils.SQLiteConnection) → None

Create the events table plus its core indexes.

_init_event_invocation_index(conn: pynenc.util.sqlite_utils.SQLiteConnection) → None

Create the relation table linking events to triggered invocations.

_init_trigger_run_tables(conn: pynenc.util.sqlite_utils.SQLiteConnection) → None

Create trigger-run tables, indexes, and participant rows.

_register_condition(condition: pynenc.trigger.conditions.TriggerCondition) → None
get_condition(condition_id: str) → pynenc.trigger.conditions.TriggerCondition | None
register_trigger(trigger: pynenc.models.trigger_definition_dto.TriggerDefinitionDTO) → None
_get_trigger(trigger_id: str) → TriggerDefinitionDTO | None
_get_triggers(trigger_ids: list[str]) → list[pynenc.models.trigger_definition_dto.TriggerDefinitionDTO]

Fetch multiple triggers in batch using two queries.

Returns a list with one TriggerDefinitionDTO for each of the provided trigger_ids that exists in the DB; ids with no matching row are omitted.

get_triggers_for_condition(condition_id: str) → list[pynenc.models.trigger_definition_dto.TriggerDefinitionDTO]
record_valid_condition(valid_condition: pynenc.trigger.conditions.ValidCondition) → None
record_valid_conditions(valid_conditions: list[pynenc.trigger.conditions.ValidCondition]) → None
get_valid_conditions() → dict[str, pynenc.trigger.conditions.ValidCondition]
clear_valid_conditions(conditions: collections.abc.Iterable[pynenc.trigger.conditions.ValidCondition]) → None
_get_all_conditions() → list[pynenc.trigger.conditions.TriggerCondition]
get_last_cron_execution(condition_id: str) → datetime.datetime | None
store_last_cron_execution(condition_id: str, execution_time: datetime.datetime, expected_last_execution: datetime.datetime | None = None) → bool
_register_source_task_condition(task_id: pynenc.identifiers.task_id.TaskId, condition_id: str) → None
get_conditions_sourced_from_task(task_id: pynenc.identifiers.task_id.TaskId, context_type: type[pynenc.trigger.conditions.ConditionContext] | None = None) → list[pynenc.trigger.conditions.TriggerCondition]
claim_trigger_execution(trigger_id: str, valid_condition_id: str, expiration_seconds: int = 60) → bool
claim_trigger_run(trigger_run_id: str, expiration_seconds: int = 60) → bool
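The two claim methods above return a bool and take an expiration_seconds argument, which suggests a claim that only one caller can win until it expires. The following is a minimal sketch of one way to get that behaviour with SQLite. The claims table, the claim function, and the now parameter are hypothetical and are not taken from the actual implementation.

```python
import sqlite3
import time


def claim(conn, claim_key, expiration_seconds=60, now=None):
    """Return True if this caller obtained the claim, False if it is held."""
    now = time.time() if now is None else now
    with conn:  # one transaction: commits on success, rolls back on error
        # Drop a stale claim so that it can be taken again.
        conn.execute(
            "DELETE FROM claims WHERE claim_key = ? AND expires_at <= ?",
            (claim_key, now),
        )
        # The primary key makes the insert a no-op when a live claim exists.
        cur = conn.execute(
            "INSERT OR IGNORE INTO claims (claim_key, expires_at) VALUES (?, ?)",
            (claim_key, now + expiration_seconds),
        )
        return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE claims (claim_key TEXT PRIMARY KEY, expires_at REAL NOT NULL)")
first = claim(conn, "trigger-1:cond-9", now=1000.0)   # claim is free
second = claim(conn, "trigger-1:cond-9", now=1010.0)  # still held
third = claim(conn, "trigger-1:cond-9", now=1061.0)   # previous claim expired
print(first, second, third)
```

Relying on the primary-key constraint rather than a read followed by a write keeps the decision inside a single write transaction, which is what makes it safe across processes sharing the database file.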
clean_task_trigger_definitions(task_id: pynenc.identifiers.task_id.TaskId) → None
_purge() → None
store_event(event: pynenc.trigger.monitoring.EventRecord) → None

Upsert an event row plus its per-condition match links.

Legacy callers may populate event.triggered_invocation_ids directly; in that case the relation table is also seeded so that the indexed read APIs see the same data.
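The upsert-plus-relation-table pattern described above might look roughly like the sketch below. The table and column names (events, event_invocations, event_code) and the simplified function signature are assumptions for illustration; the real method takes an EventRecord. The ON CONFLICT clause needs SQLite 3.24 or newer.

```python
import sqlite3


def store_event(conn, event_id, event_code, invocation_ids):
    """Upsert one event row and seed its event-to-invocation links."""
    with conn:
        # Insert the event, or update it in place if the id already exists.
        conn.execute(
            "INSERT INTO events (event_id, event_code) VALUES (?, ?) "
            "ON CONFLICT(event_id) DO UPDATE SET event_code = excluded.event_code",
            (event_id, event_code),
        )
        # Links are idempotent: an existing (event, invocation) pair is kept.
        conn.executemany(
            "INSERT OR IGNORE INTO event_invocations (event_id, invocation_id) "
            "VALUES (?, ?)",
            [(event_id, inv) for inv in invocation_ids],
        )


conn = sqlite3.connect(":memory:")
conn.executescript(
    "CREATE TABLE events (event_id TEXT PRIMARY KEY, event_code TEXT NOT NULL);"
    "CREATE TABLE event_invocations ("
    "  event_id TEXT NOT NULL, invocation_id TEXT NOT NULL,"
    "  PRIMARY KEY (event_id, invocation_id));"
)
store_event(conn, "e1", "order.created", ["inv-1"])
store_event(conn, "e1", "order.updated", ["inv-1", "inv-2"])
event_rows = conn.execute("SELECT event_id, event_code FROM events").fetchall()
link_count = conn.execute("SELECT COUNT(*) FROM event_invocations").fetchone()[0]
print(event_rows, link_count)
```

Storing the same event twice leaves a single event row with the latest values and no duplicate links.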

get_event(event_id: str) → pynenc.trigger.monitoring.EventRecord | None
get_events(*, event_code: str | None = None, start_time: datetime.datetime | None = None, end_time: datetime.datetime | None = None, matched: bool | None = None, triggered: bool | None = None, emitted_by_invocation_id: str | None = None, emitted_by_task_id: str | None = None, limit: int = 100, offset: int = 0) → list[pynenc.trigger.monitoring.EventRecord]
count_events(*, event_code: str | None = None, start_time: datetime.datetime | None = None, end_time: datetime.datetime | None = None, matched: bool | None = None, triggered: bool | None = None, emitted_by_invocation_id: str | None = None, emitted_by_task_id: str | None = None) → int
get_events_emitted_by_invocation(invocation_id: str, *, limit: int = 200, offset: int = 0) → list[pynenc.trigger.monitoring.EventRecord]

Override: direct indexed SELECT on emitted_by_invocation_id.

get_event_markers_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, *, event_code: str | None = None, state: str = 'all', limit: int = 1000, offset: int = 0) → pynenc.trigger.monitoring.EventMarkerPage
get_invocations_triggered_by_event(event_id: str) → list[str]
_load_invocations_for_events(conn: pynenc.util.sqlite_utils.SQLiteConnection, event_ids: list[str]) → dict[str, list[str]]

Bulk-load triggered invocation ids for a batch of event ids.
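A bulk load of this kind typically replaces one query per event with a single IN query whose rows are grouped in Python. The sketch below assumes a hypothetical event_invocations table and a standalone load_invocations function; it does not split very long id lists to stay under SQLite's bound-parameter limit.

```python
import sqlite3


def load_invocations(conn, event_ids):
    """Map each requested event id to its triggered invocation ids."""
    result = {event_id: [] for event_id in event_ids}
    if not event_ids:
        return result
    marks = ",".join("?" * len(event_ids))  # one placeholder per id
    rows = conn.execute(
        "SELECT event_id, invocation_id FROM event_invocations "
        f"WHERE event_id IN ({marks}) ORDER BY invocation_id",
        event_ids,
    )
    for event_id, invocation_id in rows:
        result[event_id].append(invocation_id)
    return result


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE event_invocations (event_id TEXT, invocation_id TEXT)")
conn.executemany(
    "INSERT INTO event_invocations VALUES (?, ?)",
    [("e1", "inv-2"), ("e1", "inv-1"), ("e2", "inv-3")],
)
loaded = load_invocations(conn, ["e1", "e2", "e3"])
print(loaded)
```

Events without any link still appear in the result with an empty list, so callers do not need a separate existence check.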

list_event_codes() → list[str]
_build_event_where(event_code: str | None, start_time: datetime.datetime | None, end_time: datetime.datetime | None, matched: bool | None, triggered: bool | None, emitted_by_invocation_id: str | None = None, emitted_by_task_id: str | None = None) → tuple[str, tuple]

Build a parameterized WHERE clause for event filters.
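A parameterized WHERE builder returns the SQL fragment and its bound values separately, so filter values never get interpolated into the SQL string. This sketch covers only a subset of the filters in the signature above; the column names (event_code, timestamp, matched), the inclusive time bounds, and the epoch-seconds encoding are assumptions.

```python
from datetime import datetime, timezone


def build_event_where(event_code=None, start_time=None, end_time=None, matched=None):
    """Return (where_sql, params) for the filters that are not None."""
    clauses = []
    params = []
    if event_code is not None:
        clauses.append("event_code = ?")
        params.append(event_code)
    if start_time is not None:
        clauses.append("timestamp >= ?")
        params.append(start_time.timestamp())
    if end_time is not None:
        clauses.append("timestamp <= ?")
        params.append(end_time.timestamp())
    if matched is not None:
        clauses.append("matched = ?")
        params.append(1 if matched else 0)
    # No active filter means no WHERE clause at all.
    where = (" WHERE " + " AND ".join(clauses)) if clauses else ""
    return where, tuple(params)


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
where_sql, where_params = build_event_where(event_code="order.created", start_time=start)
print(where_sql)     # " WHERE event_code = ? AND timestamp >= ?"
print(where_params)
```

Because the same fragment and parameters can be appended to both a SELECT and a SELECT COUNT(*), one builder can serve listing and counting queries alike.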

store_trigger_run(run: pynenc.trigger.monitoring.TriggerRunRecord) → None

Upsert a trigger run plus its per-condition link rows.

_insert_trigger_run_conditions(conn: pynenc.util.sqlite_utils.SQLiteConnection, run: pynenc.trigger.monitoring.TriggerRunRecord) → None

Write one row per participant (or per id when participants are absent).

get_trigger_run(trigger_run_id: str) → pynenc.trigger.monitoring.TriggerRunRecord | None
get_trigger_runs_for_event(event_id: str) → list[pynenc.trigger.monitoring.TriggerRunRecord]
get_trigger_runs_for_invocation(invocation_id: str) → list[pynenc.trigger.monitoring.TriggerRunRecord]
get_trigger_runs_sourced_by_invocation(invocation_id: str) → list[pynenc.trigger.monitoring.TriggerRunRecord]
get_trigger_runs_for_valid_condition(valid_condition_id: str) → list[pynenc.trigger.monitoring.TriggerRunRecord]
get_trigger_runs_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, *, event_code: str | None = None, task_id_key: str | None = None, limit: int | None = None) → list[pynenc.trigger.monitoring.TriggerRunRecord]
_age_purge_events(threshold: datetime.datetime) → list[str]
_age_purge_trigger_runs(threshold: datetime.datetime) → int
_cap_purge_events() → list[str]
_cap_purge_trigger_runs() → int
_cascade_delete_runs_for_events(event_ids: list[str]) → int
_cascade_delete_runs_for_events_in_conn(conn: pynenc.util.sqlite_utils.SQLiteConnection, event_ids: list[str]) → int

Delete trigger runs that reference any of the given event ids.

_purge_events_age(conn: pynenc.util.sqlite_utils.SQLiteConnection, threshold: float) → list[str]

Delete events older than threshold; return removed event ids.
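An age-based purge that reports what it removed can be sketched as a select followed by a delete inside one transaction. The events table, its timestamp column (epoch seconds), and the function name are hypothetical; "older than" is read here as strictly less than the threshold.

```python
import sqlite3


def purge_events_older_than(conn, threshold):
    """Delete events with timestamp < threshold; return the removed ids."""
    with conn:
        removed = [
            row[0]
            for row in conn.execute(
                "SELECT event_id FROM events WHERE timestamp < ? ORDER BY timestamp",
                (threshold,),
            )
        ]
        conn.execute("DELETE FROM events WHERE timestamp < ?", (threshold,))
    return removed


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (event_id TEXT PRIMARY KEY, timestamp REAL NOT NULL)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [("e1", 10.0), ("e2", 20.0), ("e3", 30.0)],
)
removed_ids = purge_events_older_than(conn, 25.0)
remaining = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
print(removed_ids, remaining)
```

Returning the removed ids lets the caller clean up dependent rows, such as trigger runs that reference those events.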

_purge_trigger_runs_age(conn: pynenc.util.sqlite_utils.SQLiteConnection, threshold: float) → int

Delete trigger runs whose executed_at is older than threshold.

_enforce_event_capacity(conn: pynenc.util.sqlite_utils.SQLiteConnection) → list[str]

Drop oldest events beyond event_max_records; return removed ids.
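Capacity enforcement keeps the newest rows and drops everything beyond a maximum count. In the sketch below the limit is passed as an argument, whereas the real method presumably reads event_max_records from configuration; the events table and its timestamp column are assumptions.

```python
import sqlite3


def enforce_event_capacity(conn, max_records):
    """Keep the newest max_records events; return the ids that were dropped."""
    with conn:
        # Newest first, then skip the rows we keep. LIMIT -1 means "no limit".
        doomed = [
            row[0]
            for row in conn.execute(
                "SELECT event_id FROM events "
                "ORDER BY timestamp DESC, event_id DESC LIMIT -1 OFFSET ?",
                (max_records,),
            )
        ]
        conn.executemany(
            "DELETE FROM events WHERE event_id = ?", [(i,) for i in doomed]
        )
    return doomed


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (event_id TEXT PRIMARY KEY, timestamp REAL NOT NULL)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [(f"e{i}", float(i)) for i in range(1, 6)],
)
dropped = enforce_event_capacity(conn, 3)
kept = [r[0] for r in conn.execute("SELECT event_id FROM events ORDER BY timestamp")]
print(sorted(dropped), kept)
```

The secondary sort on event_id makes the cut deterministic when several events share a timestamp.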

_enforce_trigger_run_capacity(conn: pynenc.util.sqlite_utils.SQLiteConnection) → int

Drop oldest trigger runs beyond trigger_run_max_records.

pynenc.trigger.sqlite_trigger._delete_by_ids(conn: pynenc.util.sqlite_utils.SQLiteConnection, table: str, column: str, ids: list[str]) → None

Delete, in batches, the rows of table whose column value is in ids.
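Batching matters because SQLite caps the number of bound parameters per statement (999 by default in older builds). A minimal sketch of the technique follows; the batch size of 500 and the standalone function are assumptions, not the values used by _delete_by_ids.

```python
import sqlite3

BATCH_SIZE = 500  # hypothetical; below SQLite's historical 999-parameter default


def delete_by_ids(conn, table, column, ids, batch_size=BATCH_SIZE):
    """Delete rows whose column value is in ids, one IN list per batch."""
    # table and column are interpolated, so they must come from trusted code.
    for start in range(0, len(ids), batch_size):
        batch = ids[start:start + batch_size]
        marks = ",".join("?" * len(batch))
        conn.execute(f"DELETE FROM {table} WHERE {column} IN ({marks})", batch)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trigger_runs (run_id TEXT PRIMARY KEY)")
all_ids = [f"run-{i}" for i in range(1200)]
conn.executemany("INSERT INTO trigger_runs VALUES (?)", [(i,) for i in all_ids])
delete_by_ids(conn, "trigger_runs", "run_id", all_ids[:1100])
conn.commit()
left = conn.execute("SELECT COUNT(*) FROM trigger_runs").fetchone()[0]
print(left)  # 100
```

Only the id values are bound as parameters; identifiers such as table and column names cannot be bound in SQL, which is why they are formatted into the statement.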