import multiprocessing
import os
import signal
import time
from functools import cached_property
from multiprocessing import Manager, Process
from typing import TYPE_CHECKING, Any
import uuid
# Switch to the 'spawn' start method whenever a different one (or none) is
# configured. The original motivation was connection issues on macOS, but the
# check below is not platform-specific.
if hasattr(multiprocessing, "get_start_method"):
    if multiprocessing.get_start_method(allow_none=True) != "spawn":
        try:
            multiprocessing.set_start_method("spawn", force=True)
        except RuntimeError:
            # Method already set and we're not in main process
            pass
from pynenc import context
from pynenc.conf.config_runner import ConfigPersistentProcessRunner
from pynenc.runner.base_runner import BaseRunner
from pynenc.runner.runner_context import RunnerContext
from pynenc.runner.shutdown_diagnostics import log_runner_shutdown
from pynenc.util.multiprocessing_utils import warn_missing_main_guard
if TYPE_CHECKING:
from multiprocessing.synchronize import Event
from pynenc.app import Pynenc
from pynenc.identifiers.invocation_id import InvocationId
def persistent_process_main(
    app: "Pynenc",
    *,
    runner_cache: dict,
    stop_event: "Event",
    parent_runner_ctx_json: str,
    child_runner_id: str,
) -> None:
    """
    Main function for persistent process that executes invocations sequentially.

    The child_runner_id is pre-generated by the parent process before spawning,
    allowing the parent to track which runner_ids are still alive via OS-level
    process checks. The parent reports heartbeats for alive children in its main loop.

    The worker installs a SIGTERM handler that sets ``stop_event`` and raises
    ``KeyboardInterrupt`` so that a blocking ``invocation.run()`` is interrupted.
    On exit, the last invocation this worker picked up (if any) is handed to
    ``app.runner._kill_and_reroute``.

    :param Pynenc app: The Pynenc application instance.
    :param dict runner_cache: Shared cache dictionary across all processes.
    :param Event stop_event: Multiprocessing event to signal graceful shutdown.
    :param str parent_runner_ctx_json: JSON serialized parent runner context.
    :param str child_runner_id: Pre-generated runner_id for this child worker.
    """
    app.runner._runner_cache = runner_cache
    parent_runner_ctx = RunnerContext.from_json(parent_runner_ctx_json)
    # Use pre-generated runner_id so parent can track this child
    runner_ctx = parent_runner_ctx.new_child_context(
        "PPRWorker", runner_id=child_runner_id
    )
    app.runner._register_new_child_runner_context(runner_ctx)
    runner_id = runner_ctx.runner_id
    context.set_runner_context(app.app_id, runner_ctx)
    # Log after context is set so it shows the correct runner info
    app.logger.info(f"Persistent process worker started with pid:{os.getpid()}")

    # Bind this BEFORE the signal handler is installed: the handler closes over
    # it, and a SIGTERM delivered while the name is still unbound would make the
    # handler fail with NameError instead of stopping the worker.
    invocation_id: str | None = None

    def handle_terminate(signum: int, frame: Any) -> None:
        inv_info = f" (active invocation:{invocation_id})" if invocation_id else ""
        app.logger.warning(
            f"worker:{runner_id} received signal:{signum}{inv_info}, stopping"
        )
        stop_event.set()
        # Raising KeyboardInterrupt interrupts any blocking call in invocation.run()
        # (e.g. time.sleep). The inner `except Exception` does not catch BaseException,
        # so it propagates to `except KeyboardInterrupt` where the outer finally reroutes.
        raise KeyboardInterrupt

    signal.signal(signal.SIGTERM, handle_terminate)  # Handle SIGTERM gracefully

    try:
        while not stop_event.is_set():
            # Ask for at most one invocation at a time; this worker is sequential.
            invocations = list(app.orchestrator.get_invocations_to_run(1, runner_ctx))
            if not invocations:
                continue
            invocation = invocations[0]
            invocation_id = invocation.invocation_id
            try:
                invocation.run(runner_ctx)
            except Exception:
                app.logger.exception(f"Error executing invocation:{invocation_id}")
            # Do NOT clear invocation_id here: the outer finally needs it to attempt
            # rerouting if stop_event was set while run() was in progress. If the
            # invocation already reached a final status the reroute attempt is a no-op.
    except KeyboardInterrupt:
        app.logger.info(f"worker:{runner_id} received KeyboardInterrupt, exiting")
    except Exception as e:
        app.logger.exception(f"worker:{runner_id} error: {e}")
    finally:
        if invocation_id:
            app.logger.warning(
                f"worker:{runner_id} shutting down with active invocation:{invocation_id}, rerouting"
            )
            app.runner._kill_and_reroute(invocation_id, runner_ctx=runner_ctx)
        app.logger.info(f"worker:{runner_id} shutting down")
