pynenc.runner.base_runner
Module Contents
Classes
BaseRunner | The BaseRunner class defines the interface for a runner that executes task invocations.
DummyRunner | This runner is a placeholder for the Pynenc app. It will be used when the app is defined in any other Python environment than a Pynenc runner.
ExternalRunner | Represents an external/client context outside Pynenc runners.
API
- class pynenc.runner.base_runner.BaseRunner(app: pynenc.app.Pynenc, runner_cache: dict | None = None, runner_context: pynenc.runner.runner_context.RunnerContext | None = None)
Bases: abc.ABC
The BaseRunner class defines the interface for a runner that executes task invocations.
It interacts with various components of the Pynenc system, like the broker and orchestrator, and is responsible for handling the execution and life cycle of task invocations.
The runner’s behavior can vary depending on the execution environment (e.g., subprocess, async, cloud function, multiprocessing).
It is designed to be subclassed for specific execution environments.
Initialization
- property logger: logging.Logger
- property runner_context: pynenc.runner.runner_context.RunnerContext
The RunnerContext associated with this runner.
- abstractmethod static mem_compatible() -> bool
Indicates if the runner is compatible with in-memory components.
Important
In-memory components can only be used for testing purposes in a shared memory space.
- Returns:
True if compatible, False otherwise
- abstract property max_parallel_slots: int
The maximum number of parallel tasks that the runner can handle.
- Returns:
An integer representing the maximum number of parallel tasks
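The two abstract members above can be illustrated with a minimal sketch. The classes below are hypothetical stand-ins built only on the standard library `abc` module, not Pynenc code; they assume a single-slot runner that executes everything in the current process.

```python
from abc import ABC, abstractmethod


class RunnerInterfaceSketch(ABC):
    """Hypothetical stand-in mirroring two documented abstract members."""

    @staticmethod
    @abstractmethod
    def mem_compatible() -> bool:
        """Whether in-memory components may be used with this runner."""

    @property
    @abstractmethod
    def max_parallel_slots(self) -> int:
        """Maximum number of tasks the runner handles in parallel."""


class SingleSlotRunnerSketch(RunnerInterfaceSketch):
    """Hypothetical runner that executes one task at a time in-process."""

    @staticmethod
    def mem_compatible() -> bool:
        # Everything runs in one process, so memory is shared.
        return True

    @property
    def max_parallel_slots(self) -> int:
        # Only one task runs at any moment.
        return 1
```

A subclass that omits either member cannot be instantiated, which is how the abstract base enforces the interface.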
- abstractmethod get_active_child_runner_ids() -> list[str]
Returns the list of currently active child runner IDs.
This method enables parent-based health reporting for runners that spawn child processes or workers. The parent can report which child runner_ids are still alive based on OS-level process checks (e.g., Process.is_alive()).
The orchestrator uses this to register heartbeats for active children, avoiding false recovery of invocations owned by children that are still running.
Runners that spawn child processes must return the runner_ids of their alive children. Runners that don’t spawn children must return an empty list.
- Returns:
List of runner_ids for currently active child workers.
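As a rough illustration of the liveness check described above, the helper below filters a mapping of runner IDs to process-like objects. The function name, the `SupportsIsAlive` protocol, and the mapping are assumptions made for this sketch; any object with an `is_alive()` method, such as `multiprocessing.Process`, would fit.

```python
from typing import Protocol


class SupportsIsAlive(Protocol):
    """Anything exposing is_alive(), e.g. multiprocessing.Process."""

    def is_alive(self) -> bool: ...


def active_child_runner_ids(children: dict[str, SupportsIsAlive]) -> list[str]:
    """Return the runner_ids whose child process is still alive.

    `children` is a hypothetical mapping of runner_id -> process handle
    that a parent runner might keep for the workers it spawned.
    """
    return [runner_id for runner_id, proc in children.items() if proc.is_alive()]
```

A runner that never spawns children would simply return an empty list, as the contract above requires.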
- wait_until_stopped(timeout: float = 2.0) -> bool
Block until the runner’s run() loop has fully exited.
Safe to call even if run() was never started (returns immediately).
- Parameters:
timeout (float) – Maximum seconds to wait.
- Returns:
True if stopped within the timeout, False otherwise.
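One way to obtain this behavior is a `threading.Event` that starts in the set state, so that waiting on a loop that never ran returns immediately. The class below is a hypothetical sketch of that pattern, not the actual Pynenc implementation; the `stop()` method and attribute names are assumptions.

```python
import threading


class StoppableLoopSketch:
    """Hypothetical loop whose exit can be awaited with a timeout."""

    def __init__(self) -> None:
        self._stop_requested = threading.Event()
        self._loop_exited = threading.Event()
        # A loop that never started counts as already stopped.
        self._loop_exited.set()

    def run(self) -> None:
        self._loop_exited.clear()
        try:
            while not self._stop_requested.is_set():
                # Placeholder for real work; wakes up early when stop is requested.
                self._stop_requested.wait(0.01)
        finally:
            # Signal waiters even if the loop body raised.
            self._loop_exited.set()

    def stop(self) -> None:
        self._stop_requested.set()

    def wait_until_stopped(self, timeout: float = 2.0) -> bool:
        # Event.wait returns True if the event was set within the timeout.
        return self._loop_exited.wait(timeout)
```

Setting the event in a `finally` block keeps callers from blocking for the full timeout when the loop exits through an exception.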
- abstractmethod runner_loop_iteration() -> None
One iteration of the runner loop. Subclasses should implement this method to process invocations.
- abstractmethod _on_stop_runner_loop() -> None
This method is called after the signal to stop the runner loop is received.
- _kill_and_reroute(invocation_id: Any, runner_ctx: RunnerContext | None = None) -> None
Mark an invocation as KILLED and reroute it for retry.
Silently ignores if the invocation already reached a final status (it completed before we killed it — nothing to reroute).
- Parameters:
invocation_id (Any) – Invocation to kill and reroute
runner_ctx (RunnerContext | None) – Runner context to use for ownership; defaults to self.runner_context. Pass the child’s context when the parent is acting on behalf of a child (e.g. ProcessRunner killing a worker).
- _log_shutdown(signum: int | None) -> None
Log diagnostics when the runner receives a shutdown signal.
Override in subclasses to include active processes/threads/invocations. The default logs only the runner identity and system environment.
- stop_runner_loop(signum: int | None = None, frame: FrameType | None = None) -> None
Stops the runner loop, typically in response to a signal.
Logs shutdown diagnostics (including system env, running workers, and invocation state) before stopping. Critical for debugging OOM events.
- Parameters:
signum (int | None) – Signal number
frame (FrameType | None) – Frame object at the time the signal was received
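The `(signum, frame)` parameters match the handler signature expected by the standard library `signal` module, so a method of this shape can be registered directly as a signal handler. The sketch below shows that wiring with a hypothetical stand-in class; how Pynenc itself registers the handler is not shown in this page and is assumed here.

```python
import signal
from types import FrameType
from typing import Optional


class SignalAwareLoopSketch:
    """Hypothetical stand-in with a stop method usable as a signal handler."""

    def __init__(self) -> None:
        self.running = True
        self.last_signal: Optional[int] = None

    def stop_runner_loop(
        self, signum: Optional[int] = None, frame: Optional[FrameType] = None
    ) -> None:
        # Record which signal triggered the stop, then end the loop.
        self.last_signal = signum
        self.running = False


def install_stop_handlers(loop: SignalAwareLoopSketch) -> None:
    """Register the stop method for SIGTERM and SIGINT (main thread only)."""
    signal.signal(signal.SIGTERM, loop.stop_runner_loop)
    signal.signal(signal.SIGINT, loop.stop_runner_loop)
```

Because both parameters default to `None`, the same method can also be called directly from code, without a signal.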
- abstractmethod _waiting_for_results(running_invocation_id: pynenc.identifiers.invocation_id.InvocationId, result_invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], runner_args: dict[str, Any] | None = None) -> None
Method called when an invocation is waiting for results from other invocations.
Note
This method is called from the result method of an invocation
The runner has the opportunity to define the waiting behavior of the running invocation in this method. Otherwise, the running invocation will loop indefinitely until the result invocation is ready.
Note
The running invocation may be None when the result is requested from outside a runner (e.g. the user environment). In that case it is handled by the DummyRunner (the default runner in the Pynenc app for these cases).
Subclasses can define the waiting behavior of the running invocation in this method.
- Parameters:
running_invocation_id (InvocationId) – The ID of the invocation that is waiting for results
result_invocation_ids (list[InvocationId]) – A list of IDs of the invocations whose results are being awaited
runner_args (dict[str, Any] | None) – Additional arguments passed to the runner, specific to the runner’s implementation
- waiting_for_results(running_invocation_id: InvocationId | None, result_invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], runner_args: dict[str, Any] | None = None) -> None
Handles invocations that are waiting for results from other invocations. Pauses the current thread and registers it to wait for the results of specified invocations.
- Parameters:
- async async_waiting_for_results(running_invocation_id: InvocationId | None, result_invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], runner_args: dict[str, Any] | None = None) -> None
- init_trigger_tasks_modules() -> None
Initialize trigger task modules and register trigger tasks.
- _check_atomic_services() -> None
Check and run atomic global services if this runner is authorized.
Executes trigger processing and invocation recovery in a single atomic window to prevent conflicts across distributed runners. Handles any PynencError exceptions to prevent service failures from stopping the runner loop.
- run() -> None
Starts the runner, initiating its main loop.
Sets the current runner in the context so that any invocations registered from within running tasks will use this runner’s context instead of falling back to ExternalRunner.
- _register_new_child_runner_context(child_context: pynenc.runner.runner_context.RunnerContext) -> None
Register a new child runner context and ensure that it has an initial heartbeat.
- Parameters:
child_context (RunnerContext) – The context of the child runner to register
- _report_child_runner_heartbeats() -> None
Report heartbeats for active child runners to the orchestrator.
This enables parent-based health reporting where the parent runner reports which child runner_ids are still alive based on OS-level process checks. This provides an additional liveness signal beyond the child’s own heartbeat thread.
- class pynenc.runner.base_runner.DummyRunner(app: pynenc.app.Pynenc, runner_cache: dict | None = None, runner_context: pynenc.runner.runner_context.RunnerContext | None = None)
Bases: pynenc.runner.base_runner.BaseRunner
This runner is a placeholder for the Pynenc app. It will be used when the app is defined in any other Python environment than a Pynenc runner.
Examples include:
A script that defines the app, decorates some tasks, routes them, and then finishes. Such a script does not plan to run anything itself but triggers tasks that will later run in actual runners.
Initialization
- class pynenc.runner.base_runner.ExternalRunner(app: pynenc.app.Pynenc, runner_cache: dict | None = None)
Bases: pynenc.runner.base_runner.DummyRunner
Represents an external/client context outside Pynenc runners.
This runner captures hostname and PID information from the external process (e.g., user script, CLI) that registers invocations but doesn’t execute them. It extends DummyRunner since it cannot execute tasks, but provides valid RunnerContext for tracking purposes.
Uses hostname-pid as runner_id since external processes are not managed by Pynenc and we cannot guarantee UUID persistence across calls.
Initialization
- classmethod get_default_external_runner_context() -> pynenc.runner.runner_context.RunnerContext
Create a RunnerContext for the current external process.
- Returns:
RunnerContext with hostname-pid as runner_id
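A hostname-pid identifier of the kind described above can be derived from the standard library alone. The helper below is a hypothetical sketch; the exact separator and format Pynenc uses, and the other fields of the resulting RunnerContext, are assumptions not confirmed by this page.

```python
import os
import socket


def external_runner_id() -> str:
    """Build a 'hostname-pid' identifier for the current process.

    The value is stable for the lifetime of the process, so repeated
    calls from the same script map to the same runner_id without
    having to persist a UUID.
    """
    return f"{socket.gethostname()}-{os.getpid()}"
```

Two calls from the same process yield the same string, while a new process on the same host gets a different PID suffix.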