Source code for pynenc.runner.thread_runner

import multiprocessing
import threading
import time
from functools import cached_property
from typing import TYPE_CHECKING, Any, NamedTuple

from pynenc.conf.config_runner import ConfigThreadRunner
from pynenc.invocation.dist_invocation import DistributedInvocation
from pynenc.runner.base_runner import BaseRunner
from pynenc.runner.runner_context import RunnerContext
from pynenc.runner.shutdown_diagnostics import log_runner_shutdown

if TYPE_CHECKING:
    from pynenc.app import Pynenc
    from pynenc.identifiers.invocation_id import InvocationId


[docs] class ThreadInfo(NamedTuple): thread: threading.Thread invocation: DistributedInvocation
[docs] class ThreadRunner(BaseRunner): """ ThreadRunner is a concrete implementation of BaseRunner that executes tasks in separate threads. It manages task invocations, handling their execution and lifecycle within individual threads. This runner is suitable for I/O-bound tasks and scenarios where shared memory between tasks is required. """ threads: dict[str, ThreadInfo] max_threads: int waiting_invocation_ids: set[str] def __init__( self, app: "Pynenc", runner_cache: dict | None = None, runner_context: RunnerContext | None = None, ) -> None: # Initialize ThreadRunner-specific attributes before calling super().__init__ # This ensures they exist even if run() is never called self.threads = {} self.waiting_invocation_ids = set() self.max_threads = 0 # Will be set properly in _on_start() super().__init__(app, runner_cache, runner_context) @cached_property def conf(self) -> ConfigThreadRunner: return ConfigThreadRunner( config_values=self.app.config_values, config_filepath=self.app.config_filepath, )
[docs] @staticmethod def mem_compatible() -> bool: """ Indicates if the runner is compatible with in-memory components. :return: True, as each task is executed in a separate thread with shared memory. """ return True
@property def max_parallel_slots(self) -> int: """ The maximum number of parallel tasks that the runner can handle. :return: An integer representing the maximum number of parallel tasks. """ return max( self.conf.min_parallel_slots, self.conf.min_threads, self.max_threads )
[docs] def get_active_child_runner_ids(self) -> list[str]: """ThreadRunner doesn't spawn child processes, so returns empty list.""" return []
[docs] def _log_shutdown(self, signum: int | None) -> None: log_runner_shutdown( self.app.logger, self.__class__.__name__, self.runner_id, signum, threads={ str(inv_id): (ti.thread, str(ti.invocation.invocation_id)) for inv_id, ti in self.threads.items() }, waiting_inv_ids=[str(w) for w in self.waiting_invocation_ids], )
[docs] def _on_start(self) -> None: """ Internal method called when the ThreadRunner starts. Initializes the data infrastructures for managing invocations and threads. """ self.threads = {} self.waiting_invocation_ids = set() self.max_threads = self.conf.max_threads or multiprocessing.cpu_count()
# Set unique extra_id based on thread ID to distinguish runner threads
[docs] def _on_stop(self) -> None: """ Internal method called when the ThreadRunner stops. Joins all running threads and updates their invocation statuses. For alive threads: reroute first while we still own the RUNNING status, then join. If we joined first the task would complete as SUCCESS and the RUNNING→KILLED transition would no longer be valid. For already-dead threads: join (no-op) then reroute, which silently ignores the final status. """ self.logger.info(f"Stopping ThreadRunner, joining {len(self.threads)} threads.") for thread_info in self.threads.values(): self.logger.warning( f"Joining thread for invocation:{thread_info.invocation.invocation_id} " f"(thread:{thread_info.thread.name}, alive:{thread_info.thread.is_alive()})" ) if thread_info.thread.is_alive(): self._kill_and_reroute(thread_info.invocation.invocation_id) thread_info.thread.join() else: thread_info.thread.join() self._kill_and_reroute(thread_info.invocation.invocation_id)
[docs] def _on_stop_runner_loop(self) -> None: """ Internal method called after receiving a signal to stop the runner loop. """ pass
@property def available_threads(self) -> int: """ Returns the number of available thread slots for new invocations. Joins finished threads so they no longer count against the limit. :return: An integer representing available thread slots. """ # Rebuild the threads dictionary by joining finished threads. alive_threads = {} for k, thread_info in self.threads.items(): if thread_info.thread.is_alive(): alive_threads[k] = thread_info else: thread_info.thread.join() self.waiting_invocation_ids.discard( thread_info.invocation.invocation_id ) self.threads = alive_threads # Only count threads not waiting running_threads = [ k for k in self.threads if k not in self.waiting_invocation_ids ] return self.max_parallel_slots - len(running_threads)
[docs] def runner_loop_iteration(self) -> None: """ Executes one iteration of the ThreadRunner loop. Handles the execution and monitoring of task invocations in separate threads. """ invocations = self.app.orchestrator.get_invocations_to_run( self.available_threads, self.runner_context ) for invocation in invocations: try: thread = threading.Thread( target=invocation.run, daemon=True, args=[self.runner_context], ) thread.start() self.threads[invocation.invocation_id] = ThreadInfo(thread, invocation) self.logger.debug( f"Running invocation:{invocation.invocation_id} on thread:{thread.name}" ) except RuntimeError as e: self.logger.error( f"Failed to start thread for invocation:{invocation.invocation_id}: {e}" ) self.app.orchestrator.reroute_invocations( {invocation.invocation_id}, self.runner_context ) self.logger.debug( f"Finished loop iteration, sleeping for {self.conf.runner_loop_sleep_time_sec}s" ) time.sleep(self.conf.runner_loop_sleep_time_sec)
[docs] def _waiting_for_results( self, running_invocation_id: "InvocationId", result_invocation_ids: list["InvocationId"], runner_args: dict[str, Any] | None = None, ) -> None: """ Handles invocations waiting for results by polling a local final cache instead of pausing threads. :param running_invocation_id: The invocation that is waiting for results. :param result_invocation_ids: A list of invocations whose results are being awaited. :param runner_args: Additional arguments required for the ThreadRunner. """ del runner_args self.waiting_invocation_ids.add(running_invocation_id)