class PersistentProcessRunner(BaseRunner):
    """
    PersistentProcessRunner maintains a fixed number of processes that continuously run tasks.

    This runner spawns child worker processes (PPRWorker) that execute invocations.
    The parent pre-generates runner_ids for children before spawning, enabling
    parent-based health reporting via OS-level process checks.
    """

    child_runner_ids: dict[str, Process]  # Maps child runner_id to Process
    manager: Manager  # type: ignore
    num_processes: int
    stop_event: "Event"

    @cached_property
    def conf(self) -> ConfigPersistentProcessRunner:
        """Runner configuration built from the app's config values and config file path."""
        return ConfigPersistentProcessRunner(
            config_values=self.app.config_values,
            config_filepath=self.app.config_filepath,
        )

    @staticmethod
    def mem_compatible() -> bool:
        """Indicates if the runner supports in-memory components."""
        return False

    @property
    def max_parallel_slots(self) -> int:
        """Returns the maximum number of concurrent processes."""
        return self.num_processes

    def get_active_child_runner_ids(self) -> list[str]:
        """
        Returns runner_ids of alive child workers for parent-based health reporting.

        :return: List of runner_ids for child workers with alive processes.
            Empty when the runner has not been started yet.
        """
        if not hasattr(self, "child_runner_ids"):
            return []
        return [
            runner_id
            for runner_id, proc in self.child_runner_ids.items()
            if proc.is_alive()
        ]

    def _log_shutdown(self, signum: int | None) -> None:
        """
        Logs shutdown diagnostics for this runner and its tracked child processes.

        :param signum: Signal number forwarded to ``log_runner_shutdown``, or None.
        """
        log_runner_shutdown(
            self.app.logger,
            self.__class__.__name__,
            self.runner_id,
            signum,
            # Tolerate being called before _on_start created the tracking dict.
            processes={
                rid: (proc, None)
                for rid, proc in getattr(self, "child_runner_ids", {}).items()
            },
        )

    def _on_start(self) -> None:
        """Initializes the runner and spawns initial processes."""
        # Configured process count (falling back to the CPU count, then 1),
        # but never below the configured minimum number of parallel slots.
        self.num_processes = max(
            self.conf.min_parallel_slots, self.conf.num_processes or os.cpu_count() or 1
        )
        self.logger.info(f"Creating {self.num_processes} processes")
        warn_missing_main_guard()
        self.child_runner_ids = {}  # Track runner_id -> Process for health reporting
        self._process_id_counter: int = 0
        self.manager = Manager()
        self.runner_cache = self._runner_cache or self.manager.dict()  # type: ignore
        self.stop_event = self.manager.Event()  # type: ignore
        for _ in range(self.num_processes):
            self._spawn_persistent_process()

    def _spawn_persistent_process(self) -> str | None:
        """
        Spawns a new persistent process and returns its runner_id.

        Pre-generates the child runner_id before spawning so the parent can
        track which runner_ids are still alive via OS-level process checks.

        :return: The runner_id of the spawned child, or None if spawn failed.
        :raises RuntimeError: If the runner is not (or no longer) running.
        """
        if not hasattr(self, "running") or not self.running:
            raise RuntimeError("Trying to spawn new process after stopping loop")
        # Pre-generate child runner_id so parent can track it
        child_runner_id = str(uuid.uuid4())
        args = {
            "app": self.app,
            "runner_cache": self.runner_cache,
            "stop_event": self.stop_event,
            "parent_runner_ctx_json": self.runner_context.to_json(),
            "child_runner_id": child_runner_id,
        }
        p = Process(target=persistent_process_main, kwargs=args, daemon=True)
        try:
            p.start()
            if p.pid is None:
                self.logger.error("Failed to start process: pid:None")
                return None
            self.child_runner_ids[child_runner_id] = p
            self.logger.info(
                f"Spawned persistent process: pid:{p.pid}, worker:{child_runner_id}"
            )
        except Exception as e:
            self.logger.error(
                f"Failed to spawn process for worker:{child_runner_id}: {e}"
            )
            raise
        return child_runner_id

    def _terminate_all_processes(self) -> None:
        """
        Terminates all running processes with graceful shutdown attempt.

        Sets the stop event (when available), then for each tracked worker sends
        a terminate request, waits up to 5 seconds, and force-kills it if it is
        still alive. Safe to call before the runner was started: missing
        ``stop_event`` / ``child_runner_ids`` attributes are tolerated.
        """
        # Check if stop_event is initialized first
        if hasattr(self, "stop_event"):
            try:
                self.stop_event.set()  # Signal all processes to stop
            except Exception as e:
                self.logger.warning(f"Failed to set stop event: {e}")
        # Continue with process termination regardless. The tracking dict only
        # exists after _on_start, so guard it like the other accessors do.
        child_runner_ids = getattr(self, "child_runner_ids", None)
        if not child_runner_ids:
            return
        # Iterate over a snapshot because entries are removed inside the loop.
        for runner_id, process in list(child_runner_ids.items()):
            try:
                if process.is_alive():
                    process.terminate()
                    process.join(timeout=5)
                    if process.is_alive():
                        self.logger.warning(
                            f"worker:{runner_id} did not terminate, forcing kill"
                        )
                        process.kill()
                        # Reap the killed child so it does not linger as a zombie;
                        # bounded so shutdown can never block here indefinitely.
                        process.join(timeout=1)
                child_runner_ids.pop(runner_id, None)
                self.logger.debug(f"Terminated worker:{runner_id}")
            except Exception as e:
                self.logger.warning(f"Error terminating worker:{runner_id}: {e}")

    def _on_stop(self) -> None:
        """Cleans up all resources when runner stops."""
        try:
            self._terminate_all_processes()
            # Check if manager is initialized
            if hasattr(self, "manager"):
                try:
                    self.manager.shutdown()  # type: ignore
                except Exception as e:
                    self.logger.warning(f"Failed to shutdown manager: {e}")
        except Exception as e:
            self.logger.error(f"Error during {self.__class__.__name__} stop: {e}")

    def _on_stop_runner_loop(self) -> None:
        """Handles immediate stop from signal."""
        try:
            self._terminate_all_processes()
        except Exception as e:
            self.logger.error(f"Error during {self.__class__.__name__} loop stop: {e}")

    def runner_loop_iteration(self) -> None:
        """Maintains the configured number of running processes."""
        # Clean up dead processes from tracking dict
        dead_runner_ids = [
            rid for rid, proc in self.child_runner_ids.items() if not proc.is_alive()
        ]
        for rid in dead_runner_ids:
            self.logger.warning(f"Detected dead child worker:{rid}, cleaning up")
            self.child_runner_ids.pop(rid, None)
        # Top the pool back up to the configured size.
        current_count = len(self.child_runner_ids)
        if current_count < self.num_processes:
            processes_to_spawn = self.num_processes - current_count
            self.logger.info(f"Spawning {processes_to_spawn} new processes")
            for _ in range(processes_to_spawn):
                self._spawn_persistent_process()
        time.sleep(self.conf.runner_loop_sleep_time_sec)

    def _waiting_for_results(
        self,
        running_invocation_id: "InvocationId",
        result_invocation_ids: list["InvocationId"],
        runner_args: dict[str, Any] | None = None,
    ) -> None:
        """
        Waits for dependent results by sleeping for the configured interval.

        In this simplified version, we don't pause/resume processes: the
        arguments are ignored and the call only sleeps for
        ``conf.invocation_wait_results_sleep_time_sec``.

        :param running_invocation_id: Unused; kept for the BaseRunner signature.
        :param result_invocation_ids: Unused; kept for the BaseRunner signature.
        :param runner_args: Unused; kept for the BaseRunner signature.
        """
        # Use ids to conform with BaseRunner signature.
        del running_invocation_id, result_invocation_ids, runner_args
        time.sleep(self.conf.invocation_wait_results_sleep_time_sec)
        # We cannot mark as PAUSED as the runner will not resume