pynenc.runner.process_runner

Module Contents

Classes

ClassifiedInvocations

Categorized invocation IDs based on their status.

ChildProcessInfo

Information about a child worker process.

ProcessRunner

ProcessRunner is a concrete implementation of BaseRunner that executes tasks in separate processes.

Functions

run_invocation

Run invocation in a separate process (must be top-level for multiprocessing).

API

pynenc.runner.process_runner.run_invocation(app: pynenc.app.Pynenc, invocation: pynenc.invocation.DistributedInvocation, runner_ctx: pynenc.runner.runner_context.RunnerContext, runner_args: dict) → None

Run invocation in a separate process (must be top-level for multiprocessing).
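The top-level requirement follows from pickling: when multiprocessing uses the spawn or forkserver start method, the target callable is pickled by reference to its importable name, which a nested function does not have. A minimal sketch using only the standard library, independent of pynenc (the helpers make_local_task and can_pickle are hypothetical names introduced for illustration):

```python
import os
import pickle


def make_local_task():
    """Return a function defined inside another function (not top-level)."""
    def local_task():
        return os.getpid()
    return local_task


def can_pickle(func) -> bool:
    """Report whether `func` can be pickled, as spawn-based multiprocessing needs."""
    try:
        pickle.dumps(func)
        return True
    except (pickle.PicklingError, AttributeError):
        # The exception type for local objects differs between Python versions.
        return False


# A module-level function is pickled by reference to its importable name.
top_level_ok = can_pickle(os.getpid)
# A nested function has no importable name, so pickling fails.
nested_ok = can_pickle(make_local_task())
```

This is why a function handed to a worker process is normally defined at module level rather than as a closure or method-local helper.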

class pynenc.runner.process_runner.ClassifiedInvocations

Bases: typing.NamedTuple

Categorized invocation IDs based on their status.

Parameters:
  • final – List of invocation IDs that have reached their final status

  • non_final – List of invocation IDs that are still in progress

final: list[pynenc.identifiers.invocation_id.InvocationId]

non_final: list[pynenc.identifiers.invocation_id.InvocationId]

class pynenc.runner.process_runner.ChildProcessInfo

Bases: typing.NamedTuple

Information about a child worker process.

Parameters:
  • process – The Process object

  • invocation_id – The invocation_id being executed by this process

process: multiprocessing.Process

invocation_id: pynenc.identifiers.invocation_id.InvocationId

class pynenc.runner.process_runner.ProcessRunner(app: pynenc.app.Pynenc, runner_cache: dict | None = None, runner_context: pynenc.runner.runner_context.RunnerContext | None = None)

Bases: pynenc.runner.base_runner.BaseRunner

ProcessRunner is a concrete implementation of BaseRunner that executes tasks in separate processes.

It manages task invocations, handling their execution, monitoring, and lifecycle within individual processes. This runner is suitable for CPU-bound tasks and scenarios where task isolation is essential.

wait_invocation: dict[pynenc.identifiers.invocation_id.InvocationId, set[pynenc.identifiers.invocation_id.InvocationId]]

child_runner_ids: dict[str, pynenc.runner.process_runner.ChildProcessInfo]

inv_id_to_runner_id: dict[pynenc.identifiers.invocation_id.InvocationId, str]

manager: multiprocessing.Manager

max_processes: int

static mem_compatible() → bool

Indicates if the runner is compatible with in-memory components.

Returns:

False, as each task is executed in a separate process with independent memory.

property max_parallel_slots: int

The maximum number of parallel tasks that the runner can handle.

Returns:

An integer representing the maximum number of parallel tasks, based on CPU count.

get_active_child_runner_ids() → list[str]

Return runner_ids of child processes that are still alive.
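The filtering described here can be sketched as follows. This is an illustration under stated assumptions, not the library's implementation: StubProcess is a hypothetical stand-in exposing only is_alive(), ChildInfo mirrors the documented ChildProcessInfo fields with str in place of InvocationId, and active_runner_ids is a hypothetical helper name.

```python
from typing import NamedTuple


class StubProcess:
    """Hypothetical stand-in for multiprocessing.Process exposing only is_alive()."""

    def __init__(self, alive: bool) -> None:
        self._alive = alive

    def is_alive(self) -> bool:
        return self._alive


class ChildInfo(NamedTuple):
    """Mirrors the documented ChildProcessInfo fields; str replaces InvocationId."""
    process: StubProcess
    invocation_id: str


def active_runner_ids(children: dict[str, ChildInfo]) -> list[str]:
    """Return the runner_ids whose process reports is_alive()."""
    return [rid for rid, info in children.items() if info.process.is_alive()]


children = {
    "runner-a": ChildInfo(StubProcess(alive=True), "inv-1"),
    "runner-b": ChildInfo(StubProcess(alive=False), "inv-2"),
}
alive_ids = active_runner_ids(children)
```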

_log_shutdown(signum: int | None) → None

property runner_args: dict[str, Any]

Provides arguments necessary for parent-subprocess communication in ProcessRunner.

Returns:

A dictionary containing the managed ‘wait_invocation’ dictionary that is shared with subprocesses.

property waiting_processes: int

Returns the number of processes that are currently waiting for other invocations to finish.

Returns:

An integer representing the number of waiting processes.

parse_args(args: dict[str, Any]) → None

Parses the arguments provided to the runner.

Parameters:

args – A dictionary of arguments passed to the runner.

_on_start() → None

Internal method called when the ProcessRunner starts. Initializes the process manager and the data structures for managing invocations.

_on_stop() → None

Internal method called when the ProcessRunner stops.

Kills each alive child with SIGKILL, waits for it to die, then calls _kill_and_reroute. This ordering avoids a race where the task finishes after we set KILLED but before the process actually dies: by joining first we know the process is gone, and _kill_and_reroute silently skips invocations that already reached a final status.
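The kill, join, then reroute ordering can be sketched with stand-in objects. Everything below is hypothetical scaffolding for illustration: StubProcess records calls instead of managing a real process, and kill_and_reroute only imitates the documented behavior of skipping invocations that are already final.

```python
class StubProcess:
    """Hypothetical stand-in for multiprocessing.Process that records calls."""

    def __init__(self, name, log):
        self.name = name
        self.log = log
        self._alive = True

    def is_alive(self):
        return self._alive

    def kill(self):
        # A real multiprocessing.Process.kill() sends SIGKILL on POSIX.
        self.log.append(("kill", self.name))

    def join(self):
        # Once join() returns, the process is known to be gone.
        self._alive = False
        self.log.append(("join", self.name))


def kill_and_reroute(inv_id, final_ids, rerouted):
    """Hypothetical stand-in: skip finished invocations, reroute the rest."""
    if inv_id in final_ids:
        return
    rerouted.append(inv_id)


def stop_children(children, reroute):
    """Kill and join each alive child before touching its invocation status."""
    for inv_id, proc in children.items():
        if not proc.is_alive():
            continue
        proc.kill()
        proc.join()
        reroute(inv_id)


log, rerouted = [], []
children = {"inv-1": StubProcess("p1", log), "inv-2": StubProcess("p2", log)}
final_ids = {"inv-1"}  # inv-1 finished just before its process was killed
stop_children(children, lambda i: kill_and_reroute(i, final_ids, rerouted))
```

Because the status check happens only after join(), an invocation that completed in the window between the kill request and process exit is left alone rather than rerouted.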

_on_stop_runner_loop() → None

Internal method called after receiving a signal to stop the runner loop. Clears the wait_invocation dictionary.

property available_processes: int

Returns the number of available process slots for new invocations.

Returns:

An integer representing available process slots.
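One plausible way to account for slots is sketched below. The formula is an assumption, not taken from this reference: it supposes that processes paused while waiting on other invocations do not occupy a slot. The function name available_slots and its parameters are hypothetical.

```python
def available_slots(max_processes: int, running: int, waiting: int) -> int:
    """Assumed accounting: paused (waiting) processes free their slot."""
    busy = running - waiting
    return max(0, max_processes - busy)


# Four slots, three child processes, one of them paused waiting for a result.
free_now = available_slots(max_processes=4, running=3, waiting=1)
```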

clasify_waiting_invocations() → pynenc.runner.process_runner.ClassifiedInvocations

Classifies the waiting invocation IDs into those that have reached a final status and those that have not.
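A minimal sketch of such a classification, assuming a caller-supplied predicate that says whether an invocation is final. Classified mirrors the documented ClassifiedInvocations fields with str in place of InvocationId; the status names and the classify helper are hypothetical.

```python
from typing import Callable, Iterable, NamedTuple


class Classified(NamedTuple):
    """Mirrors the documented final / non_final fields."""
    final: list[str]
    non_final: list[str]


def classify(ids: Iterable[str], is_final: Callable[[str], bool]) -> Classified:
    """Partition invocation IDs by whether they reached a final status."""
    final: list[str] = []
    non_final: list[str] = []
    for inv_id in ids:
        (final if is_final(inv_id) else non_final).append(inv_id)
    return Classified(final=final, non_final=non_final)


# Assumed status names, for illustration only.
statuses = {"inv-1": "SUCCESS", "inv-2": "RUNNING", "inv-3": "FAILED"}
FINAL_STATUSES = {"SUCCESS", "FAILED"}
result = classify(statuses, lambda i: statuses[i] in FINAL_STATUSES)
```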

_get_process_for_invocation(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → multiprocessing.Process | None

Get the Process object for a given invocation_id.
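Given the documented inv_id_to_runner_id and child_runner_ids mappings, the lookup is presumably a two-step resolution. The sketch below assumes that structure; ChildInfo, process_for_invocation and the placeholder process object are hypothetical, and str stands in for InvocationId.

```python
from typing import NamedTuple


class ChildInfo(NamedTuple):
    """Mirrors the documented ChildProcessInfo fields."""
    process: object  # a multiprocessing.Process in the real runner
    invocation_id: str


def process_for_invocation(inv_id, inv_id_to_runner_id, child_runner_ids):
    """Resolve invocation ID to runner ID, then runner ID to its process."""
    runner_id = inv_id_to_runner_id.get(inv_id)
    if runner_id is None:
        return None
    info = child_runner_ids.get(runner_id)
    return info.process if info is not None else None


placeholder_process = object()
child_runner_ids = {"runner-a": ChildInfo(placeholder_process, "inv-1")}
inv_id_to_runner_id = {"inv-1": "runner-a"}

found = process_for_invocation("inv-1", inv_id_to_runner_id, child_runner_ids)
missing = process_for_invocation("inv-9", inv_id_to_runner_id, child_runner_ids)
```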

handle_waiting_invocations() → None

Handles the waiting invocations.

runner_loop_iteration() → None

Executes one iteration of the ProcessRunner loop. Handles the execution and monitoring of task invocations in separate processes. Each process gets a reserved runner context and only starts if an invocation is available.

_waiting_for_results(running_invocation_id: pynenc.identifiers.invocation_id.InvocationId, result_invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], runner_args: dict[str, Any] | None = None) → None

Handles invocations that are waiting for results from other invocations. Pauses the running process and registers it to wait for the results of specified invocations.

Parameters:
  • running_invocation_id (InvocationId) – The ID of the invocation that is waiting for results.

  • result_invocation_ids (list[InvocationId]) – A list of IDs of invocations whose results are being awaited.

  • runner_args (dict[str, Any] | None) – Additional arguments required for the ProcessRunner.
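The registration step can be sketched as below. It assumes that wait_invocation maps each awaited invocation ID to the set of invocations waiting on it; the reference gives only the type, so the key direction is an assumption. The helper register_wait is hypothetical, a plain dict stands in for the managed dictionary, and pausing the process is not shown.

```python
def register_wait(wait_invocation, running_id, result_ids):
    """Record that `running_id` is waiting on every ID in `result_ids`."""
    for result_id in result_ids:
        waiters = set(wait_invocation.get(result_id, set()))
        waiters.add(running_id)
        # Reassign instead of mutating in place: a multiprocessing.Manager
        # dict proxy does not propagate in-place changes to nested values.
        wait_invocation[result_id] = waiters


wait_invocation = {}
register_wait(wait_invocation, "inv-parent", ["inv-a", "inv-b"])
register_wait(wait_invocation, "inv-other", ["inv-a"])
```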