pynenc.runner.process_runner

Module Contents

Classes

ProcessRunner

ProcessRunner is a concrete implementation of BaseRunner that executes tasks in separate processes.

API

class pynenc.runner.process_runner.ProcessRunner(app: pynenc.app.Pynenc)

Bases: pynenc.runner.base_runner.BaseRunner

ProcessRunner is a concrete implementation of BaseRunner that executes tasks in separate processes.

It manages task invocations, handling their execution, monitoring, and lifecycle within individual processes. This runner is suitable for CPU-bound tasks and scenarios where task isolation is essential.

wait_invocation: dict[pynenc.invocation.dist_invocation.DistributedInvocation, set[pynenc.invocation.dist_invocation.DistributedInvocation]]

None

processes: dict[pynenc.invocation.dist_invocation.DistributedInvocation, multiprocessing.Process]

None

manager: multiprocessing.Manager

None

max_processes: int

None

static mem_compatible() -> bool

Indicates if the runner is compatible with in-memory components.

Returns:

False, as each task is executed in a separate process with independent memory.
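The independence of process memory can be illustrated with the standard library alone. The following is a minimal sketch, not pynenc code: `IN_MEMORY_STORE`, `record_result`, and `run_in_child` are hypothetical names, and the POSIX-only "fork" start method is assumed so that the snippet runs without a `__main__` guard.

```python
import multiprocessing
from typing import Optional

# Hypothetical module-level state standing in for an in-memory component.
IN_MEMORY_STORE: dict = {}


def record_result(key: str, value: int) -> None:
    """Runs in the child process and mutates only the child's copy of the store."""
    IN_MEMORY_STORE[key] = value


def run_in_child(key: str, value: int) -> Optional[int]:
    """Run record_result in a separate process and return its exit code."""
    # Assumption: the "fork" start method (POSIX only).
    ctx = multiprocessing.get_context("fork")
    process = ctx.Process(target=record_result, args=(key, value))
    process.start()
    process.join()
    return process.exitcode


exit_code = run_in_child("answer", 42)
# The child exited cleanly, yet the parent's store was never modified.
print(exit_code, IN_MEMORY_STORE)
```

The write made by the child never reaches the parent, which is why components that keep their state in process memory cannot be shared across tasks run this way.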

property max_parallel_slots: int

The maximum number of parallel tasks that the runner can handle.

Returns:

An integer representing the maximum number of parallel tasks, based on CPU count.

property runner_args: dict[str, Any]

Provides arguments necessary for parent-subprocess communication in ProcessRunner.

Returns:

A dictionary containing 'wait_invocation', the managed dictionary shared with subprocesses.
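As an illustration of how a managed dictionary lets a subprocess report back to its parent, here is a minimal sketch built on `multiprocessing.Manager`. It is not the pynenc implementation: `register_wait` and `demo_shared_dict` are hypothetical helpers, plain strings stand in for `DistributedInvocation` objects, and the POSIX-only "fork" start method is assumed.

```python
import multiprocessing


def register_wait(shared, waiter: str, awaited: str) -> None:
    """Runs in the subprocess: record that `waiter` depends on `awaited`."""
    shared[waiter] = awaited


def demo_shared_dict() -> dict:
    """Pass a managed dict to a subprocess and return what the parent sees."""
    # Assumption: the "fork" start method (POSIX only).
    ctx = multiprocessing.get_context("fork")
    with ctx.Manager() as manager:
        # Shaped like the runner_args described above; the key name is from
        # the documentation, the values are illustrative strings.
        runner_args = {"wait_invocation": manager.dict()}
        process = ctx.Process(
            target=register_wait,
            args=(runner_args["wait_invocation"], "inv-1", "inv-2"),
        )
        process.start()
        process.join()
        # Copy into a plain dict before the manager shuts down.
        return dict(runner_args["wait_invocation"])


snapshot = demo_shared_dict()
print(snapshot)
```

Unlike an ordinary dictionary, the managed one is served by a separate manager process, so writes made in the subprocess are visible to the parent.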

property waiting_processes: int

Returns the number of processes that are currently waiting for other invocations to finish.

Returns:

An integer representing the number of waiting processes.

parse_args(args: dict[str, Any]) -> None

Parses the arguments provided to the runner.

Parameters:

args – A dictionary of arguments passed to the runner.

_on_start() -> None

Internal method called when the ProcessRunner starts. Initializes the process manager and the data structures for managing invocations.

_on_stop() -> None

Internal method called when the ProcessRunner stops. Terminates all running processes and updates their invocation statuses.
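A shutdown step of this kind might look roughly like the sketch below. It only assumes the documented shape of `processes` (a mapping from invocation to `multiprocessing.Process`); the `stop_all` helper, the string keys, and the status labels are hypothetical and do not reflect the statuses pynenc actually assigns. The POSIX-only "fork" start method is assumed.

```python
import multiprocessing
import time


def long_running_task() -> None:
    """Stand-in for a task that would still be running at shutdown."""
    time.sleep(60)


def stop_all(processes: dict) -> dict:
    """Terminate every live process and report an illustrative status per key."""
    statuses = {}
    for invocation_id, process in processes.items():
        if process.is_alive():
            process.terminate()      # sends SIGTERM on POSIX
            process.join(timeout=5)  # reap the child so it does not linger
            statuses[invocation_id] = "terminated"
        else:
            statuses[invocation_id] = "finished"
    return statuses


# Assumption: the "fork" start method (POSIX only).
ctx = multiprocessing.get_context("fork")
processes = {f"inv-{i}": ctx.Process(target=long_running_task) for i in range(2)}
for process in processes.values():
    process.start()

statuses = stop_all(processes)
print(statuses)
```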

_on_stop_runner_loop() -> None

Internal method called after receiving a signal to stop the runner loop. Clears the wait_invocation dictionary.

property available_processes: int

Returns the number of available process slots for new invocations.

Returns:

An integer representing available process slots.
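The documentation does not state how this value is computed. One plausible accounting, shown here purely as an assumption, treats processes that are paused while waiting on other invocations as not occupying a slot. The function name and parameters are hypothetical.

```python
def available_slots(max_processes: int, running: int, waiting: int) -> int:
    """Illustrative slot accounting (an assumption, not pynenc's formula).

    running: processes currently tracked by the runner
    waiting: the subset of those that are paused, waiting on other invocations
    """
    active = running - waiting
    # Never report a negative number of free slots.
    return max(0, max_processes - active)


print(available_slots(max_processes=4, running=3, waiting=1))
```

Under this assumption, a runner with four slots, three tracked processes, and one of them paused could accept two more invocations.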

runner_loop_iteration() -> None

Executes one iteration of the ProcessRunner loop. Handles the execution and monitoring of task invocations in separate processes.

waiting_for_results(running_invocation: Optional[pynenc.invocation.dist_invocation.DistributedInvocation], result_invocations: list[pynenc.invocation.dist_invocation.DistributedInvocation], runner_args: Optional[dict[str, Any]] = None) -> None

Handles invocations that are waiting for results from other invocations. Pauses the running process and registers it to wait for the results of specified invocations.

Parameters:
  • running_invocation – The invocation that is waiting for results.

  • result_invocations – A list of invocations whose results are being awaited.

  • runner_args – Additional arguments required for the ProcessRunner.
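The bookkeeping behind such a wait can be sketched with a plain dictionary. This is a simplified model, not the pynenc implementation: `WaitRegistry`, `result_ready`, and the `pending` map are hypothetical, strings stand in for `DistributedInvocation` objects, and the sketch assumes that `wait_invocation` maps each awaited invocation to the set of invocations waiting on it. Actually pausing and resuming the operating-system process is left out.

```python
from collections import defaultdict
from typing import Optional


class WaitRegistry:
    """Tracks which invocations are waiting on which results (illustrative)."""

    def __init__(self) -> None:
        # Assumed direction: awaited invocation -> invocations waiting on it.
        self.wait_invocation = defaultdict(set)
        # Hypothetical reverse index: waiter -> results still outstanding.
        self.pending = defaultdict(set)

    def waiting_for_results(
        self, running_invocation: Optional[str], result_invocations: list
    ) -> None:
        """Register the running invocation as waiting on each listed result."""
        if running_invocation is None:
            return  # nothing is running, so there is nothing to pause
        for awaited in result_invocations:
            self.wait_invocation[awaited].add(running_invocation)
            self.pending[running_invocation].add(awaited)

    def result_ready(self, finished: str) -> list:
        """Return the waiters that can resume now that `finished` has a result."""
        resumable = []
        for waiter in sorted(self.wait_invocation.pop(finished, set())):
            self.pending[waiter].discard(finished)
            if not self.pending[waiter]:
                del self.pending[waiter]
                resumable.append(waiter)
        return resumable


registry = WaitRegistry()
registry.waiting_for_results("parent", ["child-a", "child-b"])
first = registry.result_ready("child-a")   # "parent" still waits on child-b
second = registry.result_ready("child-b")  # now "parent" can resume
print(first, second)
```

If this structure were held in a managed dictionary, mutating a nested set in place would not propagate between processes; the updated set would have to be reassigned to its key.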