pynenc.trigger.sqlite_trigger

SQLite-based implementation of the Pynenc trigger subsystem.

This module provides a trigger system implementation that stores all its state in a SQLite database. Suitable for cross-process coordination and testing.

Module Contents

Classes

Tables

Table names for trigger, scoped by app_id.

SQLiteTrigger

SQLite-based implementation of the Pynenc trigger system.

API

class pynenc.trigger.sqlite_trigger.Tables(app_id: str)[source]

Bases: pynenc.util.sqlite_utils.TableNames

Table names for trigger, scoped by app_id.

Initialization

class pynenc.trigger.sqlite_trigger.SQLiteTrigger(app: pynenc.app.Pynenc)[source]

Bases: pynenc.trigger.base_trigger.BaseTrigger

SQLite-based implementation of the Pynenc trigger system.

Stores all trigger, condition, and claim data in a SQLite database for cross-process safety.

Initialization

Initialize the trigger component with the parent application.

Parameters:

app – The Pynenc application instance

conf() pynenc.conf.config_trigger.ConfigTriggerSQLite
_init_tables() None[source]

Initialize SQLite tables for trigger state.

_register_condition(condition: pynenc.trigger.conditions.TriggerCondition) None[source]
get_condition(condition_id: str) pynenc.trigger.conditions.TriggerCondition | None[source]
register_trigger(trigger: pynenc.models.trigger_definition_dto.TriggerDefinitionDTO) None[source]
_get_trigger(trigger_id: str) TriggerDefinitionDTO | None[source]
_get_triggers(trigger_ids: list[str]) list[pynenc.models.trigger_definition_dto.TriggerDefinitionDTO][source]

Fetch multiple triggers in batch using two queries.

Returns a list of TriggerDefinitionDTO for the provided trigger_ids preserving any that exist in the DB.

get_triggers_for_condition(condition_id: str) list[pynenc.models.trigger_definition_dto.TriggerDefinitionDTO][source]
record_valid_condition(valid_condition: pynenc.trigger.conditions.ValidCondition) None[source]
record_valid_conditions(valid_conditions: list[pynenc.trigger.conditions.ValidCondition]) None[source]
get_valid_conditions() dict[str, pynenc.trigger.conditions.ValidCondition][source]
clear_valid_conditions(conditions: collections.abc.Iterable[pynenc.trigger.conditions.ValidCondition]) None[source]
_get_all_conditions() list[pynenc.trigger.conditions.TriggerCondition][source]
get_last_cron_execution(condition_id: str) datetime.datetime | None[source]
store_last_cron_execution(condition_id: str, execution_time: datetime.datetime, expected_last_execution: datetime.datetime | None = None) bool[source]
_register_source_task_condition(task_id: pynenc.identifiers.task_id.TaskId, condition_id: str) None[source]
get_conditions_sourced_from_task(task_id: pynenc.identifiers.task_id.TaskId, context_type: type[pynenc.trigger.conditions.ConditionContext] | None = None) list[pynenc.trigger.conditions.TriggerCondition][source]
claim_trigger_execution(trigger_id: str, valid_condition_id: str, expiration_seconds: int = 60) bool[source]
claim_trigger_run(trigger_run_id: str, expiration_seconds: int = 60) bool[source]
clean_task_trigger_definitions(task_id: pynenc.identifiers.task_id.TaskId) None[source]
_purge() None[source]