pynenc.runner.persistent_process_runner

Module Contents

Classes

PersistentProcessRunner

PersistentProcessRunner maintains a fixed number of processes that continuously run tasks.

Functions

persistent_process_main

Main function for a persistent process that executes invocations sequentially.

API

pynenc.runner.persistent_process_runner.persistent_process_main(app: pynenc.app.Pynenc, *, process_key: str, runner_cache: dict, stop_event: multiprocessing.synchronize.Event) → None

Main function for a persistent process that executes invocations sequentially.
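
As a rough illustration of a sequential worker loop guarded by a stop event, the sketch below runs queued callables one at a time until the event is set. It is not pynenc's implementation: `sequential_worker`, its task queue, and `poll_timeout` are hypothetical names. It uses `queue.Queue` and `threading.Event` as single-process stand-ins; `multiprocessing.Queue` and `multiprocessing.Event` expose the same `get(timeout=...)`, `is_set()`, and `set()` methods.

```python
import queue
import threading
from typing import Any, Callable


def sequential_worker(
    tasks: "queue.Queue[Callable[[], Any]]",
    results: list,
    stop_event: threading.Event,
    poll_timeout: float = 0.1,
) -> None:
    """Run queued callables one at a time until stop_event is set.

    Hypothetical stand-in for a persistent worker's main loop.
    """
    while not stop_event.is_set():
        try:
            # Block briefly so the stop flag is re-checked regularly.
            task = tasks.get(timeout=poll_timeout)
        except queue.Empty:
            continue  # nothing queued yet; loop back to the stop check
        # Tasks run strictly one after another in this worker.
        results.append(task())
```

Because the stop flag is only checked between tasks, a task that is already running always finishes before the loop exits.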

class pynenc.runner.persistent_process_runner.PersistentProcessRunner(app: pynenc.app.Pynenc, runner_cache: Optional[dict] = None, extra_id: Optional[str] = None)

Bases: pynenc.runner.base_runner.BaseRunner

PersistentProcessRunner maintains a fixed number of processes that continuously run tasks.

Initialization

processes: dict[str, multiprocessing.Process] = None

manager: multiprocessing.Manager = None

runner_cache: dict = None

num_processes: int = None

stop_event: multiprocessing.synchronize.Event = None

conf() → pynenc.conf.config_runner.ConfigPersistentProcessRunner

property cache: dict

Returns the shared cache for all processes.

static mem_compatible() → bool

Indicates whether the runner supports in-memory components.

property max_parallel_slots: int

Returns the maximum number of concurrent processes.

_generate_process_key() → str

Generates a unique process key using runner_id and an incrementing counter.
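
One way to combine a runner id with an incrementing counter is sketched below. The `KeyGenerator` class and the `"<runner_id>-<n>"` key format are assumptions made for illustration; the actual key format used by pynenc is not documented here.

```python
import itertools


class KeyGenerator:
    """Hypothetical helper: unique keys from a runner id and a counter."""

    def __init__(self, runner_id: str) -> None:
        self.runner_id = runner_id
        # itertools.count() yields 0, 1, 2, ... without repeating.
        self._counter = itertools.count()

    def next_key(self) -> str:
        # The "<runner_id>-<n>" layout is an assumption, not pynenc's format.
        return f"{self.runner_id}-{next(self._counter)}"
```

Keys from one generator never repeat, and keys from different runners stay distinct as long as the runner ids differ and no runner id is itself of the form `"<other_id>-<n>"`, since the hyphen separator can otherwise be ambiguous.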

_on_start() → None

Initializes the runner and spawns initial processes.

_spawn_persistent_process() → str

Spawns a new persistent process and returns its key.

_terminate_all_processes() → None

Terminates all running processes, attempting a graceful shutdown.
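
A graceful-then-forced shutdown is commonly written as: signal the workers, wait up to a deadline, then terminate whatever is still alive. The sketch below follows that pattern under stated assumptions; `terminate_all` and `grace_seconds` are hypothetical, and the real method may differ. It relies only on `join(timeout)`, `is_alive()`, and `terminate()`, which `multiprocessing.Process` provides.

```python
import time


def terminate_all(processes: dict, stop_event, grace_seconds: float = 5.0) -> None:
    """Hypothetical sketch: ask workers to stop, then force the stragglers."""
    # Step 1: ask every worker to exit its loop on its own.
    stop_event.set()

    # Step 2: wait for all workers against one shared deadline, so the
    # total wait is bounded by grace_seconds rather than N * grace_seconds.
    deadline = time.monotonic() + grace_seconds
    for proc in processes.values():
        proc.join(timeout=max(0.0, deadline - time.monotonic()))

    # Step 3: forcibly terminate anything that ignored the stop request.
    for proc in processes.values():
        if proc.is_alive():
            proc.terminate()
            proc.join()

    processes.clear()
```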

_on_stop() → None

Cleans up all resources when runner stops.

_on_stop_runner_loop() → None

Handles an immediate stop triggered by a signal.

runner_loop_iteration() → None

Maintains the configured number of running processes.
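
Keeping a pool at a fixed size usually means dropping entries for processes that have exited and spawning replacements until the target count is reached. The following is a minimal sketch of that idea, not the library's code: `maintain_pool`, the `Worker` protocol, and the `spawn` callback are hypothetical, and `spawn` is assumed to return a fresh unique key each time.

```python
from typing import Callable, Protocol


class Worker(Protocol):
    """Anything with is_alive(), e.g. multiprocessing.Process."""

    def is_alive(self) -> bool: ...


def maintain_pool(
    processes: dict[str, Worker],
    num_processes: int,
    spawn: Callable[[], tuple[str, Worker]],
) -> None:
    """Hypothetical sketch: keep exactly num_processes live workers."""
    # Collect dead keys first so the dict is not mutated while iterating.
    dead_keys = [key for key, proc in processes.items() if not proc.is_alive()]
    for key in dead_keys:
        del processes[key]

    # Spawn replacements until the pool is back at its target size.
    # Assumes spawn() never reuses a key already present in the pool.
    while len(processes) < num_processes:
        key, proc = spawn()
        processes[key] = proc
```

Calling such a function once per loop iteration makes the pool self-healing: a crashed worker is replaced on the next pass.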

_waiting_for_results(running_invocation: pynenc.invocation.dist_invocation.DistributedInvocation, result_invocations: list[pynenc.invocation.dist_invocation.DistributedInvocation], runner_args: Optional[dict[str, Any]] = None) → None

This simplified runner does not pause or resume processes. The invocation is only marked as paused, and the process continues with other invocations.