pynenc.trigger.base_trigger¶
Base classes and interfaces for Pynenc’s trigger system.
This module defines the core abstractions for time-based scheduling and event-driven task execution. The trigger component enables declarative scheduling of tasks through cron expressions, events, and task dependency chains.
Key components:
BaseTrigger: Abstract base class for trigger implementations
TriggerCondition: Base class for different trigger conditions
TriggerDefinition: Configuration linking conditions to tasks
Module Contents¶
Classes¶
Base class for the Trigger component in Pynenc. |
API¶
- class pynenc.trigger.base_trigger.BaseTrigger(app: pynenc.app.Pynenc)[source]¶
Bases:
abc.ABCBase class for the Trigger component in Pynenc.
The Trigger component manages time-based and event-driven task execution by evaluating conditions and triggering tasks when appropriate.
Initialization
Initialize the trigger component with the parent application.
- Parameters:
app – The Pynenc application instance
- conf() pynenc.conf.config_trigger.ConfigTrigger¶
Access to trigger configuration parameters.
- abstractmethod _register_condition(condition: pynenc.trigger.conditions.TriggerCondition) None[source]¶
Register a condition in the system.
This makes the condition available for trigger definitions.
- Parameters:
condition – The condition to register
- register_condition(condition: pynenc.trigger.conditions.TriggerCondition) None[source]¶
Register a condition in the system.
This method is called to register a new condition that can be used in trigger definitions.
- Parameters:
condition – The condition to register
- abstractmethod _register_source_task_condition(task_id: pynenc.task.TaskId, condition_id: pynenc.trigger.types.ConditionId) None[source]¶
Register the conditions that are sourced from a task.
This method is called when a new condition based on a source task is registered.
- Parameters:
task_id – ID of the source task
condition_id – ID of the condition sourced from the task
- register_source_task_condition(task_id: pynenc.task.TaskId, condition_id: pynenc.trigger.types.ConditionId) None[source]¶
Register a condition that is sourced from a specific task.
This builds the reverse mapping from tasks to the conditions they affect, used for efficient lookup when task status changes.
- Parameters:
task_id – ID of the source task
condition_id – ID of the condition sourced from the task
- abstractmethod get_condition(condition_id: str) TriggerCondition | None[source]¶
Get a condition by its ID.
- Parameters:
condition_id – ID of the condition to retrieve
- Returns:
The condition if found, None otherwise
- abstractmethod register_trigger(trigger: pynenc.models.trigger_definition_dto.TriggerDefinitionDTO) None[source]¶
Register a trigger definition.
This creates a new trigger that will activate when its conditions are met.
- Parameters:
trigger – The trigger definition to register
- get_trigger(trigger_id: str) TriggerDefinition | None[source]¶
Get a trigger definition by ID.
- Parameters:
trigger_id – ID of the trigger to retrieve
- Returns:
The trigger definition if found, None otherwise
- abstractmethod _get_trigger(trigger_id: str) TriggerDefinitionDTO | None[source]¶
Get a trigger definition by ID.
- Parameters:
trigger_id – ID of the trigger to retrieve
- Returns:
The trigger definition if found, None otherwise
- abstractmethod get_triggers_for_condition(condition_id: str) list[pynenc.models.trigger_definition_dto.TriggerDefinitionDTO][source]¶
Get all triggers that depend on a specific condition.
- Parameters:
condition_id – ID of the condition
- Returns:
List of trigger definitions using this condition
- abstractmethod get_conditions_sourced_from_task(task_id: pynenc.task.TaskId, context_type: type[pynenc.trigger.conditions.ConditionContext] | None = None) list[pynenc.trigger.conditions.TriggerCondition][source]¶
Get all conditions that are sourced from a specific task.
These are conditions that monitor the task and might be satisfied by its status or results.
- Parameters:
task_id – ID of the source task
context_type – Optional context type to filter conditions by
- Returns:
List of conditions monitoring this task
- validate_and_record_condition(condition: pynenc.trigger.conditions.TriggerCondition, context: pynenc.trigger.conditions.ConditionContext) bool[source]¶
Validate a condition against a context and record it if valid.
This method checks if the given condition is satisfied by the context, and if so, creates and records a valid condition.
- Parameters:
condition – The condition to validate
context – The context to validate against
- Returns:
True if the condition was satisfied and recorded, False otherwise
- validate_and_record_conditions(conditions_and_contexts: list[tuple[pynenc.trigger.conditions.TriggerCondition, pynenc.trigger.conditions.ConditionContext]]) list[bool][source]¶
Validate multiple conditions against a context and record them if valid.
This method checks each condition in the list against the context, and records those that are satisfied.
- Parameters:
conditions – List of conditions to validate
context – The context to validate against
- Returns:
List of booleans indicating if each condition was satisfied
- abstractmethod record_valid_condition(valid_condition: pynenc.trigger.conditions.ValidCondition) None[source]¶
Record that a condition has been satisfied with a specific context.
This adds the condition to the list of valid conditions that can trigger tasks.
- Parameters:
valid_condition – The valid condition to record
- abstractmethod record_valid_conditions(valid_conditions: list[pynenc.trigger.conditions.ValidCondition]) None[source]¶
Record that multiple conditions have been satisfied with their respective contexts.
This adds the conditions to the list of valid conditions that can trigger tasks.
- Parameters:
valid_conditions – The list of valid conditions to record
- abstractmethod get_valid_conditions() dict[str, pynenc.trigger.conditions.ValidCondition][source]¶
Get all currently valid conditions and their contexts.
- Returns:
Dictionary mapping condition IDs to their contexts
- abstractmethod clear_valid_conditions(conditions: collections.abc.Iterable[pynenc.trigger.conditions.ValidCondition]) None[source]¶
Clear valid conditions after they have been processed.
- Parameters:
conditions – List of valid conditions to clear
- abstractmethod clean_task_trigger_definitions(task_id: pynenc.task.TaskId) None[source]¶
Remove all trigger definitions for a specific task.
This method should clean up any persisted trigger definitions associated with the task.
- Parameters:
task_id – ID of the task to clean triggers for
- register_task_triggers(task: pynenc.task.Task, triggers: TriggerBuilder | list[TriggerBuilder] | None = None) None[source]¶
Register triggers for a task.
This method processes trigger definitions created through a TriggerBuilder and registers them with the trigger system.
- Parameters:
task – The task to trigger
triggers – A TriggerBuilder or list of TriggerBuilders that define when the task should be triggered
- report_tasks_status(invocation_ids: list[pynenc.invocation.base_invocation.InvocationId], status: InvocationStatus | None = None) None[source]¶
Report status changes for multiple tasks to the trigger system in batch.
This method efficiently processes status changes for multiple invocations by batching context creation and condition validation.
- Parameters:
invocation_ids – List of invocation IDs reporting status changes
status – The new status for all invocations
- report_invocation_result(invocation: pynenc.invocation.dist_invocation.DistributedInvocation, result: Any) None[source]¶
Report a task result to the trigger system.
This method checks if any conditions are watching for this task’s result, evaluates them, and records valid conditions.
- Parameters:
task_id – ID of the task reporting a result
result – The result produced by the task
invocation_id – ID of the specific invocation
- report_invocation_failure(invocation: pynenc.invocation.dist_invocation.DistributedInvocation, exception: Exception) None[source]¶
Report a task exception to the trigger system.
This method checks if any conditions are watching for this task’s result, evaluates them, and records valid conditions.
- Parameters:
task_id – ID of the task reporting a result
exception – The exception raised by the task
invocation_id – ID of the specific invocation
- emit_event(event_code: str, payload: dict[str, Any]) str[source]¶
Emit an event into the system to potentially trigger tasks.
Creates an event context, evaluates event conditions, and records valid conditions for triggering tasks.
- Parameters:
event_code – Type of the event
payload – Data associated with the event
- Returns:
ID of the created event
- abstractmethod _get_all_conditions() list[pynenc.trigger.conditions.TriggerCondition][source]¶
Get all registered conditions.
- Returns:
List of all conditions
- abstractmethod get_last_cron_execution(condition_id: pynenc.trigger.types.ConditionId) datetime.datetime | None[source]¶
Get the timestamp of the last execution of a cron condition from persistent storage.
- Parameters:
condition_id (str) – ID of the cron condition
- Returns:
Timestamp of last execution in UTC timezone, or None if never executed
- abstractmethod store_last_cron_execution(condition_id: pynenc.trigger.types.ConditionId, execution_time: datetime.datetime, expected_last_execution: datetime.datetime | None = None) bool[source]¶
Store the timestamp of the last execution of a cron condition in persistent storage.
This should be implemented as an atomic operation to prevent race conditions in distributed environments.
- Parameters:
condition_id (str) – ID of the cron condition
execution_time (datetime) – Timestamp of the execution (must be UTC timezone-aware)
expected_last_execution (datetime | None) – Expected current value for optimistic locking (UTC timezone-aware)
- Returns:
True if stored successfully, False if another process already updated it
- _should_trigger_cron_condition(condition: pynenc.trigger.conditions.CronCondition, current_time: datetime.datetime) pynenc.trigger.conditions.CronContext | None[source]¶
Determine if a cron condition should be triggered based on its schedule and last execution.
This method uses both local cache and persistent storage to efficiently determine if a cron condition should be triggered in a distributed environment. All datetime values are expected to be UTC timezone-aware.
- Parameters:
condition (CronCondition) – The cron condition to check
current_time (datetime) – Current time to check against (UTC timezone-aware)
- Returns:
CronContext if the condition is valid and should be triggered, None otherwise
- check_time_based_triggers(current_time: datetime.datetime | None = None) None[source]¶
Check and record valid time-based conditions.
This method evaluates cron conditions and ensures they are only triggered once per scheduled interval, even in distributed environments.
- Parameters:
current_time – Current time to check against, defaults to now
- abstractmethod claim_trigger_run(trigger_run_id: str, expiration_seconds: int = 60) bool[source]¶
Atomically claim the right to execute a trigger run.
This prevents multiple workers from executing the same trigger multiple times for the same context.
- Parameters:
trigger_run_id – Unique ID for this trigger run
expiration_seconds – Number of seconds after which the claim expires
- Returns:
True if the claim was successful, False if another worker has claimed it
- trigger_loop_iteration() None[source]¶
Process one iteration of the trigger loop.
This method is called periodically by the runner to:
Check time-based triggers
Process valid conditions
Execute triggered tasks
- execute_task(task_id: pynenc.task.TaskId, arguments: dict[str, Any] | None = None) pynenc.invocation.base_invocation.BaseInvocation[source]¶
Execute a task with the given arguments.
This method handles the actual execution of a task when its trigger conditions are met.
- Parameters:
task_id – ID of the task to execute
arguments – Arguments to pass to the task, if any