# Source code for pynenc.trigger.conditions.status

"""
Status-based trigger conditions for Pynenc.

This module provides conditions that trigger based on task and call status changes,
allowing tasks to be triggered when other tasks reach specific states.
"""

from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, ClassVar

from pynenc.arguments import Arguments
from pynenc.identifiers.call_id import CallId
from pynenc.identifiers.invocation_id import InvocationId
from pynenc.identifiers.task_id import TaskId
from pynenc.invocation.status import InvocationStatus
from pynenc.trigger.arguments import ArgumentFilter
from pynenc.trigger.conditions.base import ConditionContext, TriggerCondition

if TYPE_CHECKING:
    from pynenc.invocation.dist_invocation import DistributedInvocation

    from ...app import Pynenc


@dataclass
class StatusContext(ConditionContext):
    """
    Context for task status conditions.

    Carries the call ID, invocation ID, status, and call arguments needed to
    evaluate status-based conditions. The task ID is not stored separately;
    conditions read it through ``call_id.task_id``.
    """

    call_id: "CallId"
    invocation_id: "InvocationId"
    arguments: Arguments
    status: InvocationStatus
    # Forwarded to ``client_data_store.serialize_arguments`` together with the
    # call kwargs when this context is serialized.
    disable_cache_args: tuple[str, ...]

    @property
    def context_id(self) -> str:
        """
        Build an identifier for this context.

        :return: A string combining the invocation ID and the status
        """
        return f"status_{self.invocation_id}_{self.status}"

    @classmethod
    def from_invocation(
        cls,
        invocation: "DistributedInvocation",
        status: InvocationStatus | None = None,
    ) -> "StatusContext":
        """
        Create a StatusContext from a DistInvocation.

        :param invocation: The invocation to extract context from
        :param status: Optional status to override the invocation's status;
            ``None`` means "use ``invocation.status``"
        :return: A StatusContext with call ID, invocation ID, status, and call arguments
        """
        # Only a missing override (None) falls back to the invocation's own
        # status; an explicitly supplied status is always honoured, whatever
        # its truth value. ``invocation.status`` is still read lazily.
        effective_status = invocation.status if status is None else status
        return cls(
            call_id=invocation.call.call_id,
            invocation_id=invocation.invocation_id,
            status=effective_status,
            arguments=invocation.call.arguments,
            disable_cache_args=invocation.call.task.conf.disable_cache_args,
        )

    def _to_json(self, app: "Pynenc") -> dict[str, Any]:
        """
        Create a serializable representation of this status context.

        :param app: Pynenc application instance whose ``client_data_store``
            serializes the call arguments
        :return: Dictionary with serialized context data
        """
        serialized_args = app.client_data_store.serialize_arguments(
            self.arguments.kwargs, self.disable_cache_args
        )
        return {
            "call_id_key": self.call_id.key,
            "invocation_id": self.invocation_id,
            "status_value": self.status.value,
            "arguments_json": serialized_args,
            # Stored as a list; ``_from_json`` converts it back to a tuple.
            "disable_cache_args": list(self.disable_cache_args),
        }

    @classmethod
    def _from_json(cls, data: dict[str, Any], app: "Pynenc") -> "StatusContext":
        """
        Create a StatusContext from parsed JSON data.

        :param data: Dictionary with context data, using the keys written by
            ``_to_json``
        :param app: Pynenc application instance whose ``client_data_store``
            deserializes the call arguments
        :return: A new StatusContext instance
        :raises KeyError: If a required key is missing from ``data``
        """
        deserialized_args = app.client_data_store.deserialize_arguments(
            data["arguments_json"]
        )
        return cls(
            call_id=CallId.from_key(data["call_id_key"]),
            invocation_id=InvocationId(data["invocation_id"]),
            status=InvocationStatus(data["status_value"]),
            arguments=Arguments(deserialized_args),
            disable_cache_args=tuple(data["disable_cache_args"]),
        )
class StatusCondition(TriggerCondition[StatusContext]):
    """
    Condition based on task status changes.

    Triggers when a task reaches a specific status, with filtering by call
    arguments.
    """

    context_type: ClassVar[type[StatusContext]] = StatusContext

    def __init__(
        self,
        task_id: "TaskId",
        statuses: list[InvocationStatus],
        arguments_filter: ArgumentFilter,
    ):
        """
        Create a task status trigger condition.

        :param task_id: ID of the task to monitor
        :param statuses: Status(es) that satisfy this condition
        :param arguments_filter: Filter applied to the task call arguments
        """
        self.task_id = task_id
        # Copy so that later mutation of the caller's list cannot silently
        # change this condition (and therefore its ``condition_id``).
        self.statuses = list(statuses)
        self.arguments_filter = arguments_filter

    def get_source_task_ids(self) -> set["TaskId"]:
        """
        Return the IDs of the tasks this condition watches.

        :return: A set containing only the monitored task ID
        """
        return {self.task_id}

    @property
    def condition_id(self) -> str:
        """
        Generate a unique ID for this status condition.

        :return: A string ID based on task ID, statuses and call arguments
        """
        # Sort the status values so the ID does not depend on the order in
        # which the statuses were supplied.
        statuses_str = "_".join(sorted(status.value for status in self.statuses))
        return (
            f"condition#{self.task_id}#{statuses_str}#{self.arguments_filter.filter_id}"
        )

    def _to_json(self, app: "Pynenc") -> dict[str, Any]:
        """
        Create a serializable representation of this condition.

        :param app: Pynenc application instance for serializing complex arguments
        :return: Dictionary with serialized condition data
        """
        return {
            "task_id": self.task_id.key,
            # Emit the raw status values (as StatusContext does for its own
            # status) so the payload holds only JSON primitives;
            # ``_from_json`` rebuilds the enum members from these values.
            "statuses": [status.value for status in self.statuses],
            "arguments_filter": self.arguments_filter.to_json(app),
        }

    @classmethod
    def _from_json(cls, data: dict[str, Any], app: "Pynenc") -> "StatusCondition":
        """
        Create a StatusCondition from parsed JSON data.

        :param data: Dictionary with condition data
        :param app: Pynenc application instance for deserializing complex arguments
        :return: A new StatusCondition instance
        :raises ValueError: If the statuses are missing or empty
        :raises KeyError: If ``task_id`` or ``arguments_filter`` is missing
        """
        task_id = TaskId.from_key(data["task_id"])
        raw_statuses = data.get("statuses", [])
        if not raw_statuses:
            raise ValueError("Missing required statuses in StatusCondition data")
        arguments_filter = ArgumentFilter.from_json(data["arguments_filter"], app)
        statuses = [InvocationStatus(status) for status in raw_statuses]
        return cls(
            task_id=task_id, statuses=statuses, arguments_filter=arguments_filter
        )

    def _is_satisfied_by(self, context: StatusContext) -> bool:
        """
        Check if a task status change satisfies this condition.

        A status change satisfies the condition when:

        1. The task ID matches the condition's monitored task
        2. The status is in the list of monitored statuses
        3. The arguments filter matches the call arguments

        :param context: Status context with call ID, status, and call arguments
        :return: True if the condition is satisfied
        """
        # Cheap identity checks first; the argument filter only runs for
        # status changes of the monitored task.
        if context.call_id.task_id != self.task_id:
            return False
        if context.status not in self.statuses:
            return False
        return self.arguments_filter.filter_arguments(context.arguments.kwargs)

    def affects_task(self, task_id: "TaskId") -> bool:
        """
        Check if this condition is affected by a specific task.

        :param task_id: ID of the task to check
        :return: True if this condition watches the specified task
        """
        return self.task_id == task_id