pynenc.trigger.base_trigger
Base classes and interfaces for Pynenc’s trigger system.
This module defines the core abstractions for time-based scheduling and event-driven task execution. The trigger component enables declarative scheduling of tasks through cron expressions, events, and task dependency chains.
Key components:
BaseTrigger: Abstract base class for trigger implementations
TriggerCondition: Base class for different trigger conditions
TriggerDefinition: Configuration linking conditions to tasks
Module Contents
Classes
BaseTrigger: Base class for the Trigger component in Pynenc.
API
- class pynenc.trigger.base_trigger.BaseTrigger(app: pynenc.app.Pynenc)[source]¶
Bases: abc.ABC
Base class for the Trigger component in Pynenc.
The Trigger component manages time-based and event-driven task execution by evaluating conditions and triggering tasks when appropriate.
Initialization
Initialize the trigger component with the parent application.
- Parameters:
app – The Pynenc application instance
- conf() pynenc.conf.config_trigger.ConfigTrigger¶
Access to trigger configuration parameters.
- abstractmethod _register_condition(condition: pynenc.trigger.conditions.TriggerCondition) None[source]¶
Register a condition in the system.
This makes the condition available for trigger definitions.
- Parameters:
condition – The condition to register
- register_condition(condition: pynenc.trigger.conditions.TriggerCondition) None[source]¶
Register a condition in the system.
This method is called to register a new condition that can be used in trigger definitions.
- Parameters:
condition – The condition to register
- abstractmethod _register_source_task_condition(task_id: pynenc.task.TaskId, condition_id: pynenc.trigger.types.ConditionId) None[source]¶
Register the conditions that are sourced from a task.
This method is called when a new condition based on a source task is registered.
- Parameters:
task_id – ID of the source task
condition_id – ID of the condition sourced from the task
- register_source_task_condition(task_id: pynenc.task.TaskId, condition_id: pynenc.trigger.types.ConditionId) None[source]¶
Register a condition that is sourced from a specific task.
This builds the reverse mapping from tasks to the conditions they affect, used for efficient lookup when task status changes.
- Parameters:
task_id – ID of the source task
condition_id – ID of the condition sourced from the task
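The reverse mapping described for register_source_task_condition can be pictured as a dictionary from task IDs to sets of condition IDs. The following is a minimal in-memory sketch, assuming plain strings for both IDs; the class name SourceTaskIndex and its methods are hypothetical and are not part of Pynenc.

```python
from collections import defaultdict


class SourceTaskIndex:
    """Hypothetical in-memory reverse index: task ID -> condition IDs."""

    def __init__(self) -> None:
        self._by_task: dict[str, set[str]] = defaultdict(set)

    def register(self, task_id: str, condition_id: str) -> None:
        # Registering the same pair twice is harmless because a set is used.
        self._by_task[task_id].add(condition_id)

    def conditions_for(self, task_id: str) -> set[str]:
        # Use .get so that lookups do not create empty entries.
        return set(self._by_task.get(task_id, ()))


index = SourceTaskIndex()
index.register("module.task_a", "cond-1")
index.register("module.task_a", "cond-2")
index.register("module.task_a", "cond-1")  # duplicate, ignored
```

With such an index, a status change for a task only needs one dictionary lookup to find the conditions worth re-evaluating.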
- abstractmethod get_condition(condition_id: str) TriggerCondition | None[source]¶
Get a condition by its ID.
- Parameters:
condition_id – ID of the condition to retrieve
- Returns:
The condition if found, None otherwise
- abstractmethod register_trigger(trigger: pynenc.models.trigger_definition_dto.TriggerDefinitionDTO) None[source]¶
Register a trigger definition.
This creates a new trigger that will activate when its conditions are met.
- Parameters:
trigger – The trigger definition to register
- get_trigger(trigger_id: str) TriggerDefinition | None[source]¶
Get a trigger definition by ID.
- Parameters:
trigger_id – ID of the trigger to retrieve
- Returns:
The trigger definition if found, None otherwise
- abstractmethod _get_trigger(trigger_id: str) TriggerDefinitionDTO | None[source]¶
Get a trigger definition by ID.
- Parameters:
trigger_id – ID of the trigger to retrieve
- Returns:
The trigger definition if found, None otherwise
- abstractmethod get_triggers_for_condition(condition_id: str) list[pynenc.models.trigger_definition_dto.TriggerDefinitionDTO][source]¶
Get all triggers that depend on a specific condition.
- Parameters:
condition_id – ID of the condition
- Returns:
List of trigger definitions using this condition
- abstractmethod get_conditions_sourced_from_task(task_id: pynenc.task.TaskId, context_type: type[pynenc.trigger.conditions.ConditionContext] | None = None) list[pynenc.trigger.conditions.TriggerCondition][source]¶
Get all conditions that are sourced from a specific task.
These are conditions that monitor the task and might be satisfied by its status or results.
- Parameters:
task_id – ID of the source task
context_type – Optional context type to filter conditions by
- Returns:
List of conditions monitoring this task
- validate_and_record_condition(condition: pynenc.trigger.conditions.TriggerCondition, context: pynenc.trigger.conditions.ConditionContext) bool[source]¶
Validate a condition against a context and record it if valid.
This method checks if the given condition is satisfied by the context, and if so, creates and records a valid condition.
- Parameters:
condition – The condition to validate
context – The context to validate against
- Returns:
True if the condition was satisfied and recorded, False otherwise
- validate_and_record_conditions(conditions_and_contexts: list[tuple[pynenc.trigger.conditions.TriggerCondition, pynenc.trigger.conditions.ConditionContext]]) list[bool][source]¶
Validate multiple conditions against their contexts and record the valid ones.
Returns a list of booleans matching conditions_and_contexts. Use evaluate_and_record_conditions when the caller needs the ValidCondition objects (for example to populate event monitoring).
- Parameters:
conditions_and_contexts – Pairs of condition and context.
- Returns:
List of booleans indicating if each condition was satisfied.
- evaluate_and_record_conditions(conditions_and_contexts: list[tuple[pynenc.trigger.conditions.TriggerCondition, pynenc.trigger.conditions.ConditionContext]]) list[ValidCondition | None][source]¶
Evaluate conditions and record the satisfied ones.
Returns a list of the same length as conditions_and_contexts; each entry is either the recorded ValidCondition, or None when the condition was not satisfied.
- Parameters:
conditions_and_contexts – Pairs of condition and context.
- Returns:
One ValidCondition per pair, or None for unmatched ones.
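The two batch methods differ only in what they return: booleans, or the recorded objects with None as a placeholder. A minimal sketch of that relationship follows. It assumes the boolean variant can be derived from the richer one, which this page does not confirm; FakeValidCondition, the Condition tuple shape, and the recorded list are hypothetical stand-ins, not Pynenc types.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class FakeValidCondition:
    """Hypothetical stand-in for ValidCondition (not the Pynenc class)."""

    condition_id: str
    context: dict


# Hypothetical condition shape: (condition ID, predicate over a context dict).
Condition = tuple[str, Callable[[dict], bool]]
recorded: list[FakeValidCondition] = []


def evaluate_and_record(
    pairs: list[tuple[Condition, dict]],
) -> list[Optional[FakeValidCondition]]:
    results: list[Optional[FakeValidCondition]] = []
    for (condition_id, predicate), context in pairs:
        if predicate(context):
            valid = FakeValidCondition(condition_id, context)
            recorded.append(valid)  # stands in for record_valid_conditions
            results.append(valid)
        else:
            results.append(None)  # keep positions aligned with the input
    return results


def validate_and_record(pairs: list[tuple[Condition, dict]]) -> list[bool]:
    # Assumed relationship: the boolean view of the richer result.
    return [valid is not None for valid in evaluate_and_record(pairs)]


is_success: Condition = ("on-success", lambda ctx: ctx["status"] == "SUCCESS")
pairs = [(is_success, {"status": "SUCCESS"}), (is_success, {"status": "FAILED"})]
flags = validate_and_record(pairs)
```

Keeping the output the same length as the input lets callers zip results back onto their original pairs without tracking indices.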
- _log_condition_matched(condition: pynenc.trigger.conditions.TriggerCondition, context: pynenc.trigger.conditions.ConditionContext, valid_condition: pynenc.trigger.conditions.ValidCondition) None[source]¶
Emit the predefined trigger.condition.matched log.
Uses log_messages.context_source_ref so the source ref is consistent with the contract documented in docs/sprints/0004_1_triggers_workflows/11_pynmon_events_refinements.md.
- abstractmethod record_valid_condition(valid_condition: pynenc.trigger.conditions.ValidCondition) None[source]¶
Record that a condition has been satisfied with a specific context.
This adds the condition to the list of valid conditions that can trigger tasks.
- Parameters:
valid_condition – The valid condition to record
- abstractmethod record_valid_conditions(valid_conditions: list[pynenc.trigger.conditions.ValidCondition]) None[source]¶
Record that multiple conditions have been satisfied with their respective contexts.
This adds the conditions to the list of valid conditions that can trigger tasks.
- Parameters:
valid_conditions – The list of valid conditions to record
- abstractmethod get_valid_conditions() dict[str, pynenc.trigger.conditions.ValidCondition][source]¶
Get all currently valid conditions and their contexts.
- Returns:
Dictionary mapping condition IDs to the corresponding valid conditions
- abstractmethod clear_valid_conditions(conditions: collections.abc.Iterable[pynenc.trigger.conditions.ValidCondition]) None[source]¶
Clear valid conditions after they have been processed.
- Parameters:
conditions – Iterable of valid conditions to clear
- abstractmethod clean_task_trigger_definitions(task_id: pynenc.task.TaskId) None[source]¶
Remove all trigger definitions for a specific task.
This method should clean up any persisted trigger definitions associated with the task.
- Parameters:
task_id – ID of the task to clean triggers for
- register_task_triggers(task: pynenc.task.Task, triggers: TriggerBuilder | list[TriggerBuilder] | None = None) None[source]¶
Register triggers for a task.
This method processes trigger definitions created through a TriggerBuilder and registers them with the trigger system.
- Parameters:
task – The task to trigger
triggers – A TriggerBuilder or list of TriggerBuilders that define when the task should be triggered
- report_tasks_status(invocation_ids: list[pynenc.invocation.base_invocation.InvocationId], status: InvocationStatus | None = None) None[source]¶
Report status changes for multiple tasks to the trigger system in batch.
This method efficiently processes status changes for multiple invocations by batching context creation and condition validation.
- Parameters:
invocation_ids – List of invocation IDs reporting status changes
status – The new status for all invocations
- report_invocation_result(invocation: pynenc.invocation.dist_invocation.DistributedInvocation, result: Any) None[source]¶
Report a task result to the trigger system.
This method checks if any conditions are watching for this task’s result, evaluates them, and records valid conditions.
- Parameters:
invocation – The invocation that produced the result
result – The result produced by the task
- report_invocation_failure(invocation: pynenc.invocation.dist_invocation.DistributedInvocation, exception: Exception) None[source]¶
Report a task exception to the trigger system.
This method checks if any conditions are watching for this task’s failure, evaluates them, and records valid conditions.
- Parameters:
invocation – The invocation that raised the exception
exception – The exception raised by the task
- emit_event(event_code: str, payload: dict[str, Any]) str[source]¶
Emit an event into the system to potentially trigger tasks.
Creates an event context, evaluates event conditions, and records valid conditions for triggering tasks. The event is also persisted as an EventRecord for monitoring even when no condition matches.
- Parameters:
event_code – Type of the event
payload – Data associated with the event
- Returns:
ID of the created event
- _build_event_record(event_id: str, event_code: str, payload: dict[str, Any], timestamp: datetime.datetime, runner_context_id: str) pynenc.trigger.monitoring.EventRecord[source]¶
Build a fresh EventRecord for the current emitter context.
- _safe_store_event(record: pynenc.trigger.monitoring.EventRecord, runner_context: pynenc.runner.runner_context.RunnerContext, log_template: str) None[source]¶
Persist record, swallowing storage failures per the monitoring policy.
Monitoring writes must never break the hot path. This helper centralizes the “log and continue” policy documented for the trigger component.
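The “log and continue” policy can be illustrated with a small wrapper around a storage call. This is only a sketch of the pattern: the function name safe_store, the dict-shaped record, and the %s-style log template are assumptions, and the exceptions the real helper catches are not stated on this page.

```python
import logging
from typing import Callable

logger = logging.getLogger("trigger_monitoring_sketch")


def safe_store(store: Callable[[dict], None], record: dict, log_template: str) -> bool:
    """Try to persist record; log and return False instead of raising."""
    try:
        store(record)
    except Exception:
        # Broad catch is deliberate in this sketch: any storage error is
        # logged with its traceback and then dropped.
        logger.exception(log_template, record.get("event_id"))
        return False
    return True


def broken_backend(record: dict) -> None:
    raise ConnectionError("storage unavailable")


saved: list[dict] = []
ok = safe_store(saved.append, {"event_id": "evt-1"}, "could not store event %s")
failed = safe_store(broken_backend, {"event_id": "evt-2"}, "could not store event %s")
```

The caller carries on in both cases; only the log shows that the second write was lost.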
- _event_conditions_for(event_code: str) list[pynenc.trigger.conditions.EventCondition][source]¶
Return registered EventCondition objects for event_code.
- _current_emitter_invocation_id() str | None[source]¶
Return the invocation ID currently emitting an event, if any.
Reads pynenc.context.get_dist_invocation_context (thread-local), which is populated by DistributedInvocation.run for the duration of a task body. Returns None outside any task.
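The thread-local lookup can be sketched with the standard library alone. The helper names below (current_invocation_id, running_invocation) are hypothetical and only mimic the behaviour described above: a value that is visible while a task body runs on a thread and is None elsewhere.

```python
import threading
from contextlib import contextmanager
from typing import Iterator, Optional

_local = threading.local()


def current_invocation_id() -> Optional[str]:
    # Each thread sees only the value it set itself.
    return getattr(_local, "invocation_id", None)


@contextmanager
def running_invocation(invocation_id: str) -> Iterator[None]:
    previous = current_invocation_id()
    _local.invocation_id = invocation_id
    try:
        yield
    finally:
        # Restore the previous value so nested scopes unwind correctly.
        _local.invocation_id = previous


seen_in_other_thread: list[Optional[str]] = []
with running_invocation("inv-42"):
    inside = current_invocation_id()
    worker = threading.Thread(
        target=lambda: seen_in_other_thread.append(current_invocation_id())
    )
    worker.start()
    worker.join()
outside = current_invocation_id()
```

A thread started inside the scope does not inherit the value, which is the usual trade-off of thread-local state.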
- _current_emitter_context() tuple[str | None, str | None][source]¶
Return (invocation_id, task_id_key) for the emitting task, if any.
- abstractmethod _get_all_conditions() list[pynenc.trigger.conditions.TriggerCondition][source]¶
Get all registered conditions.
- Returns:
List of all conditions
- abstractmethod get_last_cron_execution(condition_id: pynenc.trigger.types.ConditionId) datetime.datetime | None[source]¶
Get the timestamp of the last execution of a cron condition from persistent storage.
- Parameters:
condition_id (str) – ID of the cron condition
- Returns:
Timestamp of last execution in UTC timezone, or None if never executed
- abstractmethod store_last_cron_execution(condition_id: pynenc.trigger.types.ConditionId, execution_time: datetime.datetime, expected_last_execution: datetime.datetime | None = None) bool[source]¶
Store the timestamp of the last execution of a cron condition in persistent storage.
This should be implemented as an atomic operation to prevent race conditions in distributed environments.
- Parameters:
condition_id (str) – ID of the cron condition
execution_time (datetime) – Timestamp of the execution (must be UTC timezone-aware)
expected_last_execution (datetime | None) – Expected current value for optimistic locking (UTC timezone-aware)
- Returns:
True if stored successfully, False if another process already updated it
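The optimistic-locking contract of store_last_cron_execution amounts to a compare-and-set. Below is a minimal single-process sketch using a lock; a real backend would rely on its own atomic primitive. The class InMemoryCronStore is hypothetical, and treating expected_last_execution=None as "no previous execution expected" is an assumption of this sketch.

```python
import threading
from datetime import datetime, timedelta, timezone
from typing import Optional


class InMemoryCronStore:
    """Hypothetical single-process store illustrating compare-and-set."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._last: dict[str, datetime] = {}

    def get_last_cron_execution(self, condition_id: str) -> Optional[datetime]:
        with self._lock:
            return self._last.get(condition_id)

    def store_last_cron_execution(
        self,
        condition_id: str,
        execution_time: datetime,
        expected_last_execution: Optional[datetime] = None,
    ) -> bool:
        with self._lock:
            current = self._last.get(condition_id)
            # Assumption: None means "expect that nothing is stored yet".
            if current != expected_last_execution:
                return False  # someone else updated the value first
            self._last[condition_id] = execution_time
            return True


store = InMemoryCronStore()
t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
t1 = t0 + timedelta(minutes=1)
first = store.store_last_cron_execution("cron-1", t0)
stale = store.store_last_cron_execution("cron-1", t1)
fresh = store.store_last_cron_execution("cron-1", t1, expected_last_execution=t0)
```

The check and the write happen under the same lock, so two callers holding the same expected value cannot both succeed.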
- _should_trigger_cron_condition(condition: pynenc.trigger.conditions.CronCondition, current_time: datetime.datetime, atomic_service_run: pynenc.orchestrator.atomic_service.AtomicServiceRun) pynenc.trigger.conditions.CronContext | None[source]¶
Determine if a cron condition should be triggered based on its schedule and last execution.
This method uses both local cache and persistent storage to efficiently determine if a cron condition should be triggered in a distributed environment. All datetime values are expected to be UTC timezone-aware.
- Parameters:
condition (CronCondition) – The cron condition to check
current_time (datetime) – Current time to check against (UTC timezone-aware)
atomic_service_run (AtomicServiceRun) – Atomic-service run evaluating cron.
- Returns:
CronContext if the condition is valid and should be triggered, None otherwise
- check_time_based_triggers(current_time: datetime.datetime | None = None, *, atomic_service_run: pynenc.orchestrator.atomic_service.AtomicServiceRun) None[source]¶
Check and record valid time-based conditions.
- abstractmethod claim_trigger_run(trigger_run_id: str, expiration_seconds: int = 60) bool[source]¶
Atomically claim the right to execute a trigger run.
This prevents multiple workers from executing the same trigger multiple times for the same context.
- Parameters:
trigger_run_id – Unique ID for this trigger run
expiration_seconds – Number of seconds after which the claim expires
- Returns:
True if the claim was successful, False if another worker has claimed it
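A claim with an expiry can be sketched as a map from run ID to expiry time. The class InMemoryClaims and its injectable clock are hypothetical, and the rule that an expired claim may be taken again is an assumption drawn from the expiration_seconds parameter rather than from documented behaviour.

```python
import threading
import time
from typing import Callable


class InMemoryClaims:
    """Hypothetical single-process claim table with expiring entries."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._lock = threading.Lock()
        self._expires_at: dict[str, float] = {}

    def claim_trigger_run(self, trigger_run_id: str, expiration_seconds: int = 60) -> bool:
        now = self._clock()
        with self._lock:
            expires = self._expires_at.get(trigger_run_id)
            if expires is not None and expires > now:
                return False  # an unexpired claim already exists
            self._expires_at[trigger_run_id] = now + expiration_seconds
            return True


fake_now = [0.0]  # mutable cell so the test clock can be advanced
claims = InMemoryClaims(clock=lambda: fake_now[0])
first = claims.claim_trigger_run("run-1", expiration_seconds=60)
second = claims.claim_trigger_run("run-1", expiration_seconds=60)
fake_now[0] = 61.0
after_expiry = claims.claim_trigger_run("run-1", expiration_seconds=60)
```

In a distributed backend the same effect is usually obtained with a single atomic "set if absent with time-to-live" operation rather than a process-local lock.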
- trigger_loop_iteration(atomic_service_run: pynenc.orchestrator.atomic_service.AtomicServiceRun) None[source]¶
Process one iteration of the trigger loop.
- _process_loop_iteration(atomic_service_run: pynenc.orchestrator.atomic_service.AtomicServiceRun) None[source]¶
Body of trigger_loop_iteration, extracted for purge wrapping.
- _process_loop_iteration_body(atomic_service_run: pynenc.orchestrator.atomic_service.AtomicServiceRun) None[source]¶
Inner body of one trigger-loop iteration.
- _execution_contexts_for_trigger(trigger: pynenc.trigger.trigger_definitions.TriggerDefinition, context: pynenc.trigger.trigger_context.TriggerContext) list[pynenc.trigger.trigger_context.TriggerContext][source]¶
Build the concrete contexts that should execute for trigger.
A trigger can receive several valid contexts for the same condition in one loop pass (for example four invocations of the same task reaching SUCCESS before the trigger loop wakes up). Single-condition triggers must execute once per valid context, not once for the whole batch.
OR triggers also execute independently per satisfied condition so their argument provider and trigger-run record see the exact source context. Multi-condition AND triggers keep the combined context because they need one valid condition for each required condition.
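The fan-out rule for single-condition, OR, and AND triggers can be sketched with plain dictionaries. Everything here is a simplification: the Logic enum, the execution_contexts function, and the use of string labels for contexts are hypothetical, and returning nothing when an AND trigger is only partly satisfied is an assumption of the sketch.

```python
from enum import Enum


class Logic(Enum):
    AND = "and"
    OR = "or"


def execution_contexts(
    required: list[str],
    logic: Logic,
    valid: dict[str, list[str]],
) -> list[dict[str, list[str]]]:
    """Return one dict per planned execution (condition ID -> context labels)."""
    if len(required) == 1 or logic is Logic.OR:
        # One execution per satisfied (condition, context) pair.
        return [
            {condition_id: [context]}
            for condition_id in required
            for context in valid.get(condition_id, [])
        ]
    # AND with several conditions: a single combined context, and only when
    # every required condition has at least one valid context (assumption).
    if all(valid.get(condition_id) for condition_id in required):
        return [{condition_id: list(valid[condition_id]) for condition_id in required}]
    return []


single = execution_contexts(["a"], Logic.AND, {"a": ["inv-1", "inv-2", "inv-3", "inv-4"]})
either = execution_contexts(["a", "b"], Logic.OR, {"a": ["inv-1"], "b": ["evt-1"]})
both = execution_contexts(["a", "b"], Logic.AND, {"a": ["inv-1"], "b": ["evt-1"]})
partial = execution_contexts(["a", "b"], Logic.AND, {"a": ["inv-1"]})
```

Four successful invocations of one source task therefore yield four executions for a single-condition trigger, while an AND trigger yields one.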
- execute_task(task_id: pynenc.task.TaskId, arguments: dict[str, Any] | None = None, *, parent_event_id: str | None = None) pynenc.invocation.base_invocation.BaseInvocation[source]¶
Execute a task with the given arguments.
This method handles the actual execution of a task when its trigger conditions are met.
- Parameters:
task_id – ID of the task to execute
arguments – Arguments to pass to the task, if any
parent_event_id – Optional id of the event that fired this trigger. Set on the routed invocation so the family tree can show event-children.
- _log_run_claimed(run_id: str, trigger: pynenc.trigger.trigger_definitions.TriggerDefinition, execution_ctx: pynenc.trigger.trigger_context.TriggerContext, atomic_service_run: pynenc.orchestrator.atomic_service.AtomicServiceRun) None[source]¶
Emit the predefined trigger.run.claimed log.
Lists the participating condition IDs so the Log Explorer can cross-link back to the matched conditions without an extra query.
- _log_run_executed(run_id: str, trigger: pynenc.trigger.trigger_definitions.TriggerDefinition, execution_ctx: pynenc.trigger.trigger_context.TriggerContext, invocation: pynenc.invocation.base_invocation.BaseInvocation, atomic_service_run: pynenc.orchestrator.atomic_service.AtomicServiceRun) None[source]¶
Emit the predefined trigger.run.executed log.
Includes the generated invocation plus any event/source-invocation participants so a single log line links every causal endpoint of the run for downstream Pynmon hydration.
- _record_trigger_run(*, run_id: str, trigger: pynenc.trigger.trigger_definitions.TriggerDefinition, context: pynenc.trigger.trigger_context.TriggerContext, invocation: pynenc.invocation.base_invocation.BaseInvocation, claimed_at: datetime.datetime, executed_at: datetime.datetime, atomic_service_run: pynenc.orchestrator.atomic_service.AtomicServiceRun) None[source]¶
Persist a TriggerRunRecord and link affected events.
Failures are logged and never propagated; monitoring must not break the trigger loop. The event/invocation relation is recorded via the backend’s atomic link_trigger_run_to_events instead of a read-modify-write on the event JSON.
- _build_trigger_run_record(run_id: str, trigger: pynenc.trigger.trigger_definitions.TriggerDefinition, context: pynenc.trigger.trigger_context.TriggerContext, invocation: pynenc.invocation.base_invocation.BaseInvocation, claimed_at: datetime.datetime, executed_at: datetime.datetime, atomic_service_run: pynenc.orchestrator.atomic_service.AtomicServiceRun) pynenc.trigger.monitoring.TriggerRunRecord[source]¶
Assemble a TriggerRunRecord from a satisfied trigger context.
- _monitoring_context_for_trigger_run(trigger: pynenc.trigger.trigger_definitions.TriggerDefinition, context: pynenc.trigger.trigger_context.TriggerContext) pynenc.trigger.trigger_context.TriggerContext[source]¶
Keep only the condition instances that explain this trigger run.
- _build_run_participants(context: pynenc.trigger.trigger_context.TriggerContext) list[pynenc.trigger.monitoring.TriggerRunParticipant][source]¶
Build a per-condition participant list from context.
Stores only reference IDs plus a lightweight timestamp/summary so Pynmon timeline overlays and tooltips can anchor and label the participant. Detail panels re-fetch the live condition / event / invocation when they need the full payload.
- abstractmethod store_event(event: pynenc.trigger.monitoring.EventRecord) None[source]¶
Store or replace an EventRecord.
- abstractmethod get_event(event_id: str) EventRecord | None[source]¶
Return the EventRecord for event_id, or None.
- abstractmethod get_events(*, event_code: str | None = None, start_time: datetime.datetime | None = None, end_time: datetime.datetime | None = None, matched: bool | None = None, triggered: bool | None = None, emitted_by_invocation_id: str | None = None, emitted_by_task_id: str | None = None, limit: int = 100, offset: int = 0) list[pynenc.trigger.monitoring.EventRecord][source]¶
Return events ordered by timestamp descending.
- abstractmethod count_events(*, event_code: str | None = None, start_time: datetime.datetime | None = None, end_time: datetime.datetime | None = None, matched: bool | None = None, triggered: bool | None = None, emitted_by_invocation_id: str | None = None, emitted_by_task_id: str | None = None) int[source]¶
Count events with the same filters as get_events.
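Since get_events and count_events accept the same filters, a backend can share one selection routine between them. The sketch below shows that idea on an in-memory list with a reduced filter set. FakeEvent and the module-level functions are hypothetical, and the inclusive time bounds are an assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass(frozen=True)
class FakeEvent:
    """Hypothetical stand-in for EventRecord with only a few fields."""

    event_id: str
    event_code: str
    timestamp: datetime
    matched: bool = False


def _select(
    events: list[FakeEvent],
    *,
    event_code: Optional[str] = None,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
    matched: Optional[bool] = None,
) -> list[FakeEvent]:
    selected = []
    for event in events:
        if event_code is not None and event.event_code != event_code:
            continue
        if start_time is not None and event.timestamp < start_time:
            continue
        if end_time is not None and event.timestamp > end_time:
            continue
        if matched is not None and event.matched != matched:
            continue
        selected.append(event)
    return selected


def get_events(events: list[FakeEvent], *, limit: int = 100, offset: int = 0, **filters) -> list[FakeEvent]:
    # Newest first, then apply the page window.
    ordered = sorted(_select(events, **filters), key=lambda e: e.timestamp, reverse=True)
    return ordered[offset : offset + limit]


def count_events(events: list[FakeEvent], **filters) -> int:
    return len(_select(events, **filters))


base = datetime(2024, 1, 1, tzinfo=timezone.utc)
log = [
    FakeEvent("e1", "order.created", base, matched=True),
    FakeEvent("e2", "order.created", base + timedelta(minutes=1)),
    FakeEvent("e3", "order.paid", base + timedelta(minutes=2), matched=True),
]
page = get_events(log, event_code="order.created", limit=1)
total = count_events(log, event_code="order.created")
```

Because the count ignores limit and offset, it can be used to compute the number of pages for the same query.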
- abstractmethod get_event_markers_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, *, event_code: str | None = None, state: str = 'all', limit: int = 1000, offset: int = 0) pynenc.trigger.monitoring.EventMarkerPage[source]¶
Return a paginated EventMarkerPage for the time window.
Use a narrow projection: id, code, timestamp, matched, triggered.
- Parameters:
state – "all", "matched", "unmatched", "triggered", or "untriggered".
- abstractmethod link_trigger_run_to_events(event_ids: list[str], invocation_id: str, *, trigger_run_id: str) None[source]¶
Link invocation_id to event_ids.
Backends should update event links atomically.
- abstractmethod get_invocations_triggered_by_event(event_id: str) list[str][source]¶
Return invocation IDs triggered by event_id.
- abstractmethod store_trigger_run(run: pynenc.trigger.monitoring.TriggerRunRecord) None[source]¶
Store or replace a TriggerRunRecord.
- abstractmethod get_trigger_run(trigger_run_id: str) TriggerRunRecord | None[source]¶
Return the TriggerRunRecord for trigger_run_id, or None.
- abstractmethod get_trigger_runs_for_event(event_id: str) list[pynenc.trigger.monitoring.TriggerRunRecord][source]¶
Return trigger runs that reference event_id.
- abstractmethod get_trigger_runs_for_invocation(invocation_id: str) list[pynenc.trigger.monitoring.TriggerRunRecord][source]¶
Return trigger runs that produced invocation_id.
- get_trigger_runs_for_valid_condition(valid_condition_id: str) list[pynenc.trigger.monitoring.TriggerRunRecord][source]¶
Return trigger runs that include valid_condition_id.
Historical participant snapshots keep the match visible.
- abstractmethod get_trigger_runs_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, *, event_code: str | None = None, task_id_key: str | None = None, limit: int | None = None) list[pynenc.trigger.monitoring.TriggerRunRecord][source]¶
Return trigger runs between start_time and end_time.
- get_trigger_runs_sourced_by_invocation(invocation_id: str) list[pynenc.trigger.monitoring.TriggerRunRecord][source]¶
Return runs where invocation_id was a source participant.
Backends with a source-invocation index should override this scan.
- get_events_batch(event_ids: collections.abc.Sequence[str]) dict[str, EventRecord | None][source]¶
Return a dict mapping each id to its EventRecord, or to None if not found.
- get_trigger_runs_batch(run_ids: collections.abc.Sequence[str]) dict[str, TriggerRunRecord | None][source]¶
Return a dict mapping each id to its TriggerRunRecord, or to None if not found.
- get_referenced_atomic_service_run_ids() set[str][source]¶
Return atomic_service_run_id values referenced by trigger runs.
Used to avoid purging linked execution records. Override with a dedicated index when available.
- get_conditions_batch(condition_ids: collections.abc.Sequence[str]) dict[str, TriggerCondition | None][source]¶
Return a dict mapping each id to its TriggerCondition, or to None if not found.
- get_triggers_batch(trigger_ids: collections.abc.Sequence[str]) dict[str, TriggerDefinition | None][source]¶
Return a dict mapping each id to its TriggerDefinition, or to None if not found.
- get_events_emitted_by_invocation(invocation_id: str, *, limit: int = 200, offset: int = 0) list[pynenc.trigger.monitoring.EventRecord][source]¶
Return events emitted by invocation_id.
- auto_purge_events() int[source]¶
Purge expired event and trigger-run records.
Returns 0 when auto purge is disabled.
- abstractmethod _age_purge_events(threshold: datetime.datetime) list[str][source]¶
Delete events older than threshold and return their ids.
- abstractmethod _age_purge_trigger_runs(threshold: datetime.datetime) int[source]¶
Delete trigger runs older than threshold and return the count.
- abstractmethod _cascade_delete_runs_for_events(event_ids: list[str]) int[source]¶
Delete trigger runs that reference any of event_ids.