Source code for pynenc.runner.persistent_process_runner

import multiprocessing
import os
import signal
import time
from functools import cached_property
from multiprocessing import Manager, Process
from typing import TYPE_CHECKING, Any, Optional

# Use 'spawn' method on macOS to avoid connection issues
if (
    hasattr(multiprocessing, "get_start_method")
    and multiprocessing.get_start_method(allow_none=True) != "spawn"
):
    try:
        multiprocessing.set_start_method("spawn", force=True)
    except RuntimeError:
        # Method already set and we're not in main process
        pass

from pynenc import context
from pynenc.conf.config_runner import ConfigPersistentProcessRunner
from pynenc.runner.base_runner import BaseRunner

if TYPE_CHECKING:
    from multiprocessing.synchronize import Event

    from pynenc.app import Pynenc
    from pynenc.invocation.dist_invocation import DistributedInvocation


[docs] def persistent_process_main( app: "Pynenc", *, process_key: str, runner_cache: dict, stop_event: "Event" ) -> None: """Main function for persistent process that executes invocations sequentially.""" app.logger.info(f"Persistent process {process_key} started with PID {os.getpid()}") app.runner._runner_cache = runner_cache app.runner.set_extra_id(process_key) context.set_current_runner(app.app_id, app.runner) def handle_terminate(signum: int, frame: Any) -> None: app.logger.info(f"Process {process_key} received SIGTERM, setting stop event") stop_event.set() signal.signal(signal.SIGTERM, handle_terminate) # Handle SIGTERM gracefully try: while not stop_event.is_set(): invocations = list( app.orchestrator.get_invocations_to_run(max_num_invocations=1) ) if not invocations: continue invocation = invocations[0] invocation_id = invocation.invocation_id app.logger.info( f"{process_key} starting invocation:{invocation.invocation_id}" ) try: invocation.run() except Exception: app.logger.exception(f"Error executing invocation {invocation_id}") except KeyboardInterrupt: app.logger.info(f"Process {process_key} received KeyboardInterrupt, exiting") except Exception as e: app.logger.exception(f"Process {process_key} error: {e}") finally: app.logger.info(f"Process {process_key} shutting down")
[docs] class PersistentProcessRunner(BaseRunner): """ PersistentProcessRunner maintains a fixed number of processes that continuously run tasks. """ processes: dict[str, Process] manager: Manager # type: ignore runner_cache: dict num_processes: int stop_event: "Event" @cached_property def conf(self) -> ConfigPersistentProcessRunner: return ConfigPersistentProcessRunner( config_values=self.app.config_values, config_filepath=self.app.config_filepath, ) @property def cache(self) -> dict: """Returns the shared cache for all processes.""" return self.runner_cache
[docs] @staticmethod def mem_compatible() -> bool: """Indicates if the runner supports in-memory components.""" return False
@property def max_parallel_slots(self) -> int: """Returns the maximum number of concurrent processes.""" return self.num_processes
[docs] def _generate_process_key(self) -> str: """Generates a unique process key using runner_id and an incrementing counter.""" self._process_id_counter += 1 return f"{self.runner_id}-worker-{self._process_id_counter}"
[docs] def _on_start(self) -> None: """Initializes the runner and spawns initial processes.""" self.num_processes = max( self.conf.min_parallel_slots, self.conf.num_processes or os.cpu_count() or 1 ) self.logger.info( f"Starting PersistentProcessRunner with {self.num_processes} processes" ) self.processes = {} self._process_id_counter: int = 0 self.manager = Manager() self.runner_cache = self._runner_cache or self.manager.dict() # type: ignore self.stop_event = self.manager.Event() # type: ignore for _ in range(self.num_processes): self._spawn_persistent_process()
[docs] def _spawn_persistent_process(self) -> str: """Spawns a new persistent process and returns its key.""" if not hasattr(self, "running") or not self.running: raise RuntimeError("Trying to spawn new process after stopping loop") process_key = self._generate_process_key() args = { "app": self.app, "process_key": process_key, "runner_cache": self.runner_cache, "stop_event": self.stop_event, } p = Process(target=persistent_process_main, kwargs=args, daemon=True) try: p.start() self.processes[process_key] = p self.logger.info( f"Spawned persistent process {process_key} with pid {p.pid}" ) except Exception as e: self.logger.error(f"Failed to spawn process {process_key}: {e}") raise return process_key
[docs] def _terminate_all_processes(self) -> None: """Terminates all running processes with graceful shutdown attempt.""" # Check if stop_event is initialized first if hasattr(self, "stop_event"): try: self.stop_event.set() # Signal all processes to stop except Exception as e: self.logger.warning(f"Failed to set stop event: {e}") # Continue with process termination regardless for key, process in list(self.processes.items()): try: if process.is_alive(): process.terminate() process.join(timeout=5) if process.is_alive(): self.logger.warning( f"Process {key} did not terminate, forcing kill" ) process.kill() self.processes.pop(key, None) self.logger.debug(f"Terminated process {key}") except Exception as e: self.logger.warning(f"Error terminating process {key}: {e}")
[docs] def _on_stop(self) -> None: """Cleans up all resources when runner stops.""" try: self.logger.info("Stopping PersistentProcessRunner") self._terminate_all_processes() # Check if manager is initialized if hasattr(self, "manager"): try: self.manager.shutdown() # type: ignore except Exception as e: self.logger.warning(f"Failed to shutdown manager: {e}") self.logger.info("PersistentProcessRunner stopped") except Exception as e: self.logger.error(f"Error during runner stop: {e}")
[docs] def _on_stop_runner_loop(self) -> None: """Handles immediate stop from signal.""" try: self.logger.info("Stopping PersistentProcessRunner loop due to signal") self._terminate_all_processes() except Exception as e: self.logger.error(f"Error during runner loop stop: {e}")
[docs] def runner_loop_iteration(self) -> None: """Maintains the configured number of running processes.""" dead_keys = [key for key, proc in self.processes.items() if not proc.is_alive()] for key in dead_keys: self.logger.warning(f"Detected dead process {key}, cleaning up") self.processes.pop(key, None) current_count = len(self.processes) if current_count < self.num_processes: processes_to_spawn = self.num_processes - current_count self.logger.info(f"Spawning {processes_to_spawn} new processes") for _ in range(processes_to_spawn): self._spawn_persistent_process() time.sleep(self.conf.runner_loop_sleep_time_sec)
[docs] def _waiting_for_results( self, running_invocation: "DistributedInvocation", result_invocations: list["DistributedInvocation"], runner_args: Optional[dict[str, Any]] = None, ) -> None: """ In this simplified version, we don't pause/resume processes. The invocation will just be marked as paused and the process will continue with other invocations. """ del running_invocation, result_invocations, runner_args time.sleep(self.conf.invocation_wait_results_sleep_time_sec)
# We cannot mark as PAUSED as the runner will not resume