import os
import signal
from multiprocessing import Manager, Process, cpu_count
from typing import Any, NamedTuple, TYPE_CHECKING
from pynenc.exceptions import InvocationStatusError, RunnerError
from pynenc.invocation import InvocationStatus
from pynenc.runner.base_runner import BaseRunner
from pynenc.runner.shutdown_diagnostics import log_runner_shutdown
from pynenc.util.multiprocessing_utils import warn_missing_main_guard
if TYPE_CHECKING:
from pynenc.app import Pynenc
from pynenc.invocation import DistributedInvocation
from pynenc.identifiers.invocation_id import InvocationId
from pynenc.runner.runner_context import RunnerContext
[docs]
def run_invocation(
    app: "Pynenc",
    invocation: "DistributedInvocation",
    runner_ctx: "RunnerContext",
    runner_args: dict,
) -> None:
    """
    Execute one invocation; used as the ``target`` of a worker ``Process``.

    The function lives at module level because multiprocessing requires a
    top-level callable as the process target.

    :param app: The application instance handed to the worker. It is not used directly here.
    :param invocation: The invocation whose ``run`` method is called.
    :param runner_ctx: Runner context passed positionally to ``invocation.run``.
    :param runner_args: Runner arguments forwarded through the ``runner_args`` keyword.
    """
    execute = invocation.run
    execute(runner_ctx, runner_args=runner_args)
[docs]
class ClassifiedInvocations(NamedTuple):
    """
    Waiting invocation IDs split by whether they already reached a final status.

    :param final: Invocation IDs that have reached their final status
    :param non_final: Invocation IDs that are still in progress
    """

    # IDs that have reached a final status
    final: list["InvocationId"]
    # IDs that have not reached a final status yet
    non_final: list["InvocationId"]
[docs]
class ChildProcessInfo(NamedTuple):
    """
    Pairs a child worker process with the invocation it executes.

    :param process: The Process object
    :param invocation_id: The invocation_id being executed by this process
    """

    # handle of the worker process
    process: Process
    # identifier of the invocation running inside that process
    invocation_id: "InvocationId"
[docs]
class ProcessRunner(BaseRunner):
"""
ProcessRunner is a concrete implementation of BaseRunner that executes tasks in separate processes.
It manages task invocations, handling their execution, monitoring, and lifecycle within individual processes.
This runner is suitable for CPU-bound tasks and scenarios where task isolation is essential.
Each invocation is started in its own ``multiprocessing.Process``. A ``Manager`` dictionary
(``wait_invocation``) is handed to the workers so they can register which invocations they are
waiting on; the runner loop pauses those workers with SIGSTOP and resumes them with SIGCONT once
the awaited invocations have reached a final status.
"""
def __init__(
    self,
    app: "Pynenc",
    runner_cache: dict | None = None,
    runner_context: "RunnerContext | None" = None,
) -> None:
    """
    Create the runner with empty bookkeeping, then delegate to ``BaseRunner``.

    :param app: The application instance, forwarded to ``BaseRunner.__init__``.
    :param runner_cache: Optional cache, forwarded to ``BaseRunner.__init__``.
    :param runner_context: Optional runner context, forwarded to ``BaseRunner.__init__``.
    """
    # The manager and the process limit get their real values in _on_start.
    self.manager: Manager | None = None  # type: ignore
    self.max_processes: int = 0
    # invocation id -> runner_id of the child process executing it
    self.inv_id_to_runner_id: dict[InvocationId, str] = {}
    # awaited invocation id -> ids of the invocations waiting for it
    self.wait_invocation: dict[InvocationId, set[InvocationId]] = {}
    # runner_id of a child -> its process and invocation
    self.child_runner_ids: dict[str, ChildProcessInfo] = {}
    # All attributes above are set before the base initializer runs.
    super().__init__(app, runner_cache, runner_context)
[docs]
@staticmethod
def mem_compatible() -> bool:
"""
Indicates if the runner is compatible with in-memory components.
:return: False, as each task is executed in a separate process with independent memory.
"""
return False
@property
def max_parallel_slots(self) -> int:
"""
The maximum number of parallel tasks that the runner can handle.
:return: An integer representing the maximum number of parallel tasks, based on CPU count.
"""
return max(self.conf.min_parallel_slots, self.max_processes)
[docs]
def get_active_child_runner_ids(self) -> list[str]:
"""Return runner_ids of child processes that are still alive."""
return [
runner_id
for runner_id, info in self.child_runner_ids.items()
if info.process.is_alive()
]
[docs]
def _log_shutdown(self, signum: int | None) -> None:
    """
    Gather the runner's current state and pass it to ``log_runner_shutdown``.

    :param signum: Forwarded unchanged to ``log_runner_shutdown``
        (presumably the signal that triggered the shutdown, or None).
    """
    try:
        waiting_inv_ids = [str(inv_id) for inv_id in (self.wait_invocation or {})]
    except (OSError, EOFError):
        # The manager-backed dictionary can no longer be read.
        waiting_inv_ids = ["<manager unavailable>"]
    child_processes = {}
    for rid, info in self.child_runner_ids.items():
        child_processes[rid] = (info.process, str(info.invocation_id))
    log_runner_shutdown(
        self.app.logger,
        self.__class__.__name__,
        self.runner_id,
        signum,
        processes=child_processes,
        waiting_inv_ids=waiting_inv_ids,
    )
@property
def runner_args(self) -> dict[str, Any]:
"""
Provides arguments necessary for parent-subprocess communication in ProcessRunner.
:return: A dictionary containing the 'wait_invocation' Managed dictionary for subprocesses.
"""
# this is necessary for parent-subprocess communication on ProcessRunner
# it passes the wait_invocation Managed dictinoary to the subprocesses
# so they can notify the main loop when waiting for other invocatinos
# the main loop will then pause the subprocesses
return {"wait_invocation": self.wait_invocation}
@property
def waiting_processes(self) -> int:
"""
Returns the number of processes that are currently waiting for other invocations to finish.
:return: An integer representing the number of waiting processes.
"""
try:
if not self.wait_invocation:
return 0
return len(set.union(*self.wait_invocation.values()))
except (OSError, EOFError):
return 0
[docs]
def parse_args(self, args: dict[str, Any]) -> None:
"""
Parses the arguments provided to the runner.
:param args: A dictionary of arguments passed to the runner.
"""
self.wait_invocation = args["wait_invocation"]
[docs]
def _on_start(self) -> None:
    """
    Internal hook run when the ProcessRunner starts.

    Creates the multiprocessing manager, the managed ``wait_invocation``
    dictionary, and fresh child-process bookkeeping, then sets the process
    limit to the CPU count.
    """
    self.logger.info("Starting ProcessRunner")
    warn_missing_main_guard()
    manager = Manager()
    self.manager = manager
    self.wait_invocation = manager.dict()  # type: ignore
    # A managed dict is created for the cache only when _runner_cache is falsy.
    self.runner_cache = self._runner_cache or manager.dict()  # type: ignore
    self.child_runner_ids, self.inv_id_to_runner_id = {}, {}
    self.max_processes = cpu_count()
