pynenc.trigger.base_trigger

Base classes and interfaces for Pynenc’s trigger system.

This module defines the core abstractions for time-based scheduling and event-driven task execution. The trigger component enables declarative scheduling of tasks through cron expressions, events, and task dependency chains.

Key components:

  • BaseTrigger: Abstract base class for trigger implementations

  • TriggerCondition: Base class for different trigger conditions

  • TriggerDefinition: Configuration linking conditions to tasks

Module Contents

Classes

BaseTrigger

Base class for the Trigger component in Pynenc.

API

class pynenc.trigger.base_trigger.BaseTrigger(app: pynenc.app.Pynenc)[source]

Bases: abc.ABC

Base class for the Trigger component in Pynenc.

The Trigger component manages time-based and event-driven task execution by evaluating conditions and triggering tasks when appropriate.

Initialization

Initialize the trigger component with the parent application.

Parameters:

app – The Pynenc application instance

conf() pynenc.conf.config_trigger.ConfigTrigger

Access to trigger configuration parameters.

abstractmethod _register_condition(condition: pynenc.trigger.conditions.TriggerCondition) None[source]

Register a condition in the system.

This makes the condition available for trigger definitions.

Parameters:

condition – The condition to register

register_condition(condition: pynenc.trigger.conditions.TriggerCondition) None[source]

Register a condition in the system.

This method is called to register a new condition that can be used in trigger definitions.

Parameters:

condition – The condition to register
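
The pairing of a public register_condition with an abstract _register_condition suggests a template-method layout, where the base class handles shared steps and a backend supplies the storage. The sketch below illustrates that pattern only. It assumes the public method delegates to the private hook, which this reference does not state, and TriggerSketch, MemTriggerSketch, and the string-based condition are hypothetical stand-ins rather than Pynenc classes.

```python
from abc import ABC, abstractmethod


class TriggerSketch(ABC):
    """Hypothetical base class: public method plus abstract storage hook."""

    def register_condition(self, condition_id: str, expression: str) -> None:
        # Shared validation lives in the public method (an assumption).
        if not condition_id:
            raise ValueError("condition_id must not be empty")
        self._register_condition(condition_id, expression)

    @abstractmethod
    def _register_condition(self, condition_id: str, expression: str) -> None:
        """Persist the condition in backend-specific storage."""


class MemTriggerSketch(TriggerSketch):
    """Hypothetical in-memory backend used only for this illustration."""

    def __init__(self) -> None:
        self.conditions: dict[str, str] = {}

    def _register_condition(self, condition_id: str, expression: str) -> None:
        self.conditions[condition_id] = expression


backend = MemTriggerSketch()
backend.register_condition("nightly", "0 0 * * *")
```

With this layout a new backend only has to implement the underscore-prefixed hooks, and callers keep using the public methods.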

abstractmethod _register_source_task_condition(task_id: pynenc.task.TaskId, condition_id: pynenc.trigger.types.ConditionId) None[source]

Register a condition that is sourced from a task.

This method is called when a new condition based on a source task is registered.

Parameters:
  • task_id – ID of the source task

  • condition_id – ID of the condition sourced from the task

register_source_task_condition(task_id: pynenc.task.TaskId, condition_id: pynenc.trigger.types.ConditionId) None[source]

Register a condition that is sourced from a specific task.

This builds the reverse mapping from tasks to the conditions they affect, used for efficient lookup when task status changes.

Parameters:
  • task_id – ID of the source task

  • condition_id – ID of the condition sourced from the task
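
The reverse mapping described above can be pictured as an index from a task ID to the set of condition IDs that watch it. The following is a minimal sketch of that data structure, not Pynenc's implementation: SourceTaskIndex is a hypothetical name, and plain strings stand in for TaskId and ConditionId.

```python
class SourceTaskIndex:
    """Hypothetical reverse index: source task -> conditions that watch it."""

    def __init__(self) -> None:
        self._by_task: dict[str, set[str]] = {}

    def register(self, task_id: str, condition_id: str) -> None:
        # A set makes repeated registration of the same pair harmless.
        self._by_task.setdefault(task_id, set()).add(condition_id)

    def conditions_for(self, task_id: str) -> set[str]:
        # Return a copy so callers cannot mutate the index by accident.
        return set(self._by_task.get(task_id, ()))


index = SourceTaskIndex()
index.register("module.extract", "cond-status-1")
index.register("module.extract", "cond-result-2")
index.register("module.load", "cond-status-3")
watchers = index.conditions_for("module.extract")
```

When a task changes status, a single dictionary lookup yields the conditions to evaluate, so there is no need to scan every registered condition.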

abstractmethod get_condition(condition_id: str) TriggerCondition | None[source]

Get a condition by its ID.

Parameters:

condition_id – ID of the condition to retrieve

Returns:

The condition if found, None otherwise

abstractmethod register_trigger(trigger: pynenc.models.trigger_definition_dto.TriggerDefinitionDTO) None[source]

Register a trigger definition.

This creates a new trigger that will activate when its conditions are met.

Parameters:

trigger – The trigger definition to register

get_trigger(trigger_id: str) TriggerDefinition | None[source]

Get a trigger definition by ID.

Parameters:

trigger_id – ID of the trigger to retrieve

Returns:

The trigger definition if found, None otherwise

abstractmethod _get_trigger(trigger_id: str) TriggerDefinitionDTO | None[source]

Get a trigger definition by ID.

Parameters:

trigger_id – ID of the trigger to retrieve

Returns:

The trigger definition if found, None otherwise

abstractmethod get_triggers_for_condition(condition_id: str) list[pynenc.models.trigger_definition_dto.TriggerDefinitionDTO][source]

Get all triggers that depend on a specific condition.

Parameters:

condition_id – ID of the condition

Returns:

List of trigger definitions using this condition

abstractmethod get_conditions_sourced_from_task(task_id: pynenc.task.TaskId, context_type: type[pynenc.trigger.conditions.ConditionContext] | None = None) list[pynenc.trigger.conditions.TriggerCondition][source]

Get all conditions that are sourced from a specific task.

These are conditions that monitor the task and might be satisfied by its status or results.

Parameters:
  • task_id – ID of the source task

  • context_type – Optional context type to filter conditions by

Returns:

List of conditions monitoring this task

validate_and_record_condition(condition: pynenc.trigger.conditions.TriggerCondition, context: pynenc.trigger.conditions.ConditionContext) bool[source]

Validate a condition against a context and record it if valid.

This method checks if the given condition is satisfied by the context, and if so, creates and records a valid condition.

Parameters:
  • condition – The condition to validate

  • context – The context to validate against

Returns:

True if the condition was satisfied and recorded, False otherwise

validate_and_record_conditions(conditions_and_contexts: list[tuple[pynenc.trigger.conditions.TriggerCondition, pynenc.trigger.conditions.ConditionContext]]) list[bool][source]

Validate multiple conditions against their respective contexts and record those that are valid.

This method checks each (condition, context) pair in the list and records the conditions that are satisfied.

Parameters:

conditions_and_contexts – List of (condition, context) tuples to validate

Returns:

List of booleans indicating if each condition was satisfied
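
As a rough illustration of batch validation, the sketch below evaluates each (condition, context) pair, collects the satisfied ones, and records them in a single write. Everything here is an assumption made for the example: ConditionSketch, its is_satisfied_by callable, and validate_and_record_batch are hypothetical, and a plain list stands in for the persistent store.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class ConditionSketch:
    """Hypothetical stand-in for a trigger condition."""

    condition_id: str
    is_satisfied_by: Callable[[dict], bool]


def validate_and_record_batch(
    pairs: list[tuple[ConditionSketch, dict]],
    recorded: list[tuple[str, dict]],
) -> list[bool]:
    results: list[bool] = []
    valid: list[tuple[str, dict]] = []
    for condition, context in pairs:
        satisfied = condition.is_satisfied_by(context)
        results.append(satisfied)
        if satisfied:
            valid.append((condition.condition_id, context))
    # One write for the whole batch instead of one write per condition.
    recorded.extend(valid)
    return results


on_success = ConditionSketch("on-success", lambda ctx: ctx.get("status") == "SUCCESS")
store: list[tuple[str, dict]] = []
outcome = validate_and_record_batch(
    [(on_success, {"status": "SUCCESS"}), (on_success, {"status": "FAILED"})],
    store,
)
```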

abstractmethod record_valid_condition(valid_condition: pynenc.trigger.conditions.ValidCondition) None[source]

Record that a condition has been satisfied with a specific context.

This adds the condition to the list of valid conditions that can trigger tasks.

Parameters:

valid_condition – The valid condition to record

abstractmethod record_valid_conditions(valid_conditions: list[pynenc.trigger.conditions.ValidCondition]) None[source]

Record that multiple conditions have been satisfied with their respective contexts.

This adds the conditions to the list of valid conditions that can trigger tasks.

Parameters:

valid_conditions – The list of valid conditions to record

abstractmethod get_valid_conditions() dict[str, pynenc.trigger.conditions.ValidCondition][source]

Get all currently valid conditions and their contexts.

Returns:

Dictionary of the currently valid conditions, keyed by ID; each ValidCondition value holds the context that satisfied it

abstractmethod clear_valid_conditions(conditions: collections.abc.Iterable[pynenc.trigger.conditions.ValidCondition]) None[source]

Clear valid conditions after they have been processed.

Parameters:

conditions – Iterable of valid conditions to clear

abstractmethod clean_task_trigger_definitions(task_id: pynenc.task.TaskId) None[source]

Remove all trigger definitions for a specific task.

This method should clean up any persisted trigger definitions associated with the task.

Parameters:

task_id – ID of the task to clean triggers for

register_task_triggers(task: pynenc.task.Task, triggers: TriggerBuilder | list[TriggerBuilder] | None = None) None[source]

Register triggers for a task.

This method processes trigger definitions created through a TriggerBuilder and registers them with the trigger system.

Parameters:
  • task – The task to trigger

  • triggers – A TriggerBuilder or list of TriggerBuilders that define when the task should be triggered

report_tasks_status(invocation_ids: list[pynenc.invocation.base_invocation.InvocationId], status: InvocationStatus | None = None) None[source]

Report status changes for multiple tasks to the trigger system in batch.

This method efficiently processes status changes for multiple invocations by batching context creation and condition validation.

Parameters:
  • invocation_ids – List of invocation IDs reporting status changes

  • status – The new status for all invocations

report_invocation_result(invocation: pynenc.invocation.dist_invocation.DistributedInvocation, result: Any) None[source]

Report a task result to the trigger system.

This method checks if any conditions are watching for this task’s result, evaluates them, and records valid conditions.

Parameters:
  • invocation – The invocation that produced the result

  • result – The result produced by the task

report_invocation_failure(invocation: pynenc.invocation.dist_invocation.DistributedInvocation, exception: Exception) None[source]

Report a task exception to the trigger system.

This method checks if any conditions are watching for this task’s exception, evaluates them, and records valid conditions.

Parameters:
  • invocation – The invocation that raised the exception

  • exception – The exception raised by the task

emit_event(event_code: str, payload: dict[str, Any]) str[source]

Emit an event into the system to potentially trigger tasks.

Creates an event context, evaluates event conditions, and records valid conditions for triggering tasks.

Parameters:
  • event_code – Type of the event

  • payload – Data associated with the event

Returns:

ID of the created event
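
The flow described for emit_event (create an event context, evaluate event conditions, record the valid ones, return the event ID) can be sketched as below. This is an illustration under assumptions, not the library's code: EventConditionSketch, its matching rule on required payload values, and emit_event_sketch are all hypothetical, and the UUID-based ID format is a guess.

```python
import uuid
from dataclasses import dataclass, field
from typing import Any


@dataclass
class EventConditionSketch:
    """Hypothetical condition: matches an event code and required payload values."""

    condition_id: str
    event_code: str
    required: dict[str, Any] = field(default_factory=dict)

    def is_satisfied_by(self, event_code: str, payload: dict[str, Any]) -> bool:
        return event_code == self.event_code and all(
            payload.get(key) == value for key, value in self.required.items()
        )


def emit_event_sketch(
    event_code: str,
    payload: dict[str, Any],
    conditions: list[EventConditionSketch],
    recorded: list[tuple[str, str]],
) -> str:
    event_id = str(uuid.uuid4())  # assumed ID format
    for condition in conditions:
        if condition.is_satisfied_by(event_code, payload):
            # Record which condition became valid and for which event.
            recorded.append((condition.condition_id, event_id))
    return event_id


event_conditions = [
    EventConditionSketch("on-paid-eu", "order.paid", {"region": "eu"}),
    EventConditionSketch("on-refund", "order.refunded"),
]
valid_records: list[tuple[str, str]] = []
new_event_id = emit_event_sketch(
    "order.paid", {"region": "eu", "amount": 10}, event_conditions, valid_records
)
```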

abstractmethod _get_all_conditions() list[pynenc.trigger.conditions.TriggerCondition][source]

Get all registered conditions.

Returns:

List of all conditions

abstractmethod get_last_cron_execution(condition_id: pynenc.trigger.types.ConditionId) datetime.datetime | None[source]

Get the timestamp of the last execution of a cron condition from persistent storage.

Parameters:

condition_id (str) – ID of the cron condition

Returns:

Timestamp of last execution in UTC timezone, or None if never executed

abstractmethod store_last_cron_execution(condition_id: pynenc.trigger.types.ConditionId, execution_time: datetime.datetime, expected_last_execution: datetime.datetime | None = None) bool[source]

Store the timestamp of the last execution of a cron condition in persistent storage.

This should be implemented as an atomic operation to prevent race conditions in distributed environments.

Parameters:
  • condition_id (str) – ID of the cron condition

  • execution_time (datetime) – Timestamp of the execution (must be UTC timezone-aware)

  • expected_last_execution (datetime | None) – Expected current value for optimistic locking (UTC timezone-aware)

Returns:

True if stored successfully, False if another process already updated it
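
The optimistic-locking contract above amounts to a compare-and-set: the write succeeds only if the stored value still equals expected_last_execution. Below is a minimal in-memory sketch of that idea. InMemoryCronStore is hypothetical, a thread lock stands in for whatever atomic primitive a real backend would use, and treating expected_last_execution=None as "no value stored yet" is an assumption of this sketch.

```python
import threading
from datetime import datetime, timedelta, timezone
from typing import Optional


class InMemoryCronStore:
    """Hypothetical store illustrating compare-and-set for cron timestamps."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._last: dict[str, datetime] = {}

    def get_last(self, condition_id: str) -> Optional[datetime]:
        with self._lock:
            return self._last.get(condition_id)

    def store_last(
        self,
        condition_id: str,
        execution_time: datetime,
        expected_last_execution: Optional[datetime] = None,
    ) -> bool:
        if execution_time.utcoffset() != timedelta(0):
            raise ValueError("execution_time must be UTC timezone-aware")
        with self._lock:
            # Compare and write under one lock so the check cannot go stale.
            if self._last.get(condition_id) != expected_last_execution:
                return False  # another process updated the value first
            self._last[condition_id] = execution_time
            return True


cron_store = InMemoryCronStore()
t0 = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
t1 = t0 + timedelta(minutes=1)
first_write = cron_store.store_last("nightly", t0)
stale_write = cron_store.store_last("nightly", t1, expected_last_execution=None)
fresh_write = cron_store.store_last("nightly", t1, expected_last_execution=t0)
```

A worker whose write returns False has lost the race and should skip the run rather than retry with the same expected value.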

_should_trigger_cron_condition(condition: pynenc.trigger.conditions.CronCondition, current_time: datetime.datetime) pynenc.trigger.conditions.CronContext | None[source]

Determine if a cron condition should be triggered based on its schedule and last execution.

This method uses both local cache and persistent storage to efficiently determine if a cron condition should be triggered in a distributed environment. All datetime values are expected to be UTC timezone-aware.

Parameters:
  • condition (CronCondition) – The cron condition to check

  • current_time (datetime) – Current time to check against (UTC timezone-aware)

Returns:

CronContext if the condition is valid and should be triggered, None otherwise
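
To make the decision concrete, the sketch below compares the most recent scheduled time with the last recorded execution. It is deliberately simplified: a fixed every-minute schedule replaces real cron parsing, should_trigger_every_minute is a hypothetical helper, and the returned datetime stands in for a CronContext. The local cache mentioned above is not modelled.

```python
from datetime import datetime, timezone
from typing import Optional


def should_trigger_every_minute(
    last_execution: Optional[datetime], current_time: datetime
) -> Optional[datetime]:
    """Return the scheduled time to run, or None if it already ran."""
    if current_time.tzinfo is None:
        raise ValueError("current_time must be timezone-aware")
    # Most recent scheduled slot: the start of the current minute in UTC.
    scheduled = current_time.astimezone(timezone.utc).replace(second=0, microsecond=0)
    if last_execution is not None and last_execution >= scheduled:
        return None  # this slot has already been executed
    return scheduled


now = datetime(2024, 1, 1, 12, 5, 30, tzinfo=timezone.utc)
slot = datetime(2024, 1, 1, 12, 5, tzinfo=timezone.utc)
never_ran = should_trigger_every_minute(None, now)
already_ran = should_trigger_every_minute(slot, now)
ran_previous_slot = should_trigger_every_minute(
    datetime(2024, 1, 1, 12, 4, tzinfo=timezone.utc), now
)
```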

check_time_based_triggers(current_time: datetime.datetime | None = None) None[source]

Check and record valid time-based conditions.

This method evaluates cron conditions and ensures they are only triggered once per scheduled interval, even in distributed environments.

Parameters:

current_time – Current time to check against, defaults to now

abstractmethod claim_trigger_run(trigger_run_id: str, expiration_seconds: int = 60) bool[source]

Atomically claim the right to execute a trigger run.

This prevents multiple workers from executing the same trigger multiple times for the same context.

Parameters:
  • trigger_run_id – Unique ID for this trigger run

  • expiration_seconds – Number of seconds after which the claim expires

Returns:

True if the claim was successful, False if another worker has claimed it
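
A claim with expiration behaves like a short-lived lock keyed by trigger_run_id. The in-memory sketch below shows the expected semantics; InMemoryClaims is a hypothetical class, and a real backend would need an atomic operation in shared storage rather than a process-local lock. The injectable clock exists only to make the example deterministic.

```python
import threading
import time
from typing import Callable


class InMemoryClaims:
    """Hypothetical claim registry with per-claim expiration."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._lock = threading.Lock()
        self._expires_at: dict[str, float] = {}

    def claim(self, trigger_run_id: str, expiration_seconds: int = 60) -> bool:
        now = self._clock()
        with self._lock:
            expires = self._expires_at.get(trigger_run_id)
            if expires is not None and expires > now:
                return False  # an unexpired claim is held by another worker
            self._expires_at[trigger_run_id] = now + expiration_seconds
            return True


fake_now = [0.0]
claims = InMemoryClaims(clock=lambda: fake_now[0])
first_claim = claims.claim("run-1", expiration_seconds=60)
second_claim = claims.claim("run-1", expiration_seconds=60)
fake_now[0] = 61.0  # advance past the expiration
third_claim = claims.claim("run-1", expiration_seconds=60)
```

The expiration lets another worker take over a run if the original claimant crashes before finishing.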

trigger_loop_iteration() None[source]

Process one iteration of the trigger loop.

This method is called periodically by the runner to:

  1. Check time-based triggers

  2. Process valid conditions

  3. Execute triggered tasks
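
The three steps above can be sketched with a toy, single-process model. This is only an illustration of the control flow: ToyTrigger and all of its attributes are hypothetical, execution is simulated by appending to a list, and the claiming of trigger runs is left out for brevity.

```python
class ToyTrigger:
    """Hypothetical single-process model of one trigger loop iteration."""

    def __init__(self, due_conditions: list[str]) -> None:
        self._due = list(due_conditions)  # time-based conditions due now
        self._valid: dict[str, dict] = {}  # condition_id -> context
        self._tasks_by_condition: dict[str, list[str]] = {}
        self.executed: list[tuple[str, dict]] = []

    def register(self, condition_id: str, task_id: str) -> None:
        self._tasks_by_condition.setdefault(condition_id, []).append(task_id)

    def record_valid(self, condition_id: str, context: dict) -> None:
        self._valid[condition_id] = context

    def loop_iteration(self) -> None:
        # 1. Check time-based triggers and record the ones that are due.
        for condition_id in self._due:
            self.record_valid(condition_id, {"source": "cron"})
        self._due.clear()
        # 2. Process valid conditions.
        for condition_id, context in list(self._valid.items()):
            for task_id in self._tasks_by_condition.get(condition_id, []):
                # 3. Execute triggered tasks (simulated here).
                self.executed.append((task_id, context))
            # Clear the condition once it has been processed.
            del self._valid[condition_id]


trigger = ToyTrigger(due_conditions=["cron-nightly"])
trigger.register("cron-nightly", "build_report")
trigger.register("event-paid", "send_receipt")
trigger.record_valid("event-paid", {"order": 7})
trigger.loop_iteration()
```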

execute_task(task_id: pynenc.task.TaskId, arguments: dict[str, Any] | None = None) pynenc.invocation.base_invocation.BaseInvocation[source]

Execute a task with the given arguments.

This method handles the actual execution of a task when its trigger conditions are met.

Parameters:
  • task_id – ID of the task to execute

  • arguments – Arguments to pass to the task, if any

purge() None[source]

Purge all trigger data for the current application (self.app.app_id).

abstractmethod _purge() None[source]

Purge all trigger data for the current application (self.app.app_id).

reload_task_conditions() None[source]

Reload all task conditions from the current app’s registered tasks.

This method clears the local condition and source-task mappings, then iterates over all tasks in the app and re-registers their triggers and conditions.

Returns:

None