pynenc.trigger.base_trigger

Base classes and interfaces for Pynenc’s trigger system.

This module defines the core abstractions for time-based scheduling and event-driven task execution. The trigger component enables declarative scheduling of tasks through cron expressions, events, and task dependency chains.

Key components:

  • BaseTrigger: Abstract base class for trigger implementations

  • TriggerCondition: Base class for different trigger conditions

  • TriggerDefinition: Configuration linking conditions to tasks

Module Contents

Classes

BaseTrigger

Base class for the Trigger component in Pynenc.

API

class pynenc.trigger.base_trigger.BaseTrigger(app: pynenc.app.Pynenc)[source]

Bases: abc.ABC

Base class for the Trigger component in Pynenc.

The Trigger component manages time-based and event-driven task execution by evaluating conditions and triggering tasks when appropriate.

Initialization

Initialize the trigger component with the parent application.

Parameters:

app – The Pynenc application instance

conf() pynenc.conf.config_trigger.ConfigTrigger

Access to trigger configuration parameters.

abstractmethod _register_condition(condition: pynenc.trigger.conditions.TriggerCondition) None[source]

Register a condition in the system.

This makes the condition available for trigger definitions.

Parameters:

condition – The condition to register

register_condition(condition: pynenc.trigger.conditions.TriggerCondition) None[source]

Register a condition in the system.

This method is called to register a new condition that can be used in trigger definitions.

Parameters:

condition – The condition to register

abstractmethod _register_source_task_condition(task_id: pynenc.task.TaskId, condition_id: pynenc.trigger.types.ConditionId) None[source]

Register the conditions that are sourced from a task.

This method is called when a new condition based on a source task is registered.

Parameters:
  • task_id – ID of the source task

  • condition_id – ID of the condition sourced from the task

register_source_task_condition(task_id: pynenc.task.TaskId, condition_id: pynenc.trigger.types.ConditionId) None[source]

Register a condition that is sourced from a specific task.

This builds the reverse mapping from tasks to the conditions they affect, used for efficient lookup when task status changes.

Parameters:
  • task_id – ID of the source task

  • condition_id – ID of the condition sourced from the task
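The reverse mapping described above can be pictured as a dictionary from task IDs to sets of condition IDs. The following is a minimal in-memory sketch, not Pynenc's implementation: `SourceTaskIndex` and its methods are hypothetical names, and plain strings stand in for `TaskId` and `ConditionId`.

```python
from __future__ import annotations

from collections import defaultdict


class SourceTaskIndex:
    """Hypothetical stand-in for a backend's task -> conditions reverse index."""

    def __init__(self) -> None:
        self._conditions_by_task: dict[str, set[str]] = defaultdict(set)

    def register(self, task_id: str, condition_id: str) -> None:
        # Registering the same pair twice is harmless because sets deduplicate.
        self._conditions_by_task[task_id].add(condition_id)

    def conditions_for(self, task_id: str) -> set[str]:
        # .get avoids creating empty entries for unknown tasks.
        return set(self._conditions_by_task.get(task_id, ()))


index = SourceTaskIndex()
index.register("module.task_a", "cond-1")
index.register("module.task_a", "cond-2")
index.register("module.task_b", "cond-3")
```

With such an index, a status change on `module.task_a` only needs to evaluate `cond-1` and `cond-2` rather than every registered condition.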

abstractmethod get_condition(condition_id: str) TriggerCondition | None[source]

Get a condition by its ID.

Parameters:

condition_id – ID of the condition to retrieve

Returns:

The condition if found, None otherwise

abstractmethod register_trigger(trigger: pynenc.models.trigger_definition_dto.TriggerDefinitionDTO) None[source]

Register a trigger definition.

This creates a new trigger that will activate when its conditions are met.

Parameters:

trigger – The trigger definition to register

get_trigger(trigger_id: str) TriggerDefinition | None[source]

Get a trigger definition by ID.

Parameters:

trigger_id – ID of the trigger to retrieve

Returns:

The trigger definition if found, None otherwise

abstractmethod _get_trigger(trigger_id: str) TriggerDefinitionDTO | None[source]

Get a trigger definition by ID.

Parameters:

trigger_id – ID of the trigger to retrieve

Returns:

The trigger definition if found, None otherwise

abstractmethod get_triggers_for_condition(condition_id: str) list[pynenc.models.trigger_definition_dto.TriggerDefinitionDTO][source]

Get all triggers that depend on a specific condition.

Parameters:

condition_id – ID of the condition

Returns:

List of trigger definitions using this condition

abstractmethod get_conditions_sourced_from_task(task_id: pynenc.task.TaskId, context_type: type[pynenc.trigger.conditions.ConditionContext] | None = None) list[pynenc.trigger.conditions.TriggerCondition][source]

Get all conditions that are sourced from a specific task.

These are conditions that monitor the task and might be satisfied by its status or results.

Parameters:
  • task_id – ID of the source task

  • context_type – Optional context type to filter conditions by

Returns:

List of conditions monitoring this task

validate_and_record_condition(condition: pynenc.trigger.conditions.TriggerCondition, context: pynenc.trigger.conditions.ConditionContext) bool[source]

Validate a condition against a context and record it if valid.

This method checks if the given condition is satisfied by the context, and if so, creates and records a valid condition.

Parameters:
  • condition – The condition to validate

  • context – The context to validate against

Returns:

True if the condition was satisfied and recorded, False otherwise

validate_and_record_conditions(conditions_and_contexts: list[tuple[pynenc.trigger.conditions.TriggerCondition, pynenc.trigger.conditions.ConditionContext]]) list[bool][source]

Validate multiple conditions, each against its paired context, and record the ones that are satisfied.

Returns a list of booleans aligned with conditions_and_contexts. Use evaluate_and_record_conditions when the caller needs the ValidCondition objects (for example to populate event monitoring).

Parameters:

conditions_and_contexts – Pairs of condition and context.

Returns:

List of booleans indicating if each condition was satisfied.

evaluate_and_record_conditions(conditions_and_contexts: list[tuple[pynenc.trigger.conditions.TriggerCondition, pynenc.trigger.conditions.ConditionContext]]) list[ValidCondition | None][source]

Evaluate conditions and record the satisfied ones.

Returns a list of the same length as conditions_and_contexts; each entry is either the recorded ValidCondition or None when the condition was not satisfied.

Parameters:

conditions_and_contexts – Pairs of condition and context.

Returns:

ValidCondition per pair, None for unmatched ones.
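The relationship between the two batch methods can be sketched as follows. This is an illustration under assumptions, not the library's code: `FakeCondition`, `FakeValidCondition`, `evaluate_pairs` and `validate_pairs` are hypothetical stand-ins, and the recording step is reduced to appending to a list.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class FakeCondition:  # hypothetical stand-in for TriggerCondition
    name: str
    predicate: Callable[[dict], bool]


@dataclass(frozen=True)
class FakeValidCondition:  # hypothetical stand-in for ValidCondition
    condition_name: str
    context: dict


recorded: list[FakeValidCondition] = []  # stands in for persistent storage


def evaluate_pairs(pairs: list[tuple[FakeCondition, dict]]) -> list[Optional[FakeValidCondition]]:
    """Return one entry per pair: the recorded object, or None if unsatisfied."""
    results: list[Optional[FakeValidCondition]] = []
    for condition, context in pairs:
        if condition.predicate(context):
            valid = FakeValidCondition(condition.name, context)
            recorded.append(valid)
            results.append(valid)
        else:
            results.append(None)
    return results


def validate_pairs(pairs: list[tuple[FakeCondition, dict]]) -> list[bool]:
    """Boolean view over evaluate_pairs, same length and order."""
    return [valid is not None for valid in evaluate_pairs(pairs)]


on_success = FakeCondition("on_success", lambda ctx: ctx.get("status") == "SUCCESS")
flags = validate_pairs([(on_success, {"status": "SUCCESS"}), (on_success, {"status": "FAILED"})])
```

Keeping the result list aligned with the input lets a caller zip the two together without tracking indices.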

_log_condition_matched(condition: pynenc.trigger.conditions.TriggerCondition, context: pynenc.trigger.conditions.ConditionContext, valid_condition: pynenc.trigger.conditions.ValidCondition) None[source]

Emit the predefined trigger.condition.matched log.

Uses log_messages.context_source_ref so the source ref is consistent with the contract documented in docs/sprints/0004_1_triggers_workflows/11_pynmon_events_refinements.md.

abstractmethod record_valid_condition(valid_condition: pynenc.trigger.conditions.ValidCondition) None[source]

Record that a condition has been satisfied with a specific context.

This adds the condition to the list of valid conditions that can trigger tasks.

Parameters:

valid_condition – The valid condition to record

abstractmethod record_valid_conditions(valid_conditions: list[pynenc.trigger.conditions.ValidCondition]) None[source]

Record that multiple conditions have been satisfied with their respective contexts.

This adds the conditions to the list of valid conditions that can trigger tasks.

Parameters:

valid_conditions – The list of valid conditions to record

abstractmethod get_valid_conditions() dict[str, pynenc.trigger.conditions.ValidCondition][source]

Get all currently valid conditions and their contexts.

Returns:

Dictionary mapping condition IDs to their ValidCondition records

abstractmethod clear_valid_conditions(conditions: collections.abc.Iterable[pynenc.trigger.conditions.ValidCondition]) None[source]

Clear valid conditions after they have been processed.

Parameters:

conditions – Iterable of valid conditions to clear

abstractmethod clean_task_trigger_definitions(task_id: pynenc.task.TaskId) None[source]

Remove all trigger definitions for a specific task.

This method should clean up any persisted trigger definitions associated with the task.

Parameters:

task_id – ID of the task to clean triggers for

register_task_triggers(task: pynenc.task.Task, triggers: TriggerBuilder | list[TriggerBuilder] | None = None) None[source]

Register triggers for a task.

This method processes trigger definitions created through a TriggerBuilder and registers them with the trigger system.

Parameters:
  • task – The task to trigger

  • triggers – A TriggerBuilder or list of TriggerBuilders that define when the task should be triggered

report_tasks_status(invocation_ids: list[pynenc.invocation.base_invocation.InvocationId], status: InvocationStatus | None = None) None[source]

Report status changes for multiple tasks to the trigger system in batch.

This method efficiently processes status changes for multiple invocations by batching context creation and condition validation.

Parameters:
  • invocation_ids – List of invocation IDs reporting status changes

  • status – The new status for all invocations

report_invocation_result(invocation: pynenc.invocation.dist_invocation.DistributedInvocation, result: Any) None[source]

Report a task result to the trigger system.

This method checks if any conditions are watching for this task’s result, evaluates them, and records valid conditions.

Parameters:
  • invocation – The invocation that produced the result

  • result – The result produced by the task

report_invocation_failure(invocation: pynenc.invocation.dist_invocation.DistributedInvocation, exception: Exception) None[source]

Report a task exception to the trigger system.

This method checks if any conditions are watching for this task’s failure, evaluates them, and records valid conditions.

Parameters:
  • invocation – The invocation that raised the exception

  • exception – The exception raised by the task

emit_event(event_code: str, payload: dict[str, Any]) str[source]

Emit an event into the system to potentially trigger tasks.

Creates an event context, evaluates event conditions, and records valid conditions for triggering tasks. The event is also persisted as an EventRecord for monitoring even when no condition matches.

Parameters:
  • event_code – Type of the event

  • payload – Data associated with the event

Returns:

ID of the created event
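The flow described for emit_event (evaluate conditions, record the matches, persist the event regardless) can be sketched with plain data structures. Everything below is an assumption made for illustration: `emit_event_sketch`, the tuple shape of `conditions`, and the dictionary used as an event record are not part of Pynenc.

```python
from __future__ import annotations

import uuid
from datetime import datetime, timezone
from typing import Any, Callable


def emit_event_sketch(
    event_code: str,
    payload: dict[str, Any],
    conditions: list[tuple[str, str, Callable[[dict[str, Any]], bool]]],
    event_log: list[dict[str, Any]],
    valid_conditions: list[tuple[str, str]],
) -> str:
    """Hypothetical helper: conditions are (condition_id, event_code, predicate)."""
    event_id = str(uuid.uuid4())
    matched_ids = [
        condition_id
        for condition_id, code, predicate in conditions
        if code == event_code and predicate(payload)
    ]
    valid_conditions.extend((condition_id, event_id) for condition_id in matched_ids)
    # The record is written even when matched_ids is empty.
    event_log.append(
        {
            "event_id": event_id,
            "event_code": event_code,
            "payload": payload,
            "timestamp": datetime.now(timezone.utc),
            "matched": bool(matched_ids),
        }
    )
    return event_id


conditions = [("cond-big-order", "order.created", lambda p: p.get("total", 0) >= 100)]
event_log = []
valid_conditions = []
small_id = emit_event_sketch("order.created", {"total": 5}, conditions, event_log, valid_conditions)
big_id = emit_event_sketch("order.created", {"total": 250}, conditions, event_log, valid_conditions)
```

Both events end up in `event_log`, but only the second one produces an entry in `valid_conditions`.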

_build_event_record(event_id: str, event_code: str, payload: dict[str, Any], timestamp: datetime.datetime, runner_context_id: str) pynenc.trigger.monitoring.EventRecord[source]

Build a fresh EventRecord for the current emitter context.

_safe_store_event(record: pynenc.trigger.monitoring.EventRecord, runner_context: pynenc.runner.runner_context.RunnerContext, log_template: str) None[source]

Persist record, swallowing storage failures per the monitoring policy.

Monitoring writes must never break the hot path. This helper centralizes the “log and continue” policy documented for the trigger component.
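A "log and continue" wrapper of the kind described here might look like the sketch below. It is a simplified illustration, not the actual helper: `safe_store` takes a generic callable instead of a runner context, and it returns a boolean purely so the example can show what happened.

```python
import logging
from typing import Any, Callable

logger = logging.getLogger("trigger.monitoring.sketch")


def safe_store(store: Callable[[Any], None], record: Any, log_template: str) -> bool:
    """Hypothetical helper: try to persist, log on failure, never raise."""
    try:
        store(record)
    except Exception as exc:  # broad on purpose: monitoring must not break the caller
        logger.warning(log_template, exc)
        return False
    return True


def broken_backend(record: Any) -> None:
    raise ConnectionError("backend down")


saved = []
ok = safe_store(saved.append, {"event_id": "e1"}, "failed to store event record: %s")
failed = safe_store(broken_backend, {"event_id": "e2"}, "failed to store event record: %s")
```

The second call logs a warning and returns normally, so the code that emitted the event keeps running.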

_event_conditions_for(event_code: str) list[pynenc.trigger.conditions.EventCondition][source]

Return registered EventCondition objects for event_code.

_current_emitter_invocation_id() str | None[source]

Return the invocation ID currently emitting an event, if any.

Reads pynenc.context.get_dist_invocation_context (thread-local) which is populated by DistributedInvocation.run for the duration of a task body. Returns None outside any task.

_current_emitter_context() tuple[str | None, str | None][source]

Return (invocation_id, task_id_key) for the emitting task, if any.

abstractmethod _get_all_conditions() list[pynenc.trigger.conditions.TriggerCondition][source]

Get all registered conditions.

Returns:

List of all conditions

abstractmethod get_last_cron_execution(condition_id: pynenc.trigger.types.ConditionId) datetime.datetime | None[source]

Get the timestamp of the last execution of a cron condition from persistent storage.

Parameters:

condition_id (str) – ID of the cron condition

Returns:

Timestamp of last execution in UTC timezone, or None if never executed

abstractmethod store_last_cron_execution(condition_id: pynenc.trigger.types.ConditionId, execution_time: datetime.datetime, expected_last_execution: datetime.datetime | None = None) bool[source]

Store the timestamp of the last execution of a cron condition in persistent storage.

This should be implemented as an atomic operation to prevent race conditions in distributed environments.

Parameters:
  • condition_id (str) – ID of the cron condition

  • execution_time (datetime) – Timestamp of the execution (must be UTC timezone-aware)

  • expected_last_execution (datetime | None) – Expected current value for optimistic locking (UTC timezone-aware)

Returns:

True if stored successfully, False if another process already updated it
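The optimistic-locking contract can be illustrated with an in-memory compare-and-set. This is a sketch under assumptions: `InMemoryCronStore` is a hypothetical class, a thread lock stands in for whatever atomic primitive a real backend provides, and `expected_last_execution=None` is interpreted here as "expect no stored value", which the reference does not confirm.

```python
from __future__ import annotations

import threading
from datetime import datetime, timedelta, timezone


class InMemoryCronStore:
    """Hypothetical stand-in showing compare-and-set semantics."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._last: dict[str, datetime] = {}

    def get_last(self, condition_id: str) -> datetime | None:
        return self._last.get(condition_id)

    def store_last(
        self,
        condition_id: str,
        execution_time: datetime,
        expected_last_execution: datetime | None = None,
    ) -> bool:
        with self._lock:
            current = self._last.get(condition_id)
            # Assumption: None means "nothing stored yet is expected".
            if current != expected_last_execution:
                return False  # another worker updated the value first
            self._last[condition_id] = execution_time
            return True


store = InMemoryCronStore()
t1 = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
t2 = t1 + timedelta(minutes=1)
worker_a = store.store_last("cron-1", t1, expected_last_execution=None)
worker_b = store.store_last("cron-1", t1, expected_last_execution=None)
next_tick = store.store_last("cron-1", t2, expected_last_execution=t1)
```

Only the worker whose expectation matches the stored value wins, so two workers racing on the same tick cannot both record the execution.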

_should_trigger_cron_condition(condition: pynenc.trigger.conditions.CronCondition, current_time: datetime.datetime, atomic_service_run: pynenc.orchestrator.atomic_service.AtomicServiceRun) pynenc.trigger.conditions.CronContext | None[source]

Determine if a cron condition should be triggered based on its schedule and last execution.

This method uses both local cache and persistent storage to efficiently determine if a cron condition should be triggered in a distributed environment. All datetime values are expected to be UTC timezone-aware.

Parameters:
  • condition (CronCondition) – The cron condition to check

  • current_time (datetime) – Current time to check against (UTC timezone-aware)

  • atomic_service_run (AtomicServiceRun) – Atomic-service run evaluating cron.

Returns:

CronContext if the condition is valid and should be triggered, None otherwise

check_time_based_triggers(current_time: datetime.datetime | None = None, *, atomic_service_run: pynenc.orchestrator.atomic_service.AtomicServiceRun) None[source]

Check and record valid time-based conditions.

abstractmethod claim_trigger_run(trigger_run_id: str, expiration_seconds: int = 60) bool[source]

Atomically claim the right to execute a trigger run.

This prevents multiple workers from executing the same trigger multiple times for the same context.

Parameters:
  • trigger_run_id – Unique ID for this trigger run

  • expiration_seconds – Number of seconds after which the claim expires

Returns:

True if the claim was successful, False if another worker has claimed it
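A claim with expiration can be sketched as a guarded dictionary of expiry times. The class below is hypothetical and single-process only; a distributed backend would need an atomic primitive of its own. The injectable `clock` parameter is an addition made so the example can advance time deterministically.

```python
from __future__ import annotations

import threading
import time
from typing import Callable


class InMemoryClaims:
    """Hypothetical stand-in for an atomic claim store."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._lock = threading.Lock()
        self._expires_at: dict[str, float] = {}
        self._clock = clock

    def claim(self, trigger_run_id: str, expiration_seconds: int = 60) -> bool:
        now = self._clock()
        with self._lock:
            expires = self._expires_at.get(trigger_run_id)
            if expires is not None and expires > now:
                return False  # still held by another worker
            self._expires_at[trigger_run_id] = now + expiration_seconds
            return True


fake_now = [0.0]
claims = InMemoryClaims(clock=lambda: fake_now[0])
first = claims.claim("run-1")
second = claims.claim("run-1")
fake_now[0] = 61.0  # move past the default 60-second expiration
third = claims.claim("run-1")
```

The expiration lets a later worker retry a run whose original claimant crashed before finishing.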

trigger_loop_iteration(atomic_service_run: pynenc.orchestrator.atomic_service.AtomicServiceRun) None[source]

Process one iteration of the trigger loop.

_process_loop_iteration(atomic_service_run: pynenc.orchestrator.atomic_service.AtomicServiceRun) None[source]

Body of trigger_loop_iteration extracted for purge wrapping.

_process_loop_iteration_body(atomic_service_run: pynenc.orchestrator.atomic_service.AtomicServiceRun) None[source]

Inner body of one trigger-loop iteration.

_execution_contexts_for_trigger(trigger: pynenc.trigger.trigger_definitions.TriggerDefinition, context: pynenc.trigger.trigger_context.TriggerContext) list[pynenc.trigger.trigger_context.TriggerContext][source]

Build the concrete contexts that should execute for trigger.

A trigger can receive several valid contexts for the same condition in one loop pass (for example four invocations of the same task reaching SUCCESS before the trigger loop wakes up). Single-condition triggers must execute once per valid context, not once for the whole batch.

OR triggers also execute independently per satisfied condition so their argument provider and trigger-run record see the exact source context. Multi-condition AND triggers keep the combined context because they need one valid condition for each required condition.
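The fan-out rules above can be sketched with dictionaries in place of TriggerContext objects. The function name, the "AND"/"OR" strings, and the choice to return no context when an AND trigger is missing a condition are all assumptions made for this illustration.

```python
from __future__ import annotations


def execution_contexts(
    logic: str,
    required: list[str],
    valid_by_condition: dict[str, list[str]],
) -> list[dict[str, list[str]]]:
    """Hypothetical sketch: map condition IDs to the source contexts that satisfied them."""
    if len(required) == 1 or logic == "OR":
        # One execution per satisfied (condition, context) pair.
        return [
            {condition_id: [ctx]}
            for condition_id in required
            for ctx in valid_by_condition.get(condition_id, [])
        ]
    # Multi-condition AND: one combined context, only when every condition is satisfied.
    if all(valid_by_condition.get(condition_id) for condition_id in required):
        return [{condition_id: list(valid_by_condition[condition_id]) for condition_id in required}]
    return []


single = execution_contexts("AND", ["c1"], {"c1": ["inv-1", "inv-2", "inv-3", "inv-4"]})
either = execution_contexts("OR", ["c1", "c2"], {"c1": ["inv-1"], "c2": ["evt-9"]})
both = execution_contexts("AND", ["c1", "c2"], {"c1": ["inv-1"], "c2": ["evt-9"]})
incomplete = execution_contexts("AND", ["c1", "c2"], {"c1": ["inv-1"]})
```

In the `single` case, four successful invocations yield four separate executions, matching the example given in the description.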

execute_task(task_id: pynenc.task.TaskId, arguments: dict[str, Any] | None = None, *, parent_event_id: str | None = None) pynenc.invocation.base_invocation.BaseInvocation[source]

Execute a task with the given arguments.

This method handles the actual execution of a task when its trigger conditions are met.

Parameters:
  • task_id – ID of the task to execute

  • arguments – Arguments to pass to the task, if any

  • parent_event_id – Optional id of the event that fired this trigger. Set on the routed invocation so the family tree can show event-children.

_log_run_claimed(run_id: str, trigger: pynenc.trigger.trigger_definitions.TriggerDefinition, execution_ctx: pynenc.trigger.trigger_context.TriggerContext, atomic_service_run: pynenc.orchestrator.atomic_service.AtomicServiceRun) None[source]

Emit the predefined trigger.run.claimed log.

Lists the participating condition IDs so the Log Explorer can cross-link back to the matched conditions without an extra query.

_log_run_executed(run_id: str, trigger: pynenc.trigger.trigger_definitions.TriggerDefinition, execution_ctx: pynenc.trigger.trigger_context.TriggerContext, invocation: pynenc.invocation.base_invocation.BaseInvocation, atomic_service_run: pynenc.orchestrator.atomic_service.AtomicServiceRun) None[source]

Emit the predefined trigger.run.executed log.

Includes the generated invocation plus any event/source-invocation participants so a single log line links every causal endpoint of the run for downstream Pynmon hydration.

_record_trigger_run(*, run_id: str, trigger: pynenc.trigger.trigger_definitions.TriggerDefinition, context: pynenc.trigger.trigger_context.TriggerContext, invocation: pynenc.invocation.base_invocation.BaseInvocation, claimed_at: datetime.datetime, executed_at: datetime.datetime, atomic_service_run: pynenc.orchestrator.atomic_service.AtomicServiceRun) None[source]

Persist a TriggerRunRecord and link affected events.

Failures are logged and never propagated; monitoring must not break the trigger loop. The event/invocation relation is recorded via the backend’s atomic link_trigger_run_to_events instead of a read-modify-write on the event JSON.

_build_trigger_run_record(run_id: str, trigger: pynenc.trigger.trigger_definitions.TriggerDefinition, context: pynenc.trigger.trigger_context.TriggerContext, invocation: pynenc.invocation.base_invocation.BaseInvocation, claimed_at: datetime.datetime, executed_at: datetime.datetime, atomic_service_run: pynenc.orchestrator.atomic_service.AtomicServiceRun) pynenc.trigger.monitoring.TriggerRunRecord[source]

Assemble a TriggerRunRecord from a satisfied trigger context.

_monitoring_context_for_trigger_run(trigger: pynenc.trigger.trigger_definitions.TriggerDefinition, context: pynenc.trigger.trigger_context.TriggerContext) pynenc.trigger.trigger_context.TriggerContext[source]

Keep only the condition instances that explain this trigger run.

_build_run_participants(context: pynenc.trigger.trigger_context.TriggerContext) list[pynenc.trigger.monitoring.TriggerRunParticipant][source]

Build a per-condition participant list from context.

Stores only reference ids plus a lightweight timestamp/summary so pynmon timeline overlays and tooltips can anchor and label the participant. Detail panels re-fetch the live condition / event / invocation when they need the full payload.

purge() None[source]

Purges all the trigger data for the current self.app.app_id.

abstractmethod _purge() None[source]

Purges all the trigger data for the current self.app.app_id.

abstractmethod store_event(event: pynenc.trigger.monitoring.EventRecord) None[source]

Store or replace an EventRecord.

abstractmethod get_event(event_id: str) EventRecord | None[source]

Return the EventRecord for event_id, or None.

abstractmethod get_events(*, event_code: str | None = None, start_time: datetime.datetime | None = None, end_time: datetime.datetime | None = None, matched: bool | None = None, triggered: bool | None = None, emitted_by_invocation_id: str | None = None, emitted_by_task_id: str | None = None, limit: int = 100, offset: int = 0) list[pynenc.trigger.monitoring.EventRecord][source]

Return events ordered by timestamp descending.

abstractmethod count_events(*, event_code: str | None = None, start_time: datetime.datetime | None = None, end_time: datetime.datetime | None = None, matched: bool | None = None, triggered: bool | None = None, emitted_by_invocation_id: str | None = None, emitted_by_task_id: str | None = None) int[source]

Count events with the same filters as get_events.
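The contract shared by get_events and count_events (same filters, newest first, limit/offset paging) can be sketched over an in-memory list. `FakeEvent` and the helper functions are hypothetical, only a subset of the filters is shown, and treating the time bounds as inclusive is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class FakeEvent:  # hypothetical stand-in for EventRecord
    event_id: str
    event_code: str
    timestamp: datetime
    matched: bool


def _filter(events, *, event_code=None, start_time=None, end_time=None, matched=None):
    # A filter left as None is not applied.
    return [
        e
        for e in events
        if (event_code is None or e.event_code == event_code)
        and (start_time is None or e.timestamp >= start_time)
        and (end_time is None or e.timestamp <= end_time)
        and (matched is None or e.matched == matched)
    ]


def get_events(events, *, limit=100, offset=0, **filters):
    ordered = sorted(_filter(events, **filters), key=lambda e: e.timestamp, reverse=True)
    return ordered[offset : offset + limit]


def count_events(events, **filters):
    return len(_filter(events, **filters))


base = datetime(2024, 1, 1, tzinfo=timezone.utc)
events = [
    FakeEvent("e1", "order.created", base, True),
    FakeEvent("e2", "order.created", base + timedelta(hours=1), False),
    FakeEvent("e3", "user.signup", base + timedelta(hours=2), True),
]
newest_first = [e.event_id for e in get_events(events)]
page_two = [e.event_id for e in get_events(events, event_code="order.created", limit=1, offset=1)]
```

Sharing one filter function between the two calls keeps the count consistent with the pages a monitoring view would request.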

abstractmethod get_event_markers_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, *, event_code: str | None = None, state: str = 'all', limit: int = 1000, offset: int = 0) pynenc.trigger.monitoring.EventMarkerPage[source]

Return a paginated EventMarkerPage for the time window.

Use a narrow projection: id, code, timestamp, matched, triggered.

Parameters:

state – One of "all", "matched", "unmatched", "triggered", or "untriggered".

Link invocation_id to event_ids.

Backends should update event links atomically.

abstractmethod get_invocations_triggered_by_event(event_id: str) list[str][source]

Return invocation IDs triggered by event_id.

abstractmethod list_event_codes() list[str][source]

Return the distinct event codes.

abstractmethod store_trigger_run(run: pynenc.trigger.monitoring.TriggerRunRecord) None[source]

Store or replace a TriggerRunRecord.

abstractmethod get_trigger_run(trigger_run_id: str) TriggerRunRecord | None[source]

Return the TriggerRunRecord for trigger_run_id, or None.

abstractmethod get_trigger_runs_for_event(event_id: str) list[pynenc.trigger.monitoring.TriggerRunRecord][source]

Return trigger runs that reference event_id.

abstractmethod get_trigger_runs_for_invocation(invocation_id: str) list[pynenc.trigger.monitoring.TriggerRunRecord][source]

Return trigger runs that produced invocation_id.

get_trigger_runs_for_valid_condition(valid_condition_id: str) list[pynenc.trigger.monitoring.TriggerRunRecord][source]

Return trigger runs that include valid_condition_id.

Historical participant snapshots keep the match visible.

abstractmethod get_trigger_runs_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, *, event_code: str | None = None, task_id_key: str | None = None, limit: int | None = None) list[pynenc.trigger.monitoring.TriggerRunRecord][source]

Return trigger runs between start_time and end_time.

get_trigger_runs_sourced_by_invocation(invocation_id: str) list[pynenc.trigger.monitoring.TriggerRunRecord][source]

Return runs where invocation_id was a source participant.

Backends with a source-invocation index should override this scan.

get_events_batch(event_ids: collections.abc.Sequence[str]) dict[str, EventRecord | None][source]

Return EventRecord values for each id, or None.

get_trigger_runs_batch(run_ids: collections.abc.Sequence[str]) dict[str, TriggerRunRecord | None][source]

Return TriggerRunRecord values for each id, or None.
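The batch getters return a dictionary keyed by the requested IDs, with None for IDs that were not found. A minimal sketch of that shape is shown below; `get_batch` is a hypothetical helper, and whether the default implementations loop over the single-item getters is an assumption.

```python
from __future__ import annotations

from collections.abc import Callable, Sequence
from typing import Optional, TypeVar

T = TypeVar("T")


def get_batch(ids: Sequence[str], get_one: Callable[[str], Optional[T]]) -> dict[str, Optional[T]]:
    """Hypothetical helper: one lookup per id, missing ids map to None."""
    return {item_id: get_one(item_id) for item_id in ids}


stored = {"evt-1": {"event_code": "order.created"}}
batch = get_batch(["evt-1", "evt-404"], stored.get)
```

Because every requested ID appears in the result, callers can distinguish "not found" from "not requested" without a second lookup.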

get_referenced_atomic_service_run_ids() set[str][source]

Return atomic_service_run_id values referenced by trigger runs.

Used to avoid purging linked execution records. Override with a dedicated index when available.

get_conditions_batch(condition_ids: collections.abc.Sequence[str]) dict[str, TriggerCondition | None][source]

Return TriggerCondition values for each id, or None.

get_triggers_batch(trigger_ids: collections.abc.Sequence[str]) dict[str, TriggerDefinition | None][source]

Return TriggerDefinition values for each id, or None.

get_events_emitted_by_invocation(invocation_id: str, *, limit: int = 200, offset: int = 0) list[pynenc.trigger.monitoring.EventRecord][source]

Return events emitted by invocation_id.

auto_purge_events() int[source]

Purge expired event and trigger-run records.

Returns 0 when auto purge is disabled.

_auto_purge_events() int[source]

Apply age and capacity purge to events and trigger runs.

abstractmethod _age_purge_events(threshold: datetime.datetime) list[str][source]

Delete events older than threshold and return their ids.

abstractmethod _age_purge_trigger_runs(threshold: datetime.datetime) int[source]

Delete trigger runs older than threshold and return the count.

abstractmethod _cap_purge_events() list[str][source]

Delete excess events and return their ids.

abstractmethod _cascade_delete_runs_for_events(event_ids: list[str]) int[source]

Delete trigger runs that reference any of event_ids.

abstractmethod _cap_purge_trigger_runs() int[source]

Delete excess trigger runs and return the count.
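The age purge, capacity purge, and cascade steps can be sketched over plain dictionaries. All names and data shapes below are hypothetical: events are reduced to an ID-to-timestamp mapping, trigger runs to an ID-to-referenced-events mapping, and the capacity limit is passed as an argument rather than read from configuration.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def age_purge(events: dict[str, datetime], threshold: datetime) -> list[str]:
    """Delete events older than threshold and return their ids."""
    expired = [event_id for event_id, ts in events.items() if ts < threshold]
    for event_id in expired:
        del events[event_id]
    return expired


def cap_purge(events: dict[str, datetime], max_events: int) -> list[str]:
    """Delete the oldest events beyond max_events and return their ids."""
    excess = len(events) - max_events
    if excess <= 0:
        return []
    oldest = sorted(events, key=events.__getitem__)[:excess]
    for event_id in oldest:
        del events[event_id]
    return oldest


def cascade_delete_runs(runs: dict[str, set[str]], event_ids: list[str]) -> int:
    """Delete runs referencing any purged event and return the count."""
    purged = set(event_ids)
    doomed = [run_id for run_id, refs in runs.items() if refs & purged]
    for run_id in doomed:
        del runs[run_id]
    return len(doomed)


now = datetime(2024, 1, 10, tzinfo=timezone.utc)
events = {
    "e1": now - timedelta(days=10),
    "e2": now - timedelta(days=2),
    "e3": now - timedelta(days=1),
    "e4": now,
}
runs = {"r1": {"e1"}, "r2": {"e4"}, "r3": {"e2", "e4"}}
aged_out = age_purge(events, now - timedelta(days=7))
capped = cap_purge(events, max_events=2)
runs_deleted = cascade_delete_runs(runs, aged_out + capped)
```

Returning the purged event IDs from the event steps is what makes the cascade possible, since the run cleanup needs to know exactly which events disappeared.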

reload_task_conditions() None[source]

Reload all task conditions from the current app’s registered tasks.

This method clears the local condition and source-task mappings, then iterates over all tasks in the app and re-registers their triggers and conditions.

Returns:

None