Source code for pynenc.runner.base_runner

import asyncio
from datetime import UTC, datetime
import importlib
from logging import Logger
import os
import signal
import socket
import threading
import time
import warnings
from abc import ABC, abstractmethod
from functools import cached_property
from typing import TYPE_CHECKING, Any

from pynenc import context
from pynenc.conf.config_runner import ConfigRunner
from pynenc.exceptions import (
    InvocationStatusError,
    InvocationStatusTransitionError,
    RunnerNotExecutableError,
    PynencError,
)
from pynenc.invocation.status import InvocationStatus
from pynenc.runner.runner_context import RunnerContext
from pynenc.runner.shutdown_diagnostics import classify_signal, log_runner_shutdown

if TYPE_CHECKING:
    from types import FrameType

    from pynenc.app import Pynenc
    from pynenc.identifiers.invocation_id import InvocationId


[docs] class BaseRunner(ABC): """ The BaseRunner class defines the interface for a runner that executes task invocations. It interacts with various components of the Pynenc system, like the broker and orchestrator, and is responsible for handling the execution and life cycle of task invocations. - The runner's behavior can vary depending on the execution environment (e.g., subprocess, async, cloud function, multiprocessing). - It is designed to be subclassed for specific execution environments. """ """ The Runner will execute invocations from the broker. - It requires an app because it needs to know about the broker, orchestrator, etc. - The runner will affect the behavior of the task result, for example: * In a subprocess environment, it may implement a pipe to communicate for pausing/resuming processes. * In an async environment, the value should be an async function to wait for distributed results. * In a cloud function environment aiming for speed with a single thread, it might not wait more than 'x' seconds and instead, create a 'callback', save the status, and convert the current execution into a task that will be called when the result is ready. * In a multiprocessing environment in a Kubernetes pod with capabilities to create new pods, it may have different behaviors. * For an asyncio worker, it runs several tasks in one processor, and the value should wait with async. """ def __init__( self, app: "Pynenc", runner_cache: dict | None = None, runner_context: RunnerContext | None = None, ) -> None: # Initialize instance attributes first, before making runner accessible self.running = False self._shutdown_signum: int | None = None self._runner_cache = runner_cache self._runner_context = runner_context or RunnerContext(self.__class__.__name__) # Signalled when on_stop() completes so callers can wait for the run() loop to fully exit self._run_stopped = threading.Event() self._run_stopped.set() # initially not running, so already "stopped" # Set app relationship last, after all attributes are initialized self.app = app self.app.runner = self self._last_atomic_service_check_time = 0.0
[docs] def __getstate__(self) -> dict: state = self.__dict__.copy() state.pop("_run_stopped", None) return state
[docs] def __setstate__(self, state: dict) -> None: self.__dict__.update(state) self._run_stopped = threading.Event() self._run_stopped.set()
@property def logger(self) -> Logger: return self.app.logger @property def runner_id(self) -> str: """Unique identifier for the runner instance.""" return self._runner_context.runner_id @property def runner_context(self) -> RunnerContext: """The RunnerContext associated with this runner.""" return self._runner_context @cached_property def conf(self) -> ConfigRunner: return ConfigRunner( config_values=self.app.config_values, config_filepath=self.app.config_filepath, )
[docs] @staticmethod @abstractmethod def mem_compatible() -> bool: """ Indicates if the runner is compatible with in-memory components. ```{important} In memory components can only be used for testing purposes in shared memory space. ``` :return: True if compatible, False otherwise """ ...
@property @abstractmethod def max_parallel_slots(self) -> int: """ The maximum number of parallel tasks that the runner can handle. :return: An integer representing the maximum number of parallel tasks """
[docs] @abstractmethod def get_active_child_runner_ids(self) -> list[str]: """ Returns the list of currently active child runner IDs. This method enables parent-based health reporting for runners that spawn child processes or workers. The parent can report which child runner_ids are still alive based on OS-level process checks (e.g., Process.is_alive()). The orchestrator uses this to register heartbeats for active children, avoiding false recovery of invocations owned by children that are still running. Runners that spawn child processes must return the runner_ids of their alive children. Runners that don't spawn children must return an empty list. :return: List of runner_ids for currently active child workers. """
[docs] @abstractmethod def _on_start(self) -> None: """This method is called when the runner starts"""
[docs] def on_start(self) -> None: """This method is called when the runner starts""" self.app.logger.info( f"Starting {self.__class__.__name__} runner:{self.runner_id}" ) if threading.current_thread() is threading.main_thread(): signal.signal(signal.SIGINT, self.stop_runner_loop) signal.signal(signal.SIGTERM, self.stop_runner_loop) else: warnings.warn( "Running in a secondary thread. Signal handling will be skipped.", stacklevel=2, ) self.init_trigger_tasks_modules() self.running = True self._run_stopped.clear() # run() is starting; mark as not-yet-stopped self._on_start()
[docs] @abstractmethod def _on_stop(self) -> None: """This method is called when the runner stops"""
[docs] def on_stop(self) -> None: """This method is called when the runner stops""" self.app.logger.info( f"STOPPING {self.__class__.__name__} runner:{self.runner_id}..." ) self.running = False self._on_stop() self.app.logger.info( f"STOPPED {self.__class__.__name__} runner:{self.runner_id}" ) self._run_stopped.set() # signal that run() has fully exited
[docs] def wait_until_stopped(self, timeout: float = 2.0) -> bool: """Block until the runner's run() loop has fully exited. Safe to call even if run() was never started (returns immediately). :param float timeout: Maximum seconds to wait. :return: True if stopped within the timeout, False otherwise. """ return self._run_stopped.wait(timeout=timeout)
[docs] @abstractmethod def runner_loop_iteration(self) -> None: """ One iteration of the runner loop. Subclasses should implement this method to process invocations. """
[docs] @abstractmethod def _on_stop_runner_loop(self) -> None: """This method is called after the runner loop signal is received"""
[docs] def _kill_and_reroute( self, invocation_id: Any, runner_ctx: "RunnerContext | None" = None ) -> None: """ Mark an invocation as KILLED and reroute it for retry. Silently ignores if the invocation already reached a final status (it completed before we killed it — nothing to reroute). :param Any invocation_id: Invocation to kill and reroute :param RunnerContext | None runner_ctx: Runner context to use for ownership; defaults to self.runner_context. Pass the child's context when the parent is acting on behalf of a child (e.g. ProcessRunner killing a worker). """ ctx = runner_ctx or self.runner_context try: self.app.orchestrator.set_invocation_status( invocation_id, InvocationStatus.KILLED, ctx ) self.app.orchestrator.reroute_invocations({invocation_id}, ctx) self.logger.info(f"Rerouted invocation:{invocation_id} after kill") except InvocationStatusTransitionError as e: if e.from_status and e.from_status.is_final(): self.logger.debug( f"invocation:{invocation_id} already in final status, no rerouting needed" ) else: self.logger.warning( f"Could not reroute invocation:{invocation_id}: {e}" ) except InvocationStatusError as e: self.logger.warning( f"Not possible to set invocation:{invocation_id} to KILLED and reroute: {e}" )
[docs] def _log_shutdown(self, signum: int | None) -> None: """ Log diagnostics when the runner receives a shutdown signal. Override in subclasses to include active processes/threads/invocations. The default logs only the runner identity and system environment. """ log_runner_shutdown( self.app.logger, self.__class__.__name__, self.runner_id, signum )
[docs] def stop_runner_loop( self, signum: int | None = None, frame: "FrameType | None" = None ) -> None: """ Stops the runner loop, typically in response to a signal. Logs shutdown diagnostics (including system env, running workers, and invocation state) before stopping. Critical for debugging OOM events. :param int | None signum: Signal number :param FrameType | None frame: Frame object at the time the signal was received """ self._shutdown_signum = signum reason = classify_signal(signum) self.app.logger.warning( f"Received signal:{signum} reason:{reason} Stopping {self.__class__.__name__} loop..." ) try: self._log_shutdown(signum) except Exception as e: self.app.logger.error( f"Failed to collect shutdown diagnostics: {e}", exc_info=True ) self.running = False self._on_stop_runner_loop()
[docs] @abstractmethod def _waiting_for_results( self, running_invocation_id: "InvocationId", result_invocation_ids: list["InvocationId"], runner_args: dict[str, Any] | None = None, ) -> None: """ Method called when an invocation is waiting for results from other invocations. ```{note} This method is called from the result method of an invocation ``` The runner has the oportunity to define the waiting behaviour of the running invocation in this method Otherwise the running invocation will infinetely loop until the result invocation is ready ```{note} The running invocation may be None, when the result was called from outside a runner (e.g. user environment) In that case will be handle by the DummyRunner (default in the pynenc app to handle this cases) ``` Subclasses can define the waiting behavior of the running invocation in this method. :param str running_invocation_id: The ID of the invocation that is waiting for results :param list[str] result_invocation_ids: A list of IDs of the invocations whose results are being awaited :param dict[str, Any] | None runner_args: Additional arguments passed to the runner, specific to the runner's implementation """
[docs] def waiting_for_results( self, running_invocation_id: "InvocationId | None", result_invocation_ids: list["InvocationId"], runner_args: dict[str, Any] | None = None, ) -> None: """ Handles invocations that are waiting for results from other invocations. Pauses the current thread and registers it to wait for the results of specified invocations. :param InvocationId | None running_invocation_id: The ID of the invocation that is waiting for results :param list[InvocationId] result_invocation_ids: A list of IDs of the invocations whose results are being awaited :param dict[str, Any] | None runner_args: Additional arguments required for the ThreadRunner """ if not running_invocation_id: # running from outside this runner (user instantiate an app with this runner class, # but ask for an invocation result outside of the runner processes) self.app.logger.debug( f"Waiting for invocations:{result_invocation_ids} from outside this runner" ) time.sleep(self.conf.invocation_wait_results_sleep_time_sec) return self._waiting_for_results( running_invocation_id, result_invocation_ids, runner_args )
[docs] async def async_waiting_for_results( self, running_invocation_id: "InvocationId | None", result_invocation_ids: list["InvocationId"], runner_args: dict[str, Any] | None = None, ) -> None: if not running_invocation_id: # running from outside this runner (user instantiate an app with this runner class, # but ask for an invocation result outside of the runner processes) self.logger.debug( f"Async Waiting for invocations:{result_invocation_ids} from outside this runner" ) await asyncio.sleep(self.conf.invocation_wait_results_sleep_time_sec) return self._waiting_for_results( running_invocation_id, result_invocation_ids, runner_args )
[docs] def init_trigger_tasks_modules(self) -> None: """Initialize trigger task modules and register trigger tasks.""" modules = getattr(self.app.conf, "trigger_task_modules", None) or () if modules: self.app.logger.info("Initializing trigger task modules:") for module_name in modules: msg = f"trigger module '{module_name}'" try: importlib.import_module(module_name) self.app.logger.info(f" - Successfully imported {msg}") except ImportError as e: self.app.logger.warning(f" - Error importing {msg}: {e}") self.app.register_core_tasks() self.app.register_deferred_triggers()
[docs] def _check_atomic_services(self) -> None: """ Check and run atomic global services if this runner is authorized. Executes trigger processing and invocation recovery in a single atomic window to prevent conflicts across distributed runners. Handles any PynencError exceptions to prevent service failures from stopping the runner loop. """ current_time = time.time() check_interval_seconds = ( self.app.conf.atomic_service_check_interval_minutes * 60 ) if current_time - self._last_atomic_service_check_time < check_interval_seconds: return self._last_atomic_service_check_time = current_time start_time = None try: if not self.app.orchestrator.should_run_atomic_service(self.runner_context): return start_time = datetime.now(UTC) self.app.logger.info( f"runner:{self.runner_id} executing atomic global services" ) self.app.trigger.trigger_loop_iteration() except PynencError as e: self.app.logger.error( f"Error during atomic service execution: {e}", exc_info=True ) except Exception as e: self.app.logger.exception( f"Unexpected error during atomic service execution: {e}" ) raise finally: if start_time is not None: end_time = datetime.now(UTC) self.app.orchestrator.record_atomic_service_execution( self.runner_context.runner_id, start_time, end_time )
[docs] def run(self) -> None: """ Starts the runner, initiating its main loop. Sets the current runner in the context so that any invocations registered from within running tasks will use this runner's context instead of falling back to ExternalRunner. """ # Set this runner as the current runner in the context FIRST # This ensures all logs during startup show the correct runner context context.set_current_runner(self.app.app_id, self) # Calls for initial setup on Runner implementations self.on_start() try: while self.running: # We must know which worker are alive and running invocations # Otherwise, the running invocation recovery service may # incorrectly assume that invocations owned by dead workers # need to be recovered, when in fact their workers are still alive. self._report_child_runner_heartbeats() # Atomic services runs in different runners at different times # We use to minimize conflicts with triggering and recovery services self._check_atomic_services() # Main runner loop iteration self.runner_loop_iteration() time.sleep(self.conf.runner_loop_sleep_time_sec) except KeyboardInterrupt: self.app.logger.warning( f"KeyboardInterrupt received. Stopping runner:{self.runner_id}..." ) except Exception as e: self.app.logger.exception(f"Exception in runner:{self.runner_id} loop: {e}") raise e finally: self.on_stop() context.clear_current_runner(self.app.app_id)
[docs] def _register_new_child_runner_context(self, child_context: RunnerContext) -> None: """ Register a new child runner context and ensures that we have an initial heartbeat. :param RunnerContext child_context: The context of the child runner to register """ self.app.state_backend.store_runner_context(child_context) self.app.orchestrator.register_runner_heartbeats([child_context.runner_id])
[docs] def _report_child_runner_heartbeats(self) -> None: """ Report heartbeats for active child runners to the orchestrator. This enables parent-based health reporting where the parent runner reports which child runner_ids are still alive based on OS-level process checks. This provides an additional liveness signal beyond the child's own heartbeat thread. """ if active_child_ids := self.get_active_child_runner_ids(): self.app.orchestrator.register_runner_heartbeats(active_child_ids)
[docs] class DummyRunner(BaseRunner): """ This runner is a placeholder for the Pynenc app. It will be used when the app is defined in any other Python environment than a Pynenc runner. Examples include: - A script that defines the app, decorates some tasks, routes them, and then finishes. Such a script does not plan to run anything itself but triggers tasks that will later run in actual runners. """ @property def cache(self) -> dict: if self._runner_cache is None: self._runner_cache = {} return self._runner_cache
[docs] @staticmethod def mem_compatible() -> bool: raise RunnerNotExecutableError( "This runner is a placeholder for the Pynenc app" )
[docs] def _on_start(self) -> None: raise RunnerNotExecutableError( "This runner is a placeholder for the Pynenc app" )
[docs] def _on_stop(self) -> None: raise RunnerNotExecutableError( "This runner is a placeholder for the Pynenc app" )
[docs] def _on_stop_runner_loop(self) -> None: raise RunnerNotExecutableError( "This runner is a placeholder for the Pynenc app" )
[docs] def runner_loop_iteration(self) -> None: raise RunnerNotExecutableError( "This runner is a placeholder for the Pynenc app" )
@property def max_parallel_slots(self) -> int: raise RunnerNotExecutableError( "This runner is a placeholder for the Pynenc app" )
[docs] def get_active_child_runner_ids(self) -> list[str]: """DummyRunner has no child runners.""" return []
[docs] def _waiting_for_results( self, running_invocation_id: "InvocationId", result_invocation_ids: list["InvocationId"], runner_args: dict[str, Any] | None = None, ) -> None: # Parameters are ids (string) to conform with BaseRunner interface. del running_invocation_id, result_invocation_ids, runner_args # invocation.result() was called from outside a runner; sleep briefly time.sleep(self.conf.invocation_wait_results_sleep_time_sec)
[docs] class ExternalRunner(DummyRunner): """ Represents an external/client context outside Pynenc runners. This runner captures hostname and PID information from the external process (e.g., user script, CLI) that registers invocations but doesn't execute them. It extends DummyRunner since it cannot execute tasks, but provides valid RunnerContext for tracking purposes. Uses hostname-pid as runner_id since external processes are not managed by Pynenc and we cannot guarantee UUID persistence across calls. """ def __init__( self, app: "Pynenc", runner_cache: dict | None = None, ) -> None: runner_context = self.get_default_external_runner_context() super().__init__(app, runner_cache, runner_context)
[docs] @classmethod def get_default_external_runner_context(cls) -> RunnerContext: """ Create a RunnerContext for the current external process. :return: RunnerContext with hostname-pid as runner_id """ return RunnerContext( runner_cls=cls.__name__, runner_id=f"ExternalRunner@{socket.gethostname()}-{os.getpid()}", )