Source code for pynenc.trigger.monitoring

"""
Monitoring DTOs for the trigger component.

These dataclasses are the durable monitoring contract that trigger backends
expose to any visibility / observability consumer. They follow the same
shape as ``InvocationHistory`` and other pynenc DTOs: backend-neutral, JSON
round-trippable, UTC-aware timestamps, and no dependency on any specific UI.

Module contents:

- :class:`EventRecord`: durable record of one emitted event.
- :class:`EventMarker`: lightweight projection used by timeline overlays.
- :class:`EventMarkerPage`: paginated marker projection with truncation flag.
- :class:`TriggerRunRecord`: durable record of one trigger run.
- :class:`TriggerRunParticipant`: per-condition row attached to a trigger run
  preserving which condition each event or source invocation satisfied.

Design notes:

- ``EventRecord.triggered_invocation_ids`` is rehydrated on read from the
  per-event invocation index rather than rewritten on the hot path.
- :class:`TriggerRunParticipant` stores only reference ids and lightweight
  labels. Detail views re-fetch the live ``TriggerCondition`` / context
  through :meth:`BaseTrigger.get_condition` and the event / state-backend
  lookups when they need full payloads.
"""

from __future__ import annotations

import json
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from pynenc.app import Pynenc


[docs] def _now_utc() -> datetime: """Return a UTC timezone-aware ``datetime`` for ``default_factory`` usage.""" return datetime.now(UTC)
# --------------------------------------------------------------------------- # # Events # --------------------------------------------------------------------------- #
[docs] @dataclass class EventRecord: """ Durable record of an emitted event. Created by :meth:`pynenc.trigger.base_trigger.BaseTrigger.emit_event` regardless of whether any condition matched. The first write captures the payload; a single follow-up write records matched/valid condition IDs when at least one condition accepts the event. ``triggered_invocation_ids`` is populated by the backend on read from a dedicated event->invocation index. The hot path does not rewrite the event JSON to append invocation IDs. """ event_id: str event_code: str payload: dict[str, Any] = field(default_factory=dict) timestamp: datetime = field(default_factory=_now_utc) matched_condition_ids: list[str] = field(default_factory=list) valid_condition_ids: list[str] = field(default_factory=list) triggered_invocation_ids: list[str] = field(default_factory=list) emitted_by_invocation_id: str | None = None emitted_by_task_id: str | None = None emitted_by_runner_context_id: str | None = None @property def matched(self) -> bool: """True when at least one event condition accepted this event.""" return bool(self.matched_condition_ids) @property def triggered(self) -> bool: """True when at least one invocation was created from this event.""" return bool(self.triggered_invocation_ids)
[docs] def to_json(self, app: Pynenc) -> str: """Serialize to JSON, routing payload values through the data store.""" return json.dumps(self._to_dict(app))
[docs] def _to_dict(self, app: Pynenc) -> dict[str, Any]: """Return a JSON-ready dict using ``app.client_data_store`` for payload.""" payload_data: dict[str, Any] = { key: app.client_data_store.serialize(value, False) for key, value in self.payload.items() } return { "event_id": self.event_id, "event_code": self.event_code, "timestamp": self.timestamp.isoformat(), "payload": payload_data, "matched_condition_ids": list(self.matched_condition_ids), "valid_condition_ids": list(self.valid_condition_ids), "triggered_invocation_ids": list(self.triggered_invocation_ids), "emitted_by_invocation_id": self.emitted_by_invocation_id, "emitted_by_task_id": self.emitted_by_task_id, "emitted_by_runner_context_id": self.emitted_by_runner_context_id, }
[docs] @classmethod def from_json(cls, json_str: str, app: Pynenc) -> EventRecord: """Rebuild an :class:`EventRecord` from its JSON representation.""" return cls._from_dict(json.loads(json_str), app)
[docs] @classmethod def _from_dict(cls, data: dict[str, Any], app: Pynenc) -> EventRecord: payload = { key: app.client_data_store.deserialize(value) for key, value in (data.get("payload") or {}).items() } emitted_by = data.get("emitted_by_invocation_id") return cls( event_id=data["event_id"], event_code=data["event_code"], payload=payload, timestamp=_parse_timestamp(data.get("timestamp")), matched_condition_ids=list(data.get("matched_condition_ids") or []), valid_condition_ids=list(data.get("valid_condition_ids") or []), triggered_invocation_ids=list(data.get("triggered_invocation_ids") or []), emitted_by_invocation_id=emitted_by, emitted_by_task_id=data.get("emitted_by_task_id"), emitted_by_runner_context_id=data.get("emitted_by_runner_context_id"), )
[docs] @dataclass(frozen=True) class EventMarker: """ Lightweight projection of an event used by the timeline overlay. Backends populate this directly from indexed columns without deserializing the full event payload. The marker is intentionally narrow so a busy timeline window can load many markers cheaply. """ event_id: str event_code: str timestamp: datetime matched: bool triggered: bool emitted_by_invocation_id: str | None = None emitted_by_runner_context_id: str | None = None
[docs] @dataclass(frozen=True) class EventMarkerPage: """ Paginated event-marker projection with explicit truncation metadata. ``truncated`` is ``True`` when the backend stopped at ``limit`` and more markers exist for the requested window. Pynmon uses this to surface a visible "showing N of more" hint instead of silently dropping markers. """ markers: list[EventMarker] total: int truncated: bool
# --------------------------------------------------------------------------- # # Trigger runs # --------------------------------------------------------------------------- #
[docs] @dataclass(frozen=True) class TriggerRunParticipant: """ Per-condition row attached to a :class:`TriggerRunRecord`. Preserves the mapping between a valid condition and the event or source invocation that satisfied it. Composite (AND) triggers need this to answer "which condition did each participant satisfy?". Exactly one of ``event_id`` / ``source_invocation_id`` is populated for event/status/result/exception conditions. ``CronContext`` and other contexts without a discrete source populate neither. ``context_timestamp`` and ``context_summary`` are lightweight labels so the timeline overlay and tooltip rows can anchor and describe a participant without re-fetching the source record. Full condition and context payloads are re-fetched on demand from the trigger registry and the event / state-backend lookups when a detail panel is opened. """ context_type: str condition_id: str | None = None valid_condition_id: str | None = None event_id: str | None = None source_invocation_id: str | None = None context_timestamp: datetime | None = None context_summary: str = ""
[docs] def to_dict(self) -> dict[str, Any]: return { "context_type": self.context_type, "condition_id": self.condition_id, "valid_condition_id": self.valid_condition_id, "event_id": self.event_id, "source_invocation_id": self.source_invocation_id, "context_timestamp": _iso_or_none(self.context_timestamp), "context_summary": self.context_summary, }
[docs] @classmethod def from_dict(cls, data: dict[str, Any]) -> TriggerRunParticipant: return cls( context_type=data["context_type"], condition_id=data.get("condition_id"), valid_condition_id=data.get("valid_condition_id"), event_id=data.get("event_id"), source_invocation_id=data.get("source_invocation_id"), context_timestamp=_parse_timestamp_optional(data.get("context_timestamp")), context_summary=data.get("context_summary") or "", )
[docs] @dataclass class TriggerRunRecord: """ Durable record of a single trigger run. Captures the participants that satisfied the trigger and the resulting invocation. ``participants`` preserves the per-condition mapping that the parallel ``event_ids`` / ``source_invocation_ids`` lists lose. The parallel lists are kept for backward compatibility and as a flat index for backends that do not need the per-condition mapping. """ trigger_run_id: str trigger_id: str task_id_key: str logic_value: str valid_condition_ids: list[str] = field(default_factory=list) condition_ids: list[str] = field(default_factory=list) event_ids: list[str] = field(default_factory=list) source_invocation_ids: list[str] = field(default_factory=list) triggered_invocation_id: str | None = None arguments_preview: dict[str, Any] = field(default_factory=dict) claimed_at: datetime | None = None executed_at: datetime | None = None participants: list[TriggerRunParticipant] = field(default_factory=list) # ID of the atomic-service execution cycle that produced this trigger run # (set when the trigger loop iteration runs inside ``_check_atomic_services``). # ``None`` for trigger runs produced outside the atomic-service path # (e.g. status / event / result-driven triggers from a task). atomic_service_run_id: str | None = None atomic_service_runner_id: str | None = None
[docs] def to_json(self) -> str: """Serialize to JSON. Arguments preview is stored as plain JSON.""" return json.dumps( { "trigger_run_id": self.trigger_run_id, "trigger_id": self.trigger_id, "task_id_key": self.task_id_key, "logic_value": self.logic_value, "valid_condition_ids": list(self.valid_condition_ids), "condition_ids": list(self.condition_ids), "event_ids": list(self.event_ids), "source_invocation_ids": list(self.source_invocation_ids), "triggered_invocation_id": self.triggered_invocation_id, "arguments_preview": self.arguments_preview, "claimed_at": _iso_or_none(self.claimed_at), "executed_at": _iso_or_none(self.executed_at), "participants": [p.to_dict() for p in self.participants], "atomic_service_run_id": self.atomic_service_run_id, "atomic_service_runner_id": self.atomic_service_runner_id, } )
[docs] @classmethod def from_json(cls, json_str: str) -> TriggerRunRecord: """Rebuild a :class:`TriggerRunRecord` from its JSON representation.""" data = json.loads(json_str) return cls( trigger_run_id=data["trigger_run_id"], trigger_id=data["trigger_id"], task_id_key=data["task_id_key"], logic_value=data["logic_value"], valid_condition_ids=list(data.get("valid_condition_ids") or []), condition_ids=list(data.get("condition_ids") or []), event_ids=list(data.get("event_ids") or []), source_invocation_ids=list(data.get("source_invocation_ids") or []), triggered_invocation_id=data.get("triggered_invocation_id"), arguments_preview=dict(data.get("arguments_preview") or {}), claimed_at=_parse_timestamp_optional(data.get("claimed_at")), executed_at=_parse_timestamp_optional(data.get("executed_at")), participants=[ TriggerRunParticipant.from_dict(p) for p in data.get("participants") or [] ], atomic_service_run_id=data.get("atomic_service_run_id"), atomic_service_runner_id=data.get("atomic_service_runner_id"), )
# --------------------------------------------------------------------------- # # Internal helpers # --------------------------------------------------------------------------- #
[docs] def _parse_timestamp(value: Any) -> datetime: """Parse an ISO timestamp string, defaulting to "now" when missing.""" if not value: return _now_utc() parsed = datetime.fromisoformat(value) if parsed.tzinfo is None: parsed = parsed.replace(tzinfo=UTC) return parsed
[docs] def _parse_timestamp_optional(value: Any) -> datetime | None: """Parse an ISO timestamp string into UTC, returning ``None`` when missing.""" if not value: return None return _parse_timestamp(value)
[docs] def _iso_or_none(value: datetime | None) -> str | None: """Return ``value.isoformat()`` or ``None``.""" return value.isoformat() if value is not None else None