# Source code for pynenc.runner.persistent_process_runner

import multiprocessing
import os
import signal
import time
from functools import cached_property
from multiprocessing import Manager, Process
from typing import TYPE_CHECKING, Any
import uuid

from pynenc import context
from pynenc.conf.config_runner import ConfigPersistentProcessRunner
from pynenc.runner.base_runner import BaseRunner
from pynenc.runner.runner_context import RunnerContext
from pynenc.runner.shutdown_diagnostics import log_runner_shutdown
from pynenc.util.multiprocessing_utils import warn_missing_main_guard


if TYPE_CHECKING:
    from multiprocessing.synchronize import Event
    from pynenc.app import Pynenc
    from pynenc.identifiers.invocation_id import InvocationId


def persistent_process_main(
    app: "Pynenc",
    *,
    runner_cache: dict,
    stop_event: "Event",
    parent_runner_ctx_json: str,
    child_runner_id: str,
) -> None:
    """
    Main function for persistent process that executes invocations sequentially.

    The child_runner_id is pre-generated by the parent process before spawning,
    allowing the parent to track which runner_ids are still alive via OS-level
    process checks. The parent reports heartbeats for alive children in its main loop.

    If the worker stops (SIGTERM, KeyboardInterrupt or an unexpected error) while
    an invocation is still executing, that invocation is passed to
    ``app.runner._kill_and_reroute``. Invocations that already finished running
    (successfully or by raising an exception) are not rerouted.

    :param Pynenc app: The Pynenc application instance.
    :param dict runner_cache: Shared cache dictionary across all processes.
    :param Event stop_event: Multiprocessing event to signal graceful shutdown.
    :param str parent_runner_ctx_json: JSON serialized parent runner context.
    :param str child_runner_id: Pre-generated runner_id for this child worker.
    """
    app.runner._runner_cache = runner_cache
    parent_runner_ctx = RunnerContext.from_json(parent_runner_ctx_json)
    # Use pre-generated runner_id so parent can track this child
    runner_ctx = parent_runner_ctx.new_child_context(
        "PPRWorker", runner_id=child_runner_id
    )
    app.runner._register_new_child_runner_context(runner_ctx)
    runner_id = runner_ctx.runner_id
    context.set_runner_context(app.app_id, runner_ctx)
    # Log after context is set so it shows the correct runner info
    app.logger.info(f"Persistent process worker started with pid:{os.getpid()}")

    # Id of the invocation currently executing, or None while idle. Bound before
    # the SIGTERM handler is installed so the handler can always read it.
    invocation_id: str | None = None

    def handle_terminate(signum: int, frame: Any) -> None:
        """SIGTERM handler: request stop and interrupt the current work."""
        inv_info = f" (active invocation:{invocation_id})" if invocation_id else ""
        app.logger.warning(
            f"worker:{runner_id} received signal:{signum}{inv_info}, stopping"
        )
        stop_event.set()
        # Raising KeyboardInterrupt interrupts any blocking call in invocation.run()
        # (e.g. time.sleep). The inner `except Exception` does not catch BaseException,
        # so it propagates to `except KeyboardInterrupt` where the outer finally reroutes.
        raise KeyboardInterrupt

    signal.signal(signal.SIGTERM, handle_terminate)  # Handle SIGTERM gracefully
    try:
        while not stop_event.is_set():
            invocations = list(app.orchestrator.get_invocations_to_run(1, runner_ctx))
            if not invocations:
                # NOTE(review): no sleep here; this assumes get_invocations_to_run
                # throttles/blocks when idle - confirm to rule out a busy loop.
                continue
            invocation = invocations[0]
            invocation_id = invocation.invocation_id
            try:
                invocation.run(runner_ctx)
            except Exception:
                app.logger.exception(f"Error executing invocation:{invocation_id}")
            # The invocation finished (successfully or not), so there is nothing
            # left to reroute. Deliberately not in a `finally`: a KeyboardInterrupt
            # raised mid-run must keep the id set so the outer finally reroutes it.
            invocation_id = None
    except KeyboardInterrupt:
        app.logger.info(f"worker:{runner_id} received KeyboardInterrupt, exiting")
    except Exception as e:
        app.logger.exception(f"worker:{runner_id} error: {e}")
    finally:
        if invocation_id:
            app.logger.warning(
                f"worker:{runner_id} shutting down with active invocation:{invocation_id}, rerouting"
            )
            app.runner._kill_and_reroute(invocation_id, runner_ctx=runner_ctx)
        app.logger.info(f"worker:{runner_id} shutting down")
