pynenc.runner.process_runner
Module Contents
Classes
ClassifiedInvocations | Categorized invocation IDs based on their status.
ChildProcessInfo | Information about a child worker process.
ProcessRunner | ProcessRunner is a concrete implementation of BaseRunner that executes tasks in separate processes.
Functions
run_invocation | Run invocation in a separate process (must be top-level for multiprocessing).
API
- pynenc.runner.process_runner.run_invocation(app: pynenc.app.Pynenc, invocation: pynenc.invocation.DistributedInvocation, runner_ctx: pynenc.runner.runner_context.RunnerContext, runner_args: dict) → None
Run invocation in a separate process (must be top-level for multiprocessing).
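The "must be top-level" requirement comes from how multiprocessing hands a target function to a new process: under the spawn start method the target is pickled, and pickle stores functions by qualified name. The sketch below illustrates that constraint with plain pickle. The names `top_level_worker`, `make_nested_worker`, and `can_pickle` are hypothetical and are not part of pynenc.

```python
import pickle


def top_level_worker(payload: dict) -> str:
    """Module-level function: pickle can reference it by its qualified name."""
    return f"ran {payload['invocation_id']}"


def make_nested_worker():
    """Return a function defined inside another function (a local object)."""

    def nested_worker(payload: dict) -> str:
        return f"ran {payload['invocation_id']}"

    return nested_worker


def can_pickle(obj: object) -> bool:
    """Report whether the object survives pickling, as a spawned process target must."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        # Local functions fail here because they cannot be looked up by name.
        return False
```

When this is run as a regular module or script, `can_pickle(top_level_worker)` should succeed while `can_pickle(make_nested_worker())` returns False, which is why a process target such as run_invocation is defined at module level.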
- class pynenc.runner.process_runner.ClassifiedInvocations
Bases: typing.NamedTuple
Categorized invocation IDs based on their status.
- Parameters:
final – List of invocation IDs that have reached their final status
non_final – List of invocation IDs that are still in progress
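A minimal sketch of how such a tuple might be filled, assuming invocation IDs are plain strings and that a known set of status names counts as final. The class below is a local stand-in with the documented field names, not the pynenc class, and `FINAL_STATUSES` and `classify` are hypothetical.

```python
from typing import NamedTuple


class ClassifiedInvocations(NamedTuple):
    """Local stand-in mirroring the documented fields."""

    final: list[str]
    non_final: list[str]


# Hypothetical status names, used only for this illustration.
FINAL_STATUSES = {"SUCCESS", "FAILED", "KILLED"}


def classify(statuses: dict[str, str]) -> ClassifiedInvocations:
    """Split invocation IDs by whether their status is in FINAL_STATUSES."""
    final: list[str] = []
    non_final: list[str] = []
    for invocation_id, status in statuses.items():
        target = final if status in FINAL_STATUSES else non_final
        target.append(invocation_id)
    return ClassifiedInvocations(final=final, non_final=non_final)
```

Because the result is a NamedTuple, callers can unpack it directly, for example `final, non_final = classify(statuses)`.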
- class pynenc.runner.process_runner.ChildProcessInfo
Bases: typing.NamedTuple
Information about a child worker process.
- Parameters:
process – The Process object
invocation_id – The invocation_id being executed by this process
- process: multiprocessing.Process
- invocation_id: pynenc.identifiers.invocation_id.InvocationId
- class pynenc.runner.process_runner.ProcessRunner(app: pynenc.app.Pynenc, runner_cache: dict | None = None, runner_context: RunnerContext | None = None)
Bases: pynenc.runner.base_runner.BaseRunner
ProcessRunner is a concrete implementation of BaseRunner that executes tasks in separate processes.
It manages task invocations, handling their execution, monitoring, and lifecycle within individual processes. This runner is suitable for CPU-bound tasks and scenarios where task isolation is essential.
Initialization
- static mem_compatible() → bool
Indicates if the runner is compatible with in-memory components.
- Returns:
False, as each task is executed in a separate process with independent memory.
- property max_parallel_slots: int
The maximum number of parallel tasks that the runner can handle.
- Returns:
An integer representing the maximum number of parallel tasks, based on CPU count.
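The exact formula is not stated here. As a rough sketch, assuming one slot per logical CPU, the value could be derived as follows (`max_parallel_slots` below is a hypothetical free function, not the property itself):

```python
import os


def max_parallel_slots() -> int:
    """Assumed rule: one slot per logical CPU, never fewer than one."""
    # os.cpu_count() may return None when the count cannot be determined.
    return max(1, os.cpu_count() or 1)
```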
- get_active_child_runner_ids() → list[str]
Return runner_ids of child processes that are still alive.
- property runner_args: dict[str, Any]
Provides arguments necessary for parent-subprocess communication in ProcessRunner.
- Returns:
A dictionary containing the ‘wait_invocation’ Managed dictionary for subprocesses.
- property waiting_processes: int
Returns the number of processes that are currently waiting for other invocations to finish.
- Returns:
An integer representing the number of waiting processes.
- parse_args(args: dict[str, Any]) → None
Parses the arguments provided to the runner.
- Parameters:
args – A dictionary of arguments passed to the runner.
- _on_start() → None
Internal method called when the ProcessRunner starts. Initializes the process manager and the data structures for managing invocations.
- _on_stop() → None
Internal method called when the ProcessRunner stops.
Kills each alive child with SIGKILL, waits for it to die, then calls _kill_and_reroute. This ordering avoids a race where the task finishes after we set KILLED but before the process actually dies: by joining first we know the process is gone, and _kill_and_reroute silently skips invocations that already reached a final status.
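The ordering described above (kill, wait for exit, then mark and reroute) can be sketched with throwaway child processes. This is an illustration under assumptions, not the runner's code: `subprocess.Popen` stands in for `multiprocessing.Process`, statuses live in a plain dict, and `FINAL_STATUSES`, `spawn_sleeper`, and `stop_children` are hypothetical names.

```python
import subprocess
import sys

# Hypothetical final statuses for this illustration only.
FINAL_STATUSES = {"SUCCESS", "FAILED"}


def spawn_sleeper() -> subprocess.Popen:
    """Start a throwaway child that just sleeps (stand-in for a task process)."""
    return subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])


def stop_children(
    children: dict[str, subprocess.Popen], statuses: dict[str, str]
) -> list[str]:
    """Kill live children, wait for each to exit, then mark unfinished work."""
    rerouted: list[str] = []
    for invocation_id, proc in children.items():
        if proc.poll() is not None:
            continue  # already exited, nothing to kill
        proc.kill()  # SIGKILL on POSIX
        proc.wait()  # join first: after this the process is certainly gone
        if statuses.get(invocation_id) in FINAL_STATUSES:
            continue  # task finished before dying, leave its status alone
        statuses[invocation_id] = "KILLED"
        rerouted.append(invocation_id)
    return rerouted
```

Checking the status only after `wait()` returns is what closes the race: no task can reach a final status once its process has exited, so the check cannot be invalidated afterwards.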
- _on_stop_runner_loop() → None
Internal method called after receiving a signal to stop the runner loop. Clears the wait_invocation dictionary.
- _reclaim_available_slots() → int
Joins finished processes and returns the number of available process slots.
- Returns:
An integer representing available process slots.
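A minimal sketch of the join-and-count idea, assuming processes are tracked in a dict keyed by invocation ID and that every tracked process occupies one slot. `FakeProcess` is a hypothetical stand-in exposing only the `is_alive()` and `join()` methods of `multiprocessing.Process`, so the example runs without starting real processes.

```python
from dataclasses import dataclass


@dataclass
class FakeProcess:
    """Hypothetical stand-in for multiprocessing.Process."""

    alive: bool
    joined: bool = False

    def is_alive(self) -> bool:
        return self.alive

    def join(self) -> None:
        self.joined = True


def reclaim_available_slots(processes: dict[str, FakeProcess], max_slots: int) -> int:
    """Join and drop finished processes, then report how many slots are free."""
    finished = [inv_id for inv_id, proc in processes.items() if not proc.is_alive()]
    for inv_id in finished:
        processes.pop(inv_id).join()  # reap the finished process
    return max(0, max_slots - len(processes))
```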
- clasify_waiting_invocations() → pynenc.runner.process_runner.ClassifiedInvocations
Classifies the waiting invocation IDs into final and non-final groups.
- _get_process_for_invocation(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → multiprocessing.Process | None
Get the Process object for a given invocation_id.
- _pause_waiting_process(waiting_invocation_id: pynenc.identifiers.invocation_id.InvocationId, blocked_by: pynenc.identifiers.invocation_id.InvocationId) → None
Send SIGSTOP to the process running a waiting invocation.
- _resume_waiting_process(waiting_invocation_id: pynenc.identifiers.invocation_id.InvocationId) → None
Send SIGCONT and set RESUMED status for a previously paused invocation.
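The pause and resume mechanism relies on two POSIX signals. The sketch below shows them on a throwaway child process. It assumes a POSIX system (SIGSTOP and SIGCONT are not available on Windows) and leaves out the status bookkeeping; `spawn_sleeper`, `pause_process`, and `resume_process` are hypothetical helpers.

```python
import os
import signal
import subprocess
import sys


def spawn_sleeper() -> subprocess.Popen:
    """Start a throwaway child that just sleeps (stand-in for a task process)."""
    return subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])


def pause_process(pid: int) -> None:
    """Suspend the process; SIGSTOP cannot be caught or ignored by the target."""
    os.kill(pid, signal.SIGSTOP)


def resume_process(pid: int) -> None:
    """Let a stopped process continue from where it was suspended."""
    os.kill(pid, signal.SIGCONT)
```

A stopped process still exists and keeps its memory, so it continues to hold its process entry while consuming no CPU time until it receives SIGCONT.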
- runner_loop_iteration() → None
Executes one iteration of the ProcessRunner loop. Handles the execution and monitoring of task invocations in separate processes. Each process gets a reserved runner context and only starts if an invocation is available.
- _waiting_for_results(running_invocation_id: pynenc.identifiers.invocation_id.InvocationId, result_invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], runner_args: dict[str, Any] | None = None) → None
Handles invocations that are waiting for results from other invocations. Pauses the running process and registers it to wait for the results of specified invocations.
- Parameters: