Source code for pynenc.runner.shutdown_diagnostics

"""
Shutdown diagnostics utilities for runner termination events.

Centralises signal classification, system environment collection, and structured
logging so every runner produces consistent, actionable output on SIGTERM/OOM.
"""

import os
import platform
import signal
from enum import StrEnum, auto
from logging import Logger
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from multiprocessing import Process
    from threading import Thread


[docs] class ShutdownReason(StrEnum): NORMAL = auto() SIGTERM = auto() SIGINT = auto() # SIGKILL is the OOM signal in containerised environments (Kubernetes OOMKilled) OOM_KILLED = auto() UNKNOWN_SIGNAL = auto()
def classify_signal(signum: int | None) -> ShutdownReason:
    """Classify a signal number into a ShutdownReason.

    :param int | None signum: Signal number that triggered the shutdown, or
        ``None`` when no signal was involved.
    :return: ``NORMAL`` for ``None``, the matching member for SIGTERM, SIGINT
        and SIGKILL (reported as ``OOM_KILLED``), and ``UNKNOWN_SIGNAL`` for
        every other signal number.
    """
    if signum is None:
        return ShutdownReason.NORMAL
    if signum == signal.SIGTERM:
        return ShutdownReason.SIGTERM
    if signum == signal.SIGINT:
        return ShutdownReason.SIGINT
    # signal.SIGKILL is only defined on Unix. Referencing the attribute
    # directly would raise AttributeError on Windows for any signal that is
    # not SIGTERM/SIGINT, so look it up defensively instead.
    sigkill = getattr(signal, "SIGKILL", None)
    if sigkill is not None and signum == sigkill:
        return ShutdownReason.OOM_KILLED
    return ShutdownReason.UNKNOWN_SIGNAL
[docs] def _system_info() -> dict[str, Any]: info: dict[str, Any] = { "platform": platform.platform(), "python": platform.python_version(), "cpu_count": os.cpu_count(), } try: info["load_avg"] = os.getloadavg() except (AttributeError, OSError): pass try: import resource # Unix only usage = resource.getrusage(resource.RUSAGE_SELF) # Linux reports in KB, macOS in bytes div = 1024 if platform.system() == "Linux" else 1024 * 1024 info["mem_rss_mb"] = round(usage.ru_maxrss / div, 1) except (ImportError, OSError): pass return info
def _invocation_suffix(inv_id: str | None) -> str:
    """Return the `` invocation:<id>`` suffix, or an empty string without an id."""
    return f" invocation:{inv_id}" if inv_id else ""


def _probe_process(proc: "Process") -> tuple[str, int | None]:
    """Return ``(state, pid)`` for *proc* without ever raising."""
    try:
        state = "ALIVE" if proc.is_alive() else "DEAD"
    except (ValueError, AssertionError):
        # ValueError: the Process object has already been close()d.
        # AssertionError: is_alive() was called outside the parent process.
        # Diagnostics run on the shutdown path and must not abort it.
        state = "UNKNOWN"
    try:
        pid = proc.pid
    except ValueError:
        # A closed Process object no longer exposes its pid.
        pid = None
    return state, pid


def log_runner_shutdown(
    logger: Logger,
    runner_cls: str,
    runner_id: str,
    signum: int | None,
    *,
    processes: "dict[str, tuple[Process, str | None]] | None" = None,
    threads: "dict[str, tuple[Thread, str | None]] | None" = None,
    waiting_inv_ids: list[str] | None = None,
) -> None:
    """
    Log structured diagnostics for a runner shutdown event.

    Collects system environment info automatically. Processes and threads are
    passed as dicts mapping an identifier to (object, invocation_id_or_None).

    The report is emitted as a single multi-line record: at ``critical`` level
    when the signal classifies as OOM, at ``warning`` level otherwise. In the
    OOM case an additional ``critical`` hint is logged afterwards.

    A process whose state cannot be queried (for example because its handle
    was already closed) is listed as ``UNKNOWN`` instead of raising.

    :param Logger logger: Logger to write to.
    :param str runner_cls: Runner class name.
    :param str runner_id: Runner instance ID (only the first 8 characters are logged).
    :param int | None signum: Signal that triggered shutdown, if any.
    :param processes: Child processes keyed by runner_id.
    :param threads: Active threads keyed by an identifier.
    :param waiting_inv_ids: Invocations blocked waiting for results.
    """
    reason = classify_signal(signum)
    is_oom = reason == ShutdownReason.OOM_KILLED
    log_fn = logger.critical if is_oom else logger.warning

    sys_data = _system_info()
    sys_parts = [
        f"os:{sys_data['platform']}",
        f"py:{sys_data['python']}",
        f"cpus:{sys_data['cpu_count']}",
    ]
    # The two entries below are optional: _system_info omits them on
    # platforms that cannot provide them.
    if "load_avg" in sys_data:
        sys_parts.append(f"load:{sys_data['load_avg']}")
    if "mem_rss_mb" in sys_data:
        sys_parts.append(f"rss:{sys_data['mem_rss_mb']}MB")

    lines = [
        "=" * 60,
        f"RUNNER SHUTDOWN {'[OOM DETECTED] ' if is_oom else ''}DIAGNOSTICS",
        f" runner:{runner_cls}({runner_id[:8]}) pid:{os.getpid()} signal:{signum} reason:{reason}",
        f" sys: {' | '.join(sys_parts)}",
    ]

    if processes:
        lines.append(f" processes ({len(processes)}):")
        for rid, (proc, inv_id) in processes.items():
            state, pid = _probe_process(proc)
            lines.append(
                f" {state} pid:{pid} worker:{rid[:8]}{_invocation_suffix(inv_id)}"
            )

    if threads:
        lines.append(f" threads ({len(threads)}):")
        for key, (thread, inv_id) in threads.items():
            state = "ALIVE" if thread.is_alive() else "DEAD"
            lines.append(
                f" {state} thread:{thread.name} worker:{key[:8]}{_invocation_suffix(inv_id)}"
            )

    if waiting_inv_ids:
        lines.append(
            f" waiting ({len(waiting_inv_ids)}): {', '.join(waiting_inv_ids)}"
        )

    lines.append("=" * 60)
    log_fn("\n".join(lines))

    if is_oom:
        logger.critical(
            "OOM KILLED: likely memory exhaustion. Check resource limits and task "
            "payload sizes. Affected invocations will be rerouted."
        )