pynenc.runner.multi_thread_runner

Module Contents

Classes

ProcessState

ProcessStatus

MultiThreadRunner

MultiThreadRunner spawns separate processes, each running a ThreadRunner. It scales processes based on pending invocations and terminates those that remain idle.

Functions

thread_runner_process_main

Entry point for ThreadRunner worker processes spawned by MultiThreadRunner.

API

class pynenc.runner.multi_thread_runner.ProcessState(*args, **kwds)

Bases: enum.Enum

ACTIVE

'active'

IDLE

'idle'

class pynenc.runner.multi_thread_runner.ProcessStatus

Bases: typing.NamedTuple

last_update: float

active_count: int

state: pynenc.runner.multi_thread_runner.ProcessState

is_idle(now: float, idle_timeout: float) → bool

Return True if the process is idle and has been idle longer than idle_timeout.
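A minimal sketch of how this check could behave, assuming idle time is measured as the time elapsed since last_update. ProcessStatusSketch is a hypothetical stand-in that mirrors the documented fields; it is not the library's class, and the exact comparison pynenc uses may differ.

```python
import enum
from typing import NamedTuple


class ProcessState(enum.Enum):
    ACTIVE = "active"
    IDLE = "idle"


class ProcessStatusSketch(NamedTuple):
    """Hypothetical stand-in mirroring the documented fields."""

    last_update: float
    active_count: int
    state: ProcessState

    def is_idle(self, now: float, idle_timeout: float) -> bool:
        # Assumption: idle duration is the time elapsed since last_update.
        elapsed = now - self.last_update
        return self.state is ProcessState.IDLE and elapsed > idle_timeout


status = ProcessStatusSketch(last_update=100.0, active_count=0, state=ProcessState.IDLE)
idle_after_5s = status.is_idle(now=105.0, idle_timeout=10.0)
idle_after_15s = status.is_idle(now=115.0, idle_timeout=10.0)
```

Under these assumptions, a process in the ACTIVE state is never reported as idle, regardless of how old its last update is.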

pynenc.runner.multi_thread_runner.thread_runner_process_main(app: pynenc.app.Pynenc, *, parent_ctx_json: str, child_runner_id: str, runner_cache: dict, shared_status: dict[str, pynenc.runner.multi_thread_runner.ProcessStatus]) → None

Entry point for ThreadRunner worker processes spawned by MultiThreadRunner.

The parent pre-generates the child_runner_id before spawning, enabling parent-based health reporting. The parent reports heartbeats for alive children via its main loop.

class pynenc.runner.multi_thread_runner.MultiThreadRunner(app: pynenc.app.Pynenc, runner_cache: dict | None = None, runner_context: pynenc.runner.runner_context.RunnerContext | None = None)

Bases: pynenc.runner.base_runner.BaseRunner

MultiThreadRunner spawns separate processes, each running a ThreadRunner. It scales processes based on pending invocations and terminates those that remain idle.

WAITING_FOR_RESULTS_WARNING

'waiting_for_results called on MultiThreadRunner from within a task. This should be handled by the Th…'

child_runner_ids: dict[str, multiprocessing.Process]

manager: multiprocessing.Manager

shared_status: dict[str, pynenc.runner.multi_thread_runner.ProcessStatus]

max_processes: int

conf() → pynenc.conf.config_runner.ConfigMultiThreadRunner

static mem_compatible() → bool

Indicates if the runner is compatible with in-memory components.

Returns:

False, as each ThreadRunner runs in a separate process with independent memory

property max_parallel_slots: int

The maximum number of parallel tasks that the runner can handle.

Returns:

int representing the maximum number of parallel tasks

get_active_child_runner_ids() → list[str]

Return runner_ids of child processes that are still alive.

_log_shutdown(signum: int | None) → None

_on_start() → None

Initialize multiprocessing infrastructure for spawning worker processes.

Validates that multiprocessing is being used safely before creating the Manager and spawning initial processes.

_spawn_thread_runner_process() → None

Spawn a new ThreadRunner worker process with pre-generated runner_id.

_on_stop() → None

Stop all worker processes and shut down the manager.

_safe_remove_shared_state(key: str) → None

Safely remove a process’s shared state, handling manager shutdown cases.

Parameters:

key (str) – The process key to remove from shared state

_on_stop_runner_loop() → None

Internal method called after receiving a signal to stop the runner loop.

_cleanup_dead_processes() → None

Remove processes that are no longer alive from tracking dictionaries.
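The cleanup step can be sketched as follows, assuming both child_runner_ids and shared_status are keyed by runner_id. FakeProcess and cleanup_dead are hypothetical names used only so the example runs without spawning real processes; the actual method may do additional bookkeeping.

```python
class FakeProcess:
    """Hypothetical stand-in for multiprocessing.Process (only is_alive is used)."""

    def __init__(self, alive: bool) -> None:
        self._alive = alive

    def is_alive(self) -> bool:
        return self._alive


def cleanup_dead(child_runner_ids: dict, shared_status: dict) -> list:
    """Drop entries whose process has exited; return the removed runner_ids."""
    # Collect first, then delete, to avoid mutating the dict while iterating it.
    dead = [rid for rid, proc in child_runner_ids.items() if not proc.is_alive()]
    for rid in dead:
        del child_runner_ids[rid]
        # Assumption: a missing status entry is not an error.
        shared_status.pop(rid, None)
    return dead


children = {"runner-a": FakeProcess(alive=True), "runner-b": FakeProcess(alive=False)}
statuses = {"runner-a": "status-a", "runner-b": "status-b"}
removed = cleanup_dead(children, statuses)
```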

_scale_up_processes() → None

Spawn new processes based on enforce_max_processes setting and pending tasks.
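One possible reading of this scaling rule is sketched below. It assumes that enforce_max_processes means "always fill up to max_processes", and that otherwise one process is spawned per pending invocation, capped by the free capacity. Both assumptions, and the helper name processes_to_spawn, are illustrative and not taken from the library.

```python
def processes_to_spawn(
    current: int,
    max_processes: int,
    pending: int,
    enforce_max_processes: bool,
) -> int:
    """Return how many new worker processes to start (hypothetical helper)."""
    free_capacity = max(0, max_processes - current)
    if enforce_max_processes:
        # Assumption: when enforced, keep the pool at max_processes.
        return free_capacity
    # Assumption: otherwise scale with demand, never exceeding free capacity.
    return min(free_capacity, pending)


on_demand = processes_to_spawn(current=1, max_processes=4, pending=2, enforce_max_processes=False)
enforced = processes_to_spawn(current=1, max_processes=4, pending=2, enforce_max_processes=True)
at_capacity = processes_to_spawn(current=4, max_processes=4, pending=10, enforce_max_processes=False)
```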

_terminate_idle_processes() → None

Terminate processes that are idle longer than the configured timeout.

runner_loop_iteration() → None

Execute one iteration of the runner loop.

_waiting_for_results(running_invocation_id: pynenc.identifiers.invocation_id.InvocationId, result_invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], runner_args: dict[str, Any] | None = None) → None

Handle waiting for results when called outside a process context.

This method warns if called directly on MultiThreadRunner, as result waiting should occur within a ThreadRunner process using the context-set runner.

Parameters:
  • running_invocation_id (InvocationId) – ID of the invocation waiting for results

  • result_invocation_ids (list[InvocationId]) – IDs of invocations being awaited

  • runner_args (dict[str, Any] | None) – Additional runner-specific arguments