Source code for pynenc.trigger.trigger_builder

"""
Builder classes for constructing trigger definitions.

This module provides a fluent interface for building trigger definitions,
making it easier to define complex triggering conditions for tasks.
"""

from collections.abc import Callable
from typing import TYPE_CHECKING, Any, TypeVar, cast

from pynenc.invocation.status import InvocationStatus
from pynenc.trigger.arguments.argument_filters import create_argument_filter
from pynenc.trigger.arguments.argument_providers import (
    ArgumentProvider,
    CompositeArgumentProvider,
    ContextCallable,
    ContextTypeArgumentProvider,
    DirectArgumentProvider,
    StaticArgumentProvider,
)
from pynenc.trigger.arguments.result_filter import NoResultFilter, create_result_filter
from pynenc.trigger.conditions import (
    CompositeLogic,
    CronCondition,
    EventCondition,
    EventContext,
    StatusCondition,
    StatusContext,
)
from pynenc.trigger.conditions.base import ConditionContext
from pynenc.trigger.conditions.exception import ExceptionCondition, ExceptionContext
from pynenc.trigger.conditions.result import ResultCondition, ResultContext
from pynenc.trigger.trigger_definitions import TriggerDefinition

if TYPE_CHECKING:
    from pynenc.task import Task, TaskId
    from pynenc.trigger.conditions.base import TriggerCondition
    from pynenc.trigger.trigger_context import TriggerContext

C = TypeVar("C", bound="ConditionContext")