class PersistentProcessRunner(BaseRunner):
    """
    PersistentProcessRunner maintains a fixed number of processes that continuously run tasks.

    This runner spawns child worker processes (PPRWorker) that execute invocations.
    The parent pre-generates runner_ids for children before spawning, enabling
    parent-based health reporting via OS-level process checks.
    """

    def __init__(
        self,
        app: "Pynenc",
        runner_cache: dict | None = None,
        runner_context: RunnerContext | None = None,
    ) -> None:
        # runner_id -> Process for every tracked child worker.
        self.child_runner_ids: dict[str, Process] = {}
        # Target number of child workers; computed in _on_start.
        self.num_processes: int = 0
        self.manager: Manager | None = None  # type: ignore
        self.stop_event: Event | None = None  # type: ignore
        self._process_id_counter: int = 0
        super().__init__(app, runner_cache, runner_context)

    @cached_property
    def conf(self) -> ConfigPersistentProcessRunner:
        """Runner configuration, built once from the app's config values and file."""
        return ConfigPersistentProcessRunner(
            config_values=self.app.config_values,
            config_filepath=self.app.config_filepath,
        )

    @staticmethod
    def mem_compatible() -> bool:
        """Indicates if the runner supports in-memory components."""
        return False

    @property
    def max_parallel_slots(self) -> int:
        """Returns the maximum number of concurrent processes."""
        return self.num_processes

    def get_active_child_runner_ids(self) -> list[str]:
        """
        Returns runner_ids of alive child workers for parent-based health reporting.

        :return: List of runner_ids for child workers with alive processes.
        """
        return [
            runner_id
            for runner_id, proc in self.child_runner_ids.items()
            if proc.is_alive()
        ]

    def _log_shutdown(self, signum: int | None) -> None:
        """Log shutdown diagnostics, including every tracked child process."""
        log_runner_shutdown(
            self.app.logger,
            self.__class__.__name__,
            self.runner_id,
            signum,
            processes={
                rid: (proc, None) for rid, proc in self.child_runner_ids.items()
            },
        )

    @staticmethod
    def _ensure_spawn_start_method() -> None:
        """
        Force the multiprocessing start method to 'spawn' unless it already is.

        Any previously configured method (e.g. 'fork') is overridden via
        ``force=True``; 'spawn' is needed on macOS. A RuntimeError from
        ``set_start_method`` is ignored (best effort).
        """
        if (
            hasattr(multiprocessing, "get_start_method")
            and multiprocessing.get_start_method(allow_none=True) != "spawn"
        ):
            try:
                multiprocessing.set_start_method("spawn", force=True)
            except RuntimeError:
                pass

    def _on_start(self) -> None:
        """Initializes the runner and spawns initial processes."""
        self._ensure_spawn_start_method()
        # At least min_parallel_slots; otherwise the configured count, the CPU
        # count, or 1 as a last resort.
        self.num_processes = max(
            self.conf.min_parallel_slots, self.conf.num_processes or os.cpu_count() or 1
        )
        self.logger.info(f"Creating {self.num_processes} processes")
        warn_missing_main_guard()
        self.child_runner_ids = {}  # Track runner_id -> Process for health reporting
        self._process_id_counter = 0
        self.manager = Manager()
        self.runner_cache = self._runner_cache or self.manager.dict()  # type: ignore
        self.stop_event = self.manager.Event()  # type: ignore
        for _ in range(self.num_processes):
            self._spawn_persistent_process()

    def _spawn_persistent_process(self) -> str | None:
        """
        Spawns a new persistent process and returns its runner_id.

        Pre-generates the child runner_id before spawning so the parent can track
        which runner_ids are still alive via OS-level process checks.

        :return: The runner_id of the spawned child, or None if spawn failed.
        :raises RuntimeError: If called after the runner stopped running.
        """
        if not self.running:
            raise RuntimeError("Trying to spawn new process after stopping loop")

        # Pre-generate child runner_id so parent can track it
        child_runner_id = str(uuid.uuid4())
        args = {
            "app": self.app,
            "runner_cache": self.runner_cache,
            "stop_event": self.stop_event,
            "parent_runner_ctx_json": self.runner_context.to_json(),
            "child_runner_id": child_runner_id,
        }
        p = Process(target=persistent_process_main, kwargs=args, daemon=True)
        try:
            p.start()
            if p.pid is None:
                self.logger.error("Failed to start process: pid:None")
                return None
            self.child_runner_ids[child_runner_id] = p
            self.logger.info(
                f"Spawned persistent process: pid:{p.pid}, worker:{child_runner_id}"
            )
        except Exception as e:
            self.logger.error(
                f"Failed to spawn process for worker:{child_runner_id}: {e}"
            )
            raise
        return child_runner_id

    def _terminate_all_processes(self, timeout: float = 5.0) -> None:
        """
        Terminates all running processes with graceful shutdown attempt.

        Sets the shared stop event, sends ``Process.terminate()`` to every alive
        child at once, then waits for all of them against one shared deadline of
        ``timeout`` seconds. Children still alive afterwards are killed and joined
        so they are reaped. An error on one child is logged and does not prevent
        the others from being handled.

        :param float timeout: Total seconds to wait for children to exit after
            being terminated before force-killing them.
        """
        if self.stop_event is not None:
            try:
                self.stop_event.set()  # Signal all processes to stop
            except Exception as e:
                self.logger.warning(f"Failed to set stop event: {e}")
                # Continue with process termination regardless

        processes = list(self.child_runner_ids.items())

        # Phase 1: signal every child first so they all shut down (and reroute
        # their active invocations) in parallel rather than one after another.
        for runner_id, process in processes:
            try:
                if process.is_alive():
                    process.terminate()
            except Exception as e:
                self.logger.warning(f"Error terminating worker:{runner_id}: {e}")

        # Phase 2: wait against a single shared deadline, escalating to kill.
        deadline = time.monotonic() + timeout
        for runner_id, process in processes:
            try:
                process.join(timeout=max(0.0, deadline - time.monotonic()))
                if process.is_alive():
                    self.logger.warning(
                        f"worker:{runner_id} did not terminate, forcing kill"
                    )
                    process.kill()
                    # Reap the killed child so it does not linger as a zombie.
                    process.join(timeout=1)
                self.child_runner_ids.pop(runner_id, None)
                self.logger.debug(f"Terminated worker:{runner_id}")
            except Exception as e:
                self.logger.warning(f"Error terminating worker:{runner_id}: {e}")

    def _on_stop(self) -> None:
        """Cleans up all resources when runner stops."""
        try:
            self._terminate_all_processes()
            if self.manager is not None:
                try:
                    self.manager.shutdown()  # type: ignore
                except Exception as e:
                    self.logger.warning(f"Failed to shutdown manager: {e}")
        except Exception as e:
            self.logger.error(f"Error during {self.__class__.__name__} stop: {e}")

    def _on_stop_runner_loop(self) -> None:
        """Handles immediate stop from signal."""
        try:
            self._terminate_all_processes()
        except Exception as e:
            self.logger.error(f"Error during {self.__class__.__name__} loop stop: {e}")

    def runner_loop_iteration(self) -> None:
        """Maintains the configured number of running processes."""
        # Clean up dead processes from tracking dict
        dead_runner_ids = [
            rid for rid, proc in self.child_runner_ids.items() if not proc.is_alive()
        ]
        for rid in dead_runner_ids:
            self.logger.warning(f"Detected dead child worker:{rid}, cleaning up")
            self.child_runner_ids.pop(rid, None)

        # Top the pool back up to num_processes.
        current_count = len(self.child_runner_ids)
        if current_count < self.num_processes:
            processes_to_spawn = self.num_processes - current_count
            self.logger.info(f"Spawning {processes_to_spawn} new processes")
            for _ in range(processes_to_spawn):
                self._spawn_persistent_process()
        time.sleep(self.conf.runner_loop_sleep_time_sec)

    def _waiting_for_results(
        self,
        running_invocation_id: "InvocationId",
        result_invocation_ids: list["InvocationId"],
        runner_args: dict[str, Any] | None = None,
    ) -> None:
        """
        Wait briefly while the awaited results are not yet available.

        This runner does not pause/resume processes: the calling process simply
        sleeps for ``invocation_wait_results_sleep_time_sec`` and returns. The
        running invocation is not marked as PAUSED, because this runner has no
        mechanism to resume it.
        """
        # Use ids to conform with BaseRunner signature.
        del running_invocation_id, result_invocation_ids, runner_args
        time.sleep(self.conf.invocation_wait_results_sleep_time_sec)
        # We cannot mark as PAUSED as the runner will not resume