pynenc.trigger.mem_trigger

In-memory implementation of the Pynenc trigger subsystem.

This module provides an implementation of the trigger system that stores all its state in memory. It’s suitable for development and testing purposes.

Module Contents

Classes

MemTrigger

In-memory implementation of the Pynenc trigger system.

API

class pynenc.trigger.mem_trigger.MemTrigger(app: pynenc.app.Pynenc)[source]

Bases: pynenc.trigger.base_trigger.BaseTrigger

In-memory implementation of the Pynenc trigger system.

This implementation stores all data in memory and is suitable for single-process applications or testing.

Initialization

Initialize the memory-based trigger component.

Parameters:

app – The Pynenc application instance

_register_condition(condition: pynenc.trigger.conditions.TriggerCondition) None[source]

Register a condition in the in-memory system.

Parameters:

condition – The condition to register

get_condition(condition_id: str) pynenc.trigger.conditions.TriggerCondition | None[source]

Get a condition by its ID from the in-memory store.

Parameters:

condition_id – ID of the condition to retrieve

Returns:

The condition if found, None otherwise

register_trigger(trigger: pynenc.models.trigger_definition_dto.TriggerDefinitionDTO) None[source]

Register a trigger definition in the in-memory system.

Parameters:

trigger – The trigger definition to register

_get_trigger(trigger_id: str) TriggerDefinitionDTO | None[source]

Get a trigger definition by ID from the in-memory store.

Parameters:

trigger_id – ID of the trigger to retrieve

Returns:

The trigger definition if found, None otherwise

get_triggers_for_condition(condition_id: str) list[pynenc.models.trigger_definition_dto.TriggerDefinitionDTO][source]

Get all triggers that depend on a specific condition from the in-memory store.

Parameters:

condition_id – ID of the condition

Returns:

List of trigger definitions using this condition

record_valid_condition(valid_condition: pynenc.trigger.conditions.ValidCondition) None[source]

Record that a condition has been satisfied with a specific context in memory.

Parameters:

valid_condition – The valid condition to record

record_valid_conditions(valid_conditions: list[pynenc.trigger.conditions.ValidCondition]) None[source]

Record that multiple conditions have been satisfied with their respective contexts in memory.

Parameters:

valid_conditions – The list of valid conditions to record

get_valid_conditions() dict[str, pynenc.trigger.conditions.ValidCondition][source]

Get all currently valid conditions and their contexts from memory.

Returns:

Dictionary mapping condition IDs to their valid conditions

clear_valid_conditions(conditions: collections.abc.Iterable[pynenc.trigger.conditions.ValidCondition]) None[source]

Clear valid conditions after they have been processed from memory.

Parameters:

conditions – List of valid conditions to clear

_get_all_conditions() list[pynenc.trigger.conditions.TriggerCondition][source]

Get all registered conditions from memory.

Returns:

List of all conditions

get_last_cron_execution(condition_id: pynenc.trigger.types.ConditionId) datetime.datetime | None[source]

Get the timestamp of the last execution of a cron condition from memory.

Parameters:

condition_id – ID of the cron condition

Returns:

Timestamp of last execution, or None if never executed

store_last_cron_execution(condition_id: pynenc.trigger.types.ConditionId, execution_time: datetime.datetime, expected_last_execution: datetime.datetime | None = None) bool[source]

Store the timestamp of the last execution of a cron condition in memory.

This implementation uses thread locking to ensure atomicity and prevent race conditions in multi-threaded environments.

Parameters:
  • condition_id – ID of the cron condition

  • execution_time – Timestamp of the execution

  • expected_last_execution – Expected current value for optimistic locking

Returns:

True if stored successfully, False if another process already updated it

_register_source_task_condition(task_id: pynenc.identifiers.task_id.TaskId, condition_id: pynenc.trigger.types.ConditionId) None[source]

Register the association between a source task and the conditions it affects.

For in-memory implementation, the relationship is already stored in self._source_task_conditions during register_source_task_condition.

Parameters:
  • task_id – ID of the source task

  • condition_id – ID of the condition sourced from the task

get_conditions_sourced_from_task(task_id: pynenc.identifiers.task_id.TaskId, context_type: type[pynenc.trigger.conditions.ConditionContext] | None = None) list[pynenc.trigger.conditions.TriggerCondition][source]

Get all conditions that are sourced from a specific task.

These are conditions that monitor the task and might be satisfied by its status or results.

Parameters:
  • task_id – ID of the source task

  • context_type – Optional context type to filter conditions by

Returns:

List of conditions monitoring this task

claim_trigger_execution(trigger_id: str, valid_condition_id: str, expiration_seconds: int = 60) bool[source]

Atomically claim the right to execute a trigger for a specific valid condition.

Uses in-memory locking to prevent race conditions in multi-threaded environments.

Parameters:
  • trigger_id – ID of the trigger being executed

  • valid_condition_id – ID of the valid condition being processed

  • expiration_seconds – Number of seconds after which the claim expires

Returns:

True if the claim was successful, False if another worker has claimed it

claim_trigger_run(trigger_run_id: str, expiration_seconds: int = 60) bool[source]

Atomically claim the right to execute a trigger run.

Uses in-memory locking to prevent race conditions in multi-threaded environments.

Parameters:
  • trigger_run_id – Unique ID for this trigger run

  • expiration_seconds – Number of seconds after which the claim expires

Returns:

True if the claim was successful, False if another worker has claimed it

clean_task_trigger_definitions(task_id: pynenc.identifiers.task_id.TaskId) None[source]

Remove all trigger definitions for a specific task from memory.

_purge() None[source]

Purge all data from the in-memory trigger system.

This method clears all registered conditions, triggers, and valid conditions.