[docs] class TriggerBuilder: """ Builder class for creating trigger definitions. Provides a fluent interface for defining when and how tasks should be triggered, with methods for different trigger types and condition combinations. """ def __init__(self) -> None: """Initialize an empty trigger builder.""" self.conditions: list[TriggerCondition] = [] self.logic: CompositeLogic = CompositeLogic.AND self.argument_providers: list[ArgumentProvider] = []
[docs] def on_cron(self, cron_expression: str) -> "TriggerBuilder": """ Add a cron-based condition to trigger the task. :param cron_expression: Standard cron expression (e.g., "0 0 * * *" for daily at midnight) :return: This builder for method chaining """ self.conditions.append(CronCondition(cron_expression)) return self
[docs] def on_event( self, event_code: str, payload_filter: dict[str, Any] | Callable[[dict[str, Any]], bool] | None = None, ) -> "TriggerBuilder": """ Add an event-based condition to trigger the task. :param event_code: Type of event to listen for :param payload_filter: Optional payload arguments to match specific events or boolean callable :return: This builder for method chaining """ self.conditions.append( EventCondition(event_code, create_argument_filter(payload_filter)) ) return self
[docs] def on_status( self, task: "Task", statuses: str | list[str] | InvocationStatus | list[InvocationStatus] = InvocationStatus.SUCCESS, call_arguments: dict[str, Any] | Callable[[dict[str, Any]], bool] | None = None, ) -> "TriggerBuilder": """ Add a task status condition to trigger after another task completes. :param task: The task to monitor :param statuses: Status(es) that trigger execution (default: InvocationStatus.SUCCESS) :param call_arguments: Optional arguments to match specific calls :return: This builder for method chaining """ # Handle single status as list to standardize if not isinstance(statuses, list): statuses = [statuses] _statuses: list[InvocationStatus] = [] for status in statuses: if isinstance(status, str): status = InvocationStatus(status.lower()) _statuses.append(status) self.conditions.append( StatusCondition( task.task_id, _statuses, create_argument_filter(call_arguments) ) ) return self
[docs] def on_any_result( self, task: "Task", call_arguments: dict[str, Any] | Callable[[dict[str, Any]], bool] | None = None, ) -> "TriggerBuilder": """ Add a result condition to trigger when a task finishes successfully. :param task: The task to monitor for results :param call_arguments: Optional filter for task arguments to match specific invocations :return: This builder for method chaining """ self.conditions.append( ResultCondition( task.task_id, create_argument_filter(call_arguments), NoResultFilter(), ) ) return self
[docs] def on_result( self, task: "Task", filter_result: Any | Callable[[Any], bool], call_arguments: dict[str, Any] | Callable[[dict[str, Any]], bool] | None = None, ) -> "TriggerBuilder": """ Add a result condition to trigger when a task returns a specific result or matches a filter. This method allows defining conditions that trigger based on task results in several ways: - Exact matching: Pass any value (including None) to match exactly - Custom filtering: Pass a function that evaluates the result and returns a boolean - Ignore results: Set ignore_result=True to accept any result value :param task: The task to monitor for results :param filter_result: Either a specific value to match or a callable that filters results :param call_arguments: Optional filter for task arguments to match specific invocations :return: This builder for method chaining """ self.conditions.append( ResultCondition( task.task_id, create_argument_filter(call_arguments), create_result_filter(filter_result), ) ) return self
[docs] def on_exception( self, task: "Task", exception_types: list[str] | str | None = None, call_arguments: dict[str, Any] | Callable[[dict[str, Any]], bool] | None = None, ) -> "TriggerBuilder": """ Add an exception condition to trigger when a task fails with specific exceptions. This method creates a condition that triggers when a monitored task fails with a specific exception type or any exception if none specified. :param task: The task to monitor for exceptions :param exception_types: Exception type name(s) to match, None to match any exception :param call_arguments: Optional filter for task arguments to match specific invocations :return: This builder for method chaining """ if exception_types is None: exception_list = [] elif isinstance(exception_types, str): exception_list = [exception_types] else: exception_list = exception_types self.conditions.append( ExceptionCondition( task.task_id, create_argument_filter(call_arguments), exception_list, ) ) return self
[docs] def with_logic(self, logic: CompositeLogic | str) -> "TriggerBuilder": """ Set the logic for combining multiple conditions. :param logic: Logic to use (AND or OR) :return: This builder for method chaining """ if isinstance(logic, str): logic = CompositeLogic(logic.lower()) self.logic = logic return self
[docs] def with_arguments(self, args: dict[str, Any]) -> "TriggerBuilder": """ Set static arguments to pass to the task when triggered. This is a legacy method - prefer with_args_static for new code. :param args: Dictionary of static arguments to pass to the task :return: This builder for method chaining """ return self.with_args_static(args)
[docs] def with_args_static(self, args: dict[str, Any]) -> "TriggerBuilder": """ Set static arguments to pass to the task when triggered. :param args: Dictionary of static arguments to pass to the task :return: This builder for method chaining """ self.argument_providers.append(StaticArgumentProvider(args)) return self
[docs] def with_args_from_event( self, callback: Callable[[EventContext], dict[str, Any]], ) -> "TriggerBuilder": """ Define arguments based on the event context that triggered the task. :param callback: Function that receives an EventContext and returns arguments :return: This builder for method chaining """ self.argument_providers.append( ContextTypeArgumentProvider( EventContext, cast(ContextCallable[EventContext], callback) ) ) return self
[docs] def with_args_from_status( self, callback: Callable[[StatusContext], dict[str, Any]], ) -> "TriggerBuilder": """ Define arguments based on the task status context that triggered this task. :param callback: Function that receives a StatusContext and returns arguments :return: This builder for method chaining """ self.argument_providers.append( ContextTypeArgumentProvider( StatusContext, cast(ContextCallable[StatusContext], callback) ) ) return self
[docs] def with_args_from_result( self, callback: Callable[[ResultContext], dict[str, Any]], ) -> "TriggerBuilder": """ Define arguments based on the result context that triggered this task. :param callback: Function that receives a ResultContext and returns arguments :return: This builder for method chaining """ self.argument_providers.append( ContextTypeArgumentProvider( ResultContext, cast(ContextCallable[ResultContext], callback) ) ) return self
[docs] def with_args_from_exception( self, callback: Callable[[ExceptionContext], dict[str, Any]], ) -> "TriggerBuilder": """ Define arguments based on the exception context that triggered this task. The context type is inferred from the callback's type annotation. :param callback: Function that receives a specific ConditionContext and returns arguments :return: This builder for method chaining """ # Create a provider that knows what context type to use based on the callback type self.argument_providers.append( ContextTypeArgumentProvider( ExceptionContext, cast(ContextCallable[ExceptionContext], callback) ) ) return self
[docs] def with_args_from_trigger_context( self, callback: Callable[["TriggerContext"], dict[str, Any]], ) -> "TriggerBuilder": """ Define arguments with direct access to the trigger context. Provides maximum flexibility by giving direct access to all valid conditions. :param callback: Function that receives the TriggerContext and returns arguments :param fallback: If True, ignores failures and continues with other providers :return: This builder for method chaining """ self.argument_providers.append(DirectArgumentProvider(callback)) return self
[docs] def with_args_provider(self, provider: ArgumentProvider) -> "TriggerBuilder": """ Add a custom argument provider. :param provider: The argument provider to add :return: This builder for method chaining """ self.argument_providers.append(provider) return self
[docs] def add_condition(self, condition: "TriggerCondition") -> "TriggerBuilder": """ Add a custom condition to the builder. :param condition: The condition to add :return: This builder for method chaining """ self.conditions.append(condition) return self
[docs] def build(self, task_id: "TaskId") -> TriggerDefinition: """ Build the trigger definition for a task. :param task_id: ID of the task to trigger :return: A trigger definition with the configured conditions :raises ValueError: If no conditions are defined """ # Handle case with no conditions if not self.conditions: raise ValueError("Cannot create a trigger definition with no conditions") # Extract condition IDs from conditions condition_ids = [condition.condition_id for condition in self.conditions] # Create a composite provider if there are multiple providers argument_provider: ArgumentProvider | None = None if len(self.argument_providers) > 1: argument_provider = CompositeArgumentProvider(self.argument_providers) elif len(self.argument_providers) == 1: argument_provider = self.argument_providers[0] # If no providers, a default empty provider will be created in TriggerDefinition # Create and return the trigger definition return TriggerDefinition( task_id=task_id, condition_ids=condition_ids, logic=self.logic, argument_provider=argument_provider, )
# Define helper functions for common trigger patterns
[docs] def on_cron(cron_expression: str) -> TriggerBuilder: """ Create a trigger builder for a cron schedule. :param cron_expression: Standard cron expression :return: A trigger builder with the cron condition """ return TriggerBuilder().on_cron(cron_expression)
[docs] def on_event( event_code: str, required_params: dict[str, Any] | None = None ) -> TriggerBuilder: """ Create a trigger builder for an event. :param event_code: Type of event to listen for :param required_params: Optional parameters that must be present in the event payload :return: A trigger builder with the event condition """ return TriggerBuilder().on_event(event_code, required_params)
[docs] def on_status( task: "Task", statuses: str | list[str] | InvocationStatus | list[InvocationStatus] = InvocationStatus.SUCCESS, call_arguments: dict[str, Any] | None = None, ) -> TriggerBuilder: """ Create a trigger builder for a task status change. :param task: The task to monitor :param statuses: Status(es) that trigger execution (default: InvocationStatus.SUCCESS) :param call_arguments: Optional arguments to match specific calls :return: A trigger builder with the task status condition """ return TriggerBuilder().on_status(task, statuses, call_arguments)