[docs]
def _on_stop(self) -> None:
"""
Internal method called when the ProcessRunner stops.
Kills each alive child with SIGKILL, waits for it to die, then calls
_kill_and_reroute. This ordering avoids a race where the task finishes
after we set KILLED but before the process actually dies: by joining
first we know the process is gone, and _kill_and_reroute silently skips
invocations that already reached a final status.
Afterwards the multiprocessing manager, if one was created, is shut down;
OSError/EOFError raised by that shutdown are logged at debug level and ignored.
"""
self.logger.info("Stopping ProcessRunner")
for runner_id, info in self.child_runner_ids.items():
if info.process.is_alive():
self.logger.warning(
f"Killing runner:{runner_id} pid:{info.process.pid} "
f"with invocation:{info.invocation_id}"
)
# kill() may raise OSError when the child exits between the is_alive()
# check and this call; that case is only logged.
try:
info.process.kill()
except OSError:
self.logger.debug(f"worker:{runner_id} already exited before kill")
# Wait for the child to be gone before its invocation status is touched.
info.process.join()
# Reconstruct the child's RunnerContext so the ownership check passes:
# the invocation was set to RUNNING under the child's runner_id, so
# only a context carrying that same runner_id can transition it further.
child_ctx = self.runner_context.new_child_context(
"ProcessRunnerWorker", runner_id=runner_id
)
self._kill_and_reroute(info.invocation_id, runner_ctx=child_ctx)
# NOTE(review): the manager shutdown below is not in a ``finally`` block, so an
# exception raised earlier in this method would skip it — confirm that is acceptable.
try:
if self.manager is not None:
self.manager.shutdown() # type: ignore
except (OSError, EOFError):
self.logger.debug("Manager already dead during shutdown")
self.logger.info("ProcessRunner stopped")
[docs]
def _on_stop_runner_loop(self) -> None:
"""
Internal method called after receiving a signal to stop the runner loop.
Clears the wait_invocation dictionary. OSError/EOFError raised by the
manager proxy while clearing are logged at debug level instead of propagating.
"""
self.logger.info("Stopping ProcessRunner loop")
try:
if self.wait_invocation is not None:
self.wait_invocation.clear()
except (OSError, EOFError):
self.logger.debug("Manager proxy unavailable during stop_runner_loop")
# Rebind to a plain dict so later reads do not go through the manager proxy.
# NOTE(review): confirm whether this rebind is meant to run on every stop or
# only when the proxy was unavailable.
self.wait_invocation = {}
self.logger.info("ProcessRunner loop stopped")
[docs]
def _reclaim_available_slots(self) -> int:
"""
Joins finished processes and returns the number of available process slots.
:return: An integer representing available process slots.
"""
for runner_id in list(self.child_runner_ids):
info = self.child_runner_ids[runner_id]
if not info.process.is_alive():
info.process.join()
del self.child_runner_ids[runner_id]
self.inv_id_to_runner_id.pop(info.invocation_id, None)
# discount waiting processes, they should do nothing
# until the blocking invocation is finished
# otherwise, running one worker with one process
# will be lock indefintely until the blocking invocation runs
return self.max_parallel_slots - len(self.child_runner_ids)
[docs]
def clasify_waiting_invocations(
self,
) -> ClassifiedInvocations:
"""Will classify the waiting invocation IDs into final and non finals"""
try:
waiting_invocation_ids = list(self.wait_invocation.keys())
except (OSError, EOFError):
self.logger.warning("Manager proxy unavailable, stopping runner")
self.running = False
return ClassifiedInvocations([], [])
if not waiting_invocation_ids:
return ClassifiedInvocations([], [])
final_invocation_ids = self.app.orchestrator.filter_final(
waiting_invocation_ids
)
non_final_invocation_ids = [
inv_id
for inv_id in waiting_invocation_ids
if inv_id not in final_invocation_ids
]
return ClassifiedInvocations(final_invocation_ids, non_final_invocation_ids)
[docs]
def _get_process_for_invocation(
self, invocation_id: "InvocationId"
) -> Process | None:
"""Get the Process object for a given invocation_id."""
if runner_id := self.inv_id_to_runner_id.get(invocation_id):
if info := self.child_runner_ids.get(runner_id):
return info.process
return None
[docs]
def _pause_waiting_process(
self, waiting_invocation_id: "InvocationId", blocked_by: "InvocationId"
) -> None:
"""Send SIGSTOP to the process running a waiting invocation."""
waiting_process = self._get_process_for_invocation(waiting_invocation_id)
if waiting_process and waiting_process.pid:
try:
os.kill(waiting_process.pid, signal.SIGSTOP)
except OSError:
self.logger.debug(
f"invocation:{waiting_invocation_id} process already exited before SIGSTOP"
)
return
self.logger.info(
f"invocation:{waiting_invocation_id} waiting for invocation:{blocked_by}, pausing pid:{waiting_process.pid}"
)
[docs]
def _resume_waiting_process(self, waiting_invocation_id: "InvocationId") -> None:
"""Send SIGCONT and set RESUMED status for a previously paused invocation."""
waiting_process = self._get_process_for_invocation(waiting_invocation_id)
if not (waiting_process and waiting_process.pid):
return
try:
os.kill(waiting_process.pid, signal.SIGCONT)
except OSError:
self.logger.debug(
f"invocation:{waiting_invocation_id} process already exited before SIGCONT"
)
return
self.logger.info(
f"resuming pid:{waiting_process.pid} of invocation:{waiting_invocation_id}"
)
try:
self.app.orchestrator.set_invocation_status(
waiting_invocation_id,
InvocationStatus.RESUMED,
self.runner_context,
)
except InvocationStatusError as ex:
self.logger.warning(
f"Could not set invocation:{waiting_invocation_id} to RESUMED status: {ex}"
)
[docs]
def handle_waiting_invocations(self) -> None:
"""Handle the waiting invocations."""
try:
classified = self.clasify_waiting_invocations()
for invocation_id in classified.non_final:
for waiting_invocation_id in self.wait_invocation.get(
invocation_id, []
):
self._pause_waiting_process(waiting_invocation_id, invocation_id)
to_resume_invocation_ids: set[InvocationId] = set()
for invocation_id in classified.final:
if waiting_invocation_ids := self.wait_invocation.get(
invocation_id, set()
):
to_resume_invocation_ids.update(waiting_invocation_ids)
self.wait_invocation[invocation_id] = set()
self.logger.info(
f"invocation:{invocation_id} finalized, resuming waiting invocations:{waiting_invocation_ids}"
)
for waiting_invocation_id in to_resume_invocation_ids:
self._resume_waiting_process(waiting_invocation_id)
except (OSError, EOFError):
self.logger.warning(
"Manager proxy unavailable in handle_waiting_invocations, stopping runner"
)
self.running = False
[docs]
def runner_loop_iteration(self) -> None:
"""
Executes one iteration of the ProcessRunner loop.
Handles the execution and monitoring of task invocations in separate processes.
Each process gets a reserved runner context and only starts if an invocation is available.
"""
self.logger.debug("starting runner loop iteration (dynamic process slots)")
for _ in range(self._reclaim_available_slots()):
# Reserve a unique runner context for this process
reserved_ctx = self.runner_context.new_child_context("ProcessRunnerWorker")
# Try to get an invocation for this reserved context
invocations = list(
self.app.orchestrator.get_invocations_to_run(1, reserved_ctx)
)
if not invocations:
break
invocation = invocations[0]
self._register_new_child_runner_context(reserved_ctx)
process = Process(
target=run_invocation,
args=(self.app, invocation, reserved_ctx, self.runner_args),
daemon=True,
)
process.start()
if process.pid:
self.child_runner_ids[reserved_ctx.runner_id] = ChildProcessInfo(
process=process, invocation_id=invocation.invocation_id
)
self.inv_id_to_runner_id[invocation.invocation_id] = (
reserved_ctx.runner_id
)
else:
# The recovery service should pick this up
self.logger.error(
f"Failed to start process for invocation:{invocation.invocation_id}"
)
self.handle_waiting_invocations()
[docs]
def _waiting_for_results(
self,
running_invocation_id: "InvocationId",
result_invocation_ids: list["InvocationId"],
runner_args: dict[str, Any] | None = None,
) -> None:
"""
Handles invocations that are waiting for results from other invocations.
Pauses the running process and registers it to wait for the results of specified invocations.
:param InvocationId running_invocation_id: The ID of the invocation that is waiting for results.
:param list[InvocationId] result_invocation_ids: A list of IDs of invocations whose results are being awaited.
:param dict[str, Any] | None runner_args: Additional arguments required for the ProcessRunner.
"""
if not result_invocation_ids:
return
if not runner_args:
raise RunnerError("runner_args should be defined for ProcessRunner")
try:
self.parse_args(runner_args)
self.app.orchestrator.set_invocation_status(
running_invocation_id,
InvocationStatus.PAUSED,
runner_ctx=self.runner_context,
)
for result_inv_id in result_invocation_ids:
current_waiters = set(self.wait_invocation.get(result_inv_id, set()))
current_waiters.add(running_invocation_id)
self.wait_invocation[result_inv_id] = current_waiters
self.logger.debug(
f"invocation:{running_invocation_id} is waiting for invocation:{result_inv_id} to finish"
)
except InvocationStatusError as ex:
self.logger.warning(
f"Not possible to change invocation:{running_invocation_id} status: {ex}"
)
# remove from any wait_invocation set
for result_inv_id in result_invocation_ids:
if running_invocation_id in self.wait_invocation.get(
result_inv_id, set()
):
self.wait_invocation[result_inv_id].remove(running_invocation_id)
return