Source code for pynenc.runner.thread_runner
import multiprocessing
import threading
import time
from functools import cached_property
from typing import TYPE_CHECKING, Any, NamedTuple
from pynenc.conf.config_runner import ConfigThreadRunner
from pynenc.invocation.dist_invocation import DistributedInvocation
from pynenc.runner.base_runner import BaseRunner
from pynenc.runner.runner_context import RunnerContext
from pynenc.runner.shutdown_diagnostics import log_runner_shutdown
if TYPE_CHECKING:
from pynenc.app import Pynenc
from pynenc.identifiers.invocation_id import InvocationId
[docs]
class ThreadInfo(NamedTuple):
    """
    Immutable pair tying a worker thread to the invocation it was started for.
    """

    # Thread object created by the runner for the invocation.
    thread: threading.Thread
    # Invocation associated with that thread.
    invocation: DistributedInvocation
[docs]
class ThreadRunner(BaseRunner):
"""
ThreadRunner is a concrete implementation of BaseRunner that executes tasks in separate threads.
It manages task invocations, handling their execution and lifecycle within individual threads.
This runner is suitable for I/O-bound tasks and scenarios where shared memory between tasks is required.
"""
def __init__(
    self,
    app: "Pynenc",
    runner_cache: dict | None = None,
    runner_context: RunnerContext | None = None,
) -> None:
    """
    Create the runner with empty thread bookkeeping, then delegate to the base class.

    :param app: Forwarded unchanged to ``BaseRunner.__init__``.
    :param runner_cache: Forwarded unchanged to ``BaseRunner.__init__``.
    :param runner_context: Forwarded unchanged to ``BaseRunner.__init__``.
    """
    # The runner-specific state is assigned *before* the base initializer so
    # these attributes are present even when run() is never invoked.
    self.max_threads: int = 0  # placeholder; _on_start() assigns the real limit
    self.waiting_invocation_ids: set[str] = set()
    self.threads: dict[str, ThreadInfo] = {}
    super().__init__(app, runner_cache, runner_context)
@cached_property
def conf(self) -> ConfigThreadRunner:
    """
    Runner configuration built from the application's config values and file path.

    Wrapped in ``cached_property``, so the object is created on first access
    and the same instance is returned afterwards.

    :return: The ``ConfigThreadRunner`` for this runner.
    """
    owner = self.app
    return ConfigThreadRunner(
        config_values=owner.config_values,
        config_filepath=owner.config_filepath,
    )
[docs]
@staticmethod
def mem_compatible() -> bool:
    """
    Report whether this runner can be used with in-memory components.

    :return: Always True; tasks run in threads, which share memory.
    """
    compatible = True
    return compatible
@property
def max_parallel_slots(self) -> int:
    """
    Upper bound on the number of tasks this runner handles in parallel.

    :return: The largest of the configured ``min_parallel_slots``, the
        configured ``min_threads`` and the runner's current ``max_threads``.
    """
    settings = self.conf
    candidates = (
        settings.min_parallel_slots,
        settings.min_threads,
        self.max_threads,
    )
    return max(candidates)
[docs]
def get_active_child_runner_ids(self) -> list[str]:
    """
    List the identifiers of active child runners.

    :return: Always an empty list, since ThreadRunner doesn't spawn child processes.
    """
    no_children: list[str] = []
    return no_children
[docs]
def _log_shutdown(self, signum: int | None) -> None:
    """
    Emit the shutdown diagnostics for this runner through ``log_runner_shutdown``.

    :param signum: Signal number passed through to ``log_runner_shutdown``; may be None.
    """
    # Snapshot of tracked threads: stringified key -> (thread, stringified invocation id).
    thread_snapshot = {}
    for key, info in self.threads.items():
        thread_snapshot[str(key)] = (info.thread, str(info.invocation.invocation_id))

    waiting = list(map(str, self.waiting_invocation_ids))

    log_runner_shutdown(
        self.app.logger,
        self.__class__.__name__,
        self.runner_id,
        signum,
        threads=thread_snapshot,
        waiting_inv_ids=waiting,
    )
[docs]
def _on_start(self) -> None:
    """
    Internal hook run when the ThreadRunner starts.

    Resets the thread and waiting-invocation bookkeeping and resolves the
    thread limit.
    """
    # A falsy configured value (e.g. 0 or None) falls back to the CPU count.
    configured_limit = self.conf.max_threads
    self.threads, self.waiting_invocation_ids = {}, set()
    self.max_threads = configured_limit or multiprocessing.cpu_count()
    # NOTE(review): an earlier comment here mentioned setting a unique extra_id
    # based on the thread ID, but no such assignment exists in this method.
