pynenc.trigger.sqlite_trigger
SQLite-based implementation of the Pynenc trigger subsystem.
This module provides a trigger system implementation that stores all its state in a SQLite database. It is suitable for cross-process coordination and testing.
Module Contents
Classes
Tables | Table names for trigger, scoped by app_id. |
SQLiteTrigger | SQLite-based implementation of the Pynenc trigger system. |
API
- class pynenc.trigger.sqlite_trigger.Tables(app_id: str)
Bases: pynenc.util.sqlite_utils.TableNames
Table names for trigger, scoped by app_id.
Initialization
- class pynenc.trigger.sqlite_trigger.SQLiteTrigger(app: pynenc.app.Pynenc)
Bases: pynenc.trigger.base_trigger.BaseTrigger
SQLite-based implementation of the Pynenc trigger system.
Stores all trigger, condition, and claim data in a SQLite database for cross-process safety.
Initialization
Initialize the trigger component with the parent application.
- Parameters:
app – The Pynenc application instance
- register_trigger(trigger: pynenc.models.trigger_definition_dto.TriggerDefinitionDTO) → None
- _get_trigger(trigger_id: str) → TriggerDefinitionDTO | None
- _get_triggers(trigger_ids: list[str]) → list[pynenc.models.trigger_definition_dto.TriggerDefinitionDTO]
Fetch multiple triggers in batch using two queries.
Returns a list containing a TriggerDefinitionDTO for each of the provided trigger_ids that exists in the DB.
- get_triggers_for_condition(condition_id: str) → list[pynenc.models.trigger_definition_dto.TriggerDefinitionDTO]
- record_valid_conditions(valid_conditions: list[pynenc.trigger.conditions.ValidCondition]) → None
- clear_valid_conditions(conditions: collections.abc.Iterable[pynenc.trigger.conditions.ValidCondition]) → None
- get_last_cron_execution(condition_id: str) → datetime.datetime | None
- store_last_cron_execution(condition_id: str, execution_time: datetime.datetime, expected_last_execution: datetime.datetime | None = None) → bool
- _register_source_task_condition(task_id: pynenc.identifiers.task_id.TaskId, condition_id: str) → None
- get_conditions_sourced_from_task(task_id: pynenc.identifiers.task_id.TaskId, context_type: type[pynenc.trigger.conditions.ConditionContext] | None = None) → list[pynenc.trigger.conditions.TriggerCondition]
- claim_trigger_execution(trigger_id: str, valid_condition_id: str, expiration_seconds: int = 60) → bool
- clean_task_trigger_definitions(task_id: pynenc.identifiers.task_id.TaskId) → None
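Example (illustrative sketch)
The signatures of store_last_cron_execution (an expected_last_execution argument and a bool result) and claim_trigger_execution (an expiration_seconds argument and a bool result) suggest compare-and-set and expiring-claim patterns, although this page does not document their semantics. The sketch below shows how such patterns can be written with the standard sqlite3 module. It is not pynenc's implementation: the table names, column names, and the functions open_db, store_last_run, and claim are all hypothetical and exist only for illustration.

```python
import sqlite3
import time

# Hypothetical schema for illustration only; these are NOT pynenc's tables.
SCHEMA = """
CREATE TABLE IF NOT EXISTS cron_runs (
    condition_id TEXT PRIMARY KEY,
    last_run REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS claims (
    trigger_id TEXT NOT NULL,
    valid_condition_id TEXT NOT NULL,
    expires_at REAL NOT NULL,
    PRIMARY KEY (trigger_id, valid_condition_id)
);
"""


def open_db(path=":memory:"):
    """Open a connection and create the illustrative tables."""
    conn = sqlite3.connect(path)
    conn.executescript(SCHEMA)
    return conn


def store_last_run(conn, condition_id, run_time, expected=None):
    """Compare-and-set: write run_time only if the stored value matches expected.

    With expected=None the write succeeds only when no row exists yet.
    Returns True if this caller's write was applied.
    """
    with conn:  # commits on success, rolls back on exception
        if expected is None:
            cur = conn.execute(
                "INSERT OR IGNORE INTO cron_runs (condition_id, last_run) "
                "VALUES (?, ?)",
                (condition_id, run_time),
            )
        else:
            cur = conn.execute(
                "UPDATE cron_runs SET last_run = ? "
                "WHERE condition_id = ? AND last_run = ?",
                (run_time, condition_id, expected),
            )
        return cur.rowcount == 1


def claim(conn, trigger_id, valid_condition_id, expiration_seconds=60, now=None):
    """Try to take an expiring claim; returns True only for the winner."""
    now = time.time() if now is None else now
    with conn:
        # Remove a claim that has already expired so it can be taken again.
        conn.execute(
            "DELETE FROM claims WHERE trigger_id = ? "
            "AND valid_condition_id = ? AND expires_at <= ?",
            (trigger_id, valid_condition_id, now),
        )
        # The primary key makes a second live claim a no-op.
        cur = conn.execute(
            "INSERT OR IGNORE INTO claims "
            "(trigger_id, valid_condition_id, expires_at) VALUES (?, ?, ?)",
            (trigger_id, valid_condition_id, now + expiration_seconds),
        )
        return cur.rowcount == 1
```

In this sketch the caller treats a False result as "another worker got there first" and skips the work. The now parameter is injectable only to make the expiry behaviour easy to exercise in tests.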