pynenc.trigger.mem_trigger
In-memory implementation of the Pynenc trigger subsystem.
This module provides an implementation of the trigger system that stores all its state in memory. It’s suitable for development and testing purposes.
Module Contents
Classes
MemTrigger | In-memory implementation of the Pynenc trigger system.
API
- class pynenc.trigger.mem_trigger.MemTrigger(app: pynenc.app.Pynenc)
Bases: pynenc.trigger.base_trigger.BaseTrigger
In-memory implementation of the Pynenc trigger system.
This implementation stores all data in memory and is suitable for single-process applications or testing.
Initialization
Initialize the memory-based trigger component.
- Parameters:
app – The Pynenc application instance
- _register_condition(condition: pynenc.trigger.conditions.TriggerCondition) -> None
Register a condition in the in-memory system.
- Parameters:
condition – The condition to register
- get_condition(condition_id: str) -> pynenc.trigger.conditions.TriggerCondition | None
Get a condition by its ID from the in-memory store.
- Parameters:
condition_id – ID of the condition to retrieve
- Returns:
The condition if found, None otherwise
- register_trigger(trigger: pynenc.models.trigger_definition_dto.TriggerDefinitionDTO) -> None
Register a trigger definition in the in-memory system.
- Parameters:
trigger – The trigger definition to register
- _get_trigger(trigger_id: str) -> TriggerDefinitionDTO | None
Get a trigger definition by ID from the in-memory store.
- Parameters:
trigger_id – ID of the trigger to retrieve
- Returns:
The trigger definition if found, None otherwise
- get_triggers_for_condition(condition_id: str) -> list[pynenc.models.trigger_definition_dto.TriggerDefinitionDTO]
Get all triggers that depend on a specific condition from the in-memory store.
- Parameters:
condition_id – ID of the condition
- Returns:
List of trigger definitions using this condition
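One way to picture this lookup is a reverse index from condition IDs to trigger IDs that is filled in when a trigger is registered. The sketch below is an illustration only, not the pynenc implementation: `TriggerDef`, `TriggerIndex`, and the `condition_ids` field are hypothetical stand-ins for the real DTO and store.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class TriggerDef:
    """Hypothetical stand-in for a trigger definition (not the pynenc DTO)."""

    trigger_id: str
    condition_ids: tuple[str, ...]


class TriggerIndex:
    """Toy in-memory store with a reverse index from condition to triggers."""

    def __init__(self) -> None:
        self._triggers: dict[str, TriggerDef] = {}
        self._by_condition: defaultdict[str, set[str]] = defaultdict(set)

    def register_trigger(self, trigger: TriggerDef) -> None:
        # Store the definition and index it under every condition it uses.
        self._triggers[trigger.trigger_id] = trigger
        for condition_id in trigger.condition_ids:
            self._by_condition[condition_id].add(trigger.trigger_id)

    def get_triggers_for_condition(self, condition_id: str) -> list[TriggerDef]:
        # .get() avoids creating empty entries for unknown condition IDs.
        ids = self._by_condition.get(condition_id, set())
        return [self._triggers[i] for i in sorted(ids)]


index = TriggerIndex()
index.register_trigger(TriggerDef("t1", ("c1", "c2")))
index.register_trigger(TriggerDef("t2", ("c2",)))
shared = index.get_triggers_for_condition("c2")  # both triggers depend on c2
```

An unknown condition ID yields an empty list in this sketch, which matches the "list of trigger definitions" return type documented above.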
- record_valid_condition(valid_condition: pynenc.trigger.conditions.ValidCondition) -> None
Record in memory that a condition has been satisfied with a specific context.
- Parameters:
valid_condition – The valid condition to record
- record_valid_conditions(valid_conditions: list[pynenc.trigger.conditions.ValidCondition]) -> None
Record in memory that multiple conditions have been satisfied with their respective contexts.
- Parameters:
valid_conditions – The list of valid conditions to record
- get_valid_conditions() -> dict[str, pynenc.trigger.conditions.ValidCondition]
Get all currently valid conditions and their contexts from memory.
- Returns:
Dictionary mapping condition IDs to their valid conditions
- clear_valid_conditions(conditions: collections.abc.Iterable[pynenc.trigger.conditions.ValidCondition]) -> None
Clear valid conditions from memory after they have been processed.
- Parameters:
conditions – Iterable of valid conditions to clear
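The record, get, and clear methods form a small processing cycle: satisfied conditions accumulate, a consumer reads a snapshot, and the processed entries are then removed. The following is a minimal sketch of that cycle with a plain dictionary. `ValidCond`, `ValidConditionStore`, and the `valid_condition_id` key are assumptions made for illustration; the real `ValidCondition` class and its keying may differ.

```python
from collections.abc import Iterable
from dataclasses import dataclass


@dataclass(frozen=True)
class ValidCond:
    """Hypothetical stand-in for a satisfied condition and its context."""

    valid_condition_id: str  # assumed unique key for this satisfied instance
    condition_id: str


class ValidConditionStore:
    def __init__(self) -> None:
        self._valid: dict[str, ValidCond] = {}

    def record_valid_condition(self, valid_condition: ValidCond) -> None:
        self._valid[valid_condition.valid_condition_id] = valid_condition

    def record_valid_conditions(self, valid_conditions: list[ValidCond]) -> None:
        for valid_condition in valid_conditions:
            self.record_valid_condition(valid_condition)

    def get_valid_conditions(self) -> dict[str, ValidCond]:
        # Return a copy so callers can iterate while the store is modified.
        return dict(self._valid)

    def clear_valid_conditions(self, conditions: Iterable[ValidCond]) -> None:
        for valid_condition in conditions:
            # pop() with a default tolerates entries that were already cleared.
            self._valid.pop(valid_condition.valid_condition_id, None)


store = ValidConditionStore()
store.record_valid_conditions([ValidCond("vc-1", "c1"), ValidCond("vc-2", "c2")])
pending = store.get_valid_conditions()          # snapshot to process
store.clear_valid_conditions(pending.values())  # remove what was processed
```

Returning a copy from `get_valid_conditions` is a design choice of this sketch: it lets the caller pass `pending.values()` straight to `clear_valid_conditions` without mutating a dictionary during iteration.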
- _get_all_conditions() -> list[pynenc.trigger.conditions.TriggerCondition]
Get all registered conditions from memory.
- Returns:
List of all conditions
- get_last_cron_execution(condition_id: pynenc.trigger.types.ConditionId) -> datetime.datetime | None
Get the timestamp of the last execution of a cron condition from memory.
- Parameters:
condition_id – ID of the cron condition
- Returns:
Timestamp of last execution, or None if never executed
- store_last_cron_execution(condition_id: pynenc.trigger.types.ConditionId, execution_time: datetime.datetime, expected_last_execution: datetime.datetime | None = None) -> bool
Store the timestamp of the last execution of a cron condition in memory.
This implementation uses thread locking to ensure atomicity and prevent race conditions in multi-threaded environments.
- Parameters:
condition_id – ID of the cron condition
execution_time – Timestamp of the execution
expected_last_execution – Expected current value for optimistic locking
- Returns:
True if stored successfully, False if the stored value no longer matched expected_last_execution because another thread already updated it
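The optimistic-locking contract described here can be sketched as a compare-and-set performed under a `threading.Lock`. This is an illustration under stated assumptions, not the pynenc source: `CronExecutionStore` is a hypothetical class, and the sketch assumes the comparison is skipped when `expected_last_execution` is None, which the real implementation may handle differently.

```python
import threading
from datetime import datetime, timedelta
from typing import Optional


class CronExecutionStore:
    """Toy store showing a lock-protected compare-and-set."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._last: dict[str, datetime] = {}

    def get_last_cron_execution(self, condition_id: str) -> Optional[datetime]:
        with self._lock:
            return self._last.get(condition_id)

    def store_last_cron_execution(
        self,
        condition_id: str,
        execution_time: datetime,
        expected_last_execution: Optional[datetime] = None,
    ) -> bool:
        # The read, the comparison, and the write happen under one lock,
        # so no other thread can update the value in between.
        with self._lock:
            current = self._last.get(condition_id)
            # Assumption: None means "no expectation", so the check is skipped.
            if expected_last_execution is not None and current != expected_last_execution:
                return False
            self._last[condition_id] = execution_time
            return True


store = CronExecutionStore()
t0 = datetime(2024, 1, 1, 0, 0)
t1 = t0 + timedelta(minutes=1)
store.store_last_cron_execution("cron-1", t0)

# Two workers both observed t0 and both try to record the run at t1.
first = store.store_last_cron_execution("cron-1", t1, expected_last_execution=t0)
second = store.store_last_cron_execution("cron-1", t1, expected_last_execution=t0)
```

In this sketch only the first caller wins. The second sees that the stored value has moved on from `t0` and gets False, which it can treat as a signal to skip the cron run.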
- _register_source_task_condition(task_id: pynenc.identifiers.task_id.TaskId, condition_id: pynenc.trigger.types.ConditionId) -> None
Register the association between a source task and the conditions it affects.
In the in-memory implementation, the relationship is already stored in self._source_task_conditions by register_source_task_condition.
- Parameters:
task_id – ID of the source task
condition_id – ID of the condition sourced from the task
- get_conditions_sourced_from_task(task_id: pynenc.identifiers.task_id.TaskId, context_type: type[pynenc.trigger.conditions.ConditionContext] | None = None) -> list[pynenc.trigger.conditions.TriggerCondition]
Get all conditions that are sourced from a specific task.
These are conditions that monitor the task and might be satisfied by its status or results.
- Parameters:
task_id – ID of the source task
context_type – Optional context type to filter conditions by
- Returns:
List of conditions monitoring this task
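The optional `context_type` filter can be pictured as a second pass over the conditions indexed under a task. The sketch below is a hedged illustration: `Context`, `StatusContext`, `ResultContext`, `Condition`, and `SourceTaskIndex` are hypothetical names. It also assumes each condition exposes the context type it handles and that filtering uses `issubclass`, neither of which is confirmed by the documentation above.

```python
from dataclasses import dataclass
from typing import Optional


class Context:
    """Hypothetical base class for condition contexts."""


class StatusContext(Context):
    """Hypothetical context produced by task status changes."""


class ResultContext(Context):
    """Hypothetical context produced by task results."""


@dataclass(frozen=True)
class Condition:
    condition_id: str
    context_type: type[Context]  # assumed attribute, for illustration only


class SourceTaskIndex:
    def __init__(self) -> None:
        self._conditions: dict[str, Condition] = {}
        self._source_task_conditions: dict[str, set[str]] = {}

    def register(self, task_id: str, condition: Condition) -> None:
        self._conditions[condition.condition_id] = condition
        self._source_task_conditions.setdefault(task_id, set()).add(condition.condition_id)

    def get_conditions_sourced_from_task(
        self, task_id: str, context_type: Optional[type[Context]] = None
    ) -> list[Condition]:
        ids = self._source_task_conditions.get(task_id, set())
        conditions = [self._conditions[i] for i in sorted(ids)]
        if context_type is None:
            return conditions  # no filter requested
        return [c for c in conditions if issubclass(c.context_type, context_type)]


index = SourceTaskIndex()
index.register("task-a", Condition("on-status", StatusContext))
index.register("task-a", Condition("on-result", ResultContext))
status_only = index.get_conditions_sourced_from_task("task-a", StatusContext)
everything = index.get_conditions_sourced_from_task("task-a")
```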
- claim_trigger_execution(trigger_id: str, valid_condition_id: str, expiration_seconds: int = 60) -> bool
Atomically claim the right to execute a trigger for a specific valid condition.
Uses in-memory locking to prevent race conditions in multi-threaded environments.
- Parameters:
trigger_id – ID of the trigger being executed
valid_condition_id – ID of the valid condition being processed
expiration_seconds – Number of seconds after which the claim expires
- Returns:
True if the claim was successful, False if another worker has claimed it
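A claim with an expiry can be sketched as a lock-protected dictionary that maps a claim key to the time at which the claim lapses. This is a minimal sketch, not the pynenc implementation: `ClaimRegistry`, the tuple key, and the injectable `clock` parameter are all assumptions introduced so the expiry can be shown without sleeping.

```python
import threading
import time
from typing import Callable


class ClaimRegistry:
    """Toy registry of expiring claims, safe for use from several threads."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._lock = threading.Lock()
        self._claims: dict[tuple[str, str], float] = {}  # key -> expiry time
        self._clock = clock

    def claim_trigger_execution(
        self, trigger_id: str, valid_condition_id: str, expiration_seconds: int = 60
    ) -> bool:
        key = (trigger_id, valid_condition_id)
        now = self._clock()
        with self._lock:
            expiry = self._claims.get(key)
            if expiry is not None and expiry > now:
                return False  # an unexpired claim already exists
            # No claim yet, or the previous one lapsed: take it.
            self._claims[key] = now + expiration_seconds
            return True


# A fake clock makes the expiry visible without waiting a minute.
fake_now = [0.0]
registry = ClaimRegistry(clock=lambda: fake_now[0])

first = registry.claim_trigger_execution("t1", "vc-1")   # claim granted
second = registry.claim_trigger_execution("t1", "vc-1")  # already claimed
other = registry.claim_trigger_execution("t1", "vc-2")   # different valid condition
fake_now[0] = 61.0
after_expiry = registry.claim_trigger_execution("t1", "vc-1")  # old claim lapsed
```

Keying on the pair means the same trigger can still run once for each distinct valid condition, while a repeated claim for the same pair is refused until the expiry passes.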
- claim_trigger_run(trigger_run_id: str, expiration_seconds: int = 60) -> bool
Atomically claim the right to execute a trigger run.
Uses in-memory locking to prevent race conditions in multi-threaded environments.
- Parameters:
trigger_run_id – Unique ID for this trigger run
expiration_seconds – Number of seconds after which the claim expires
- Returns:
True if the claim was successful, False if another worker has claimed it
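To see why the claim has to be atomic, it helps to let several threads race for the same run ID. The sketch below uses a module-level function that only shares its name and signature with the method documented above; the lock, the `_claims` dictionary, and the `race` helper are hypothetical and exist only for this demonstration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

_lock = threading.Lock()
_claims: dict[str, float] = {}  # trigger_run_id -> expiry time


def claim_trigger_run(trigger_run_id: str, expiration_seconds: int = 60) -> bool:
    """Stand-in claim function: check and write under a single lock."""
    now = time.monotonic()
    with _lock:
        expiry = _claims.get(trigger_run_id)
        if expiry is not None and expiry > now:
            return False
        _claims[trigger_run_id] = now + expiration_seconds
        return True


def race(trigger_run_id: str, workers: int = 8) -> list[bool]:
    """Let several threads try to claim the same run at once."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(lambda _: claim_trigger_run(trigger_run_id), range(workers)))


results = race("run-1")
winners = results.count(True)  # exactly one thread obtains the claim
```

Without the lock around the check and the write, two threads could both see no claim and both return True, and the trigger run would execute twice.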
- clean_task_trigger_definitions(task_id: pynenc.identifiers.task_id.TaskId) -> None
Remove all trigger definitions for a specific task from memory.