[docs]
def _on_stop(self) -> None:
    """
    Internal hook run when the ThreadRunner stops.

    Every tracked thread is joined and its invocation is passed to
    ``_kill_and_reroute``. The order of the two steps depends on the thread:

    * still alive: reroute first, then join, so the reroute happens while the
      invocation is still RUNNING; joining first would let the task complete
      as SUCCESS and the RUNNING→KILLED transition would no longer be valid.
    * already finished: join (a no-op) and then reroute, which silently
      ignores the final status.
    """
    self.logger.info(f"Stopping ThreadRunner, joining {len(self.threads)} threads.")
    for info in self.threads.values():
        worker = info.thread
        inv_id = info.invocation.invocation_id
        self.logger.warning(
            f"Joining thread for invocation:{inv_id} "
            f"(thread:{worker.name}, alive:{worker.is_alive()})"
        )
        # Liveness is re-checked here (not reused from the log line above).
        if not worker.is_alive():
            worker.join()
            self._kill_and_reroute(inv_id)
            continue
        self._kill_and_reroute(inv_id)
        worker.join()
[docs]
def _on_stop_runner_loop(self) -> None:
    """
    Internal hook run after a signal to stop the runner loop is received.

    Intentionally does nothing for ThreadRunner.
    """
    return None
[docs]
def _reclaim_available_slots(self) -> int:
    """
    Joins finished threads and returns the number of available thread slots.

    Finished threads are joined, dropped from ``self.threads`` and their
    invocation id is removed from ``self.waiting_invocation_ids``. Threads that
    are alive but registered as waiting do not count as occupying a slot.

    :return: A non-negative integer representing available thread slots.
    """
    # Rebuild the threads dictionary, keeping only the threads still running.
    alive_threads = {}
    for inv_id, thread_info in self.threads.items():
        if thread_info.thread.is_alive():
            alive_threads[inv_id] = thread_info
            continue
        # Finished thread: join it and stop tracking its invocation as waiting.
        thread_info.thread.join()
        self.waiting_invocation_ids.discard(thread_info.invocation.invocation_id)
    self.threads = alive_threads

    # Only threads that are not waiting occupy a slot.
    running_count = sum(
        1 for inv_id in alive_threads if inv_id not in self.waiting_invocation_ids
    )
    # Clamp at zero: the number of non-waiting threads can exceed the limit
    # (for example if an invocation leaves the waiting set elsewhere while the
    # thread started in its place is still running), and a negative value is
    # not a meaningful slot count for the caller.
    return max(0, self.max_parallel_slots - running_count)
[docs]
def runner_loop_iteration(self) -> None:
    """
    Run a single pass of the ThreadRunner loop.

    Asks the orchestrator for as many invocations as there are free slots,
    starts one daemon thread per invocation, records it in ``self.threads``,
    and finally sleeps for the configured loop interval. An invocation whose
    thread cannot be started (``RuntimeError``) is handed back to the
    orchestrator for rerouting.
    """
    context = self.runner_context
    free_slots = self._reclaim_available_slots()
    pending = self.app.orchestrator.get_invocations_to_run(free_slots, context)

    for invocation in pending:
        try:
            worker = threading.Thread(
                target=invocation.run,
                daemon=True,
                args=[self.runner_context],
            )
            worker.start()
            # Track the thread only after it has actually been started.
            self.threads[invocation.invocation_id] = ThreadInfo(worker, invocation)
            self.logger.debug(
                f"Running invocation:{invocation.invocation_id} on thread:{worker.name}"
            )
        except RuntimeError as e:
            self.logger.error(
                f"Failed to start thread for invocation:{invocation.invocation_id}: {e}"
            )
            failed_ids = {invocation.invocation_id}
            self.app.orchestrator.reroute_invocations(failed_ids, self.runner_context)

    self.logger.debug(
        f"Finished loop iteration, sleeping for {self.conf.runner_loop_sleep_time_sec}s"
    )
    time.sleep(self.conf.runner_loop_sleep_time_sec)
[docs]
def _waiting_for_results(
self,
running_invocation_id: "InvocationId",
result_invocation_ids: list["InvocationId"],
runner_args: dict[str, Any] | None = None,
) -> None:
"""
Handles invocations waiting for results by polling a local final cache instead of pausing threads.
:param running_invocation_id: The invocation that is waiting for results.
:param result_invocation_ids: A list of invocations whose results are being awaited.
:param runner_args: Additional arguments required for the ThreadRunner.
"""
del runner_args
self.waiting_invocation_ids.add(running_invocation_id)