pynenc.orchestrator.sqlite_orchestrator

SQLite-based orchestrator for cross-process testing.

This module provides a SQLite-based orchestrator implementation that enables true cross-process coordination for testing process runners. Unlike shared memory, SQLite provides ACID transactions and serializes concurrent writers through file locking, so separate processes can share state without extra synchronization code.

Module Contents

Classes

Tables

SQLiteBlockingControl

Blocking control for SQLiteOrchestrator using SQLite for cross-process invocation dependencies.

SQLiteOrchestrator

A SQLite-based implementation of the orchestrator for cross-process testing.

API

class pynenc.orchestrator.sqlite_orchestrator.Tables[source]
INVOCATIONS

‘orchestrator_invocations’

INVOCATION_ARGS

‘orchestrator_invocation_args’

BLOCKING_EDGES

‘orchestrator_blocking_edges’

RUNNER_HEARTBEATS

‘orchestrator_runner_heartbeats’

class pynenc.orchestrator.sqlite_orchestrator.SQLiteBlockingControl(app: pynenc.app.Pynenc, sqlite_db_path: str)[source]

Bases: pynenc.orchestrator.base_orchestrator.BaseBlockingControl

Blocking control for SQLiteOrchestrator using SQLite for cross-process invocation dependencies.

This class manages dependencies between task invocations, ensuring that invocations waiting for others are properly tracked and released. It implements blocking control using persistent SQLite tables.

Key components:

  • waiting_for: Tracks which invocations are waiting for results from others

  • waited_by: Tracks which invocations are being waited on by others

Initialization

_init_tables() None[source]

Initialize SQLite table for blocking control.

waiting_for_results(caller_invocation_id: pynenc.identifiers.invocation_id.InvocationId, result_invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId]) None[source]

Notifies the system that an invocation is waiting for the results of other invocations.

release_waiters(waited_invocation_id: str) None[source]

Removes an invocation from the graph, along with any dependencies related to it.

get_blocking_invocations(max_num_invocations: int) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieves invocations that are blocking others but are not themselves waiting for any results.
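
The three blocking-control methods above can be pictured as operations on a table of dependency edges. The following is a minimal sketch using only the standard sqlite3 module, not the actual pynenc implementation: the table name, the column names (waiter_id, waited_id), and the use of plain strings as IDs are assumptions made for illustration.

```python
import sqlite3

# Hypothetical schema: one row per "waiter_id waits for waited_id" edge.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE blocking_edges ("
    " waiter_id TEXT NOT NULL,"
    " waited_id TEXT NOT NULL,"
    " PRIMARY KEY (waiter_id, waited_id))"
)


def waiting_for_results(caller_id: str, result_ids: list[str]) -> None:
    """Record that caller_id is waiting on each ID in result_ids."""
    with conn:  # commits on success, rolls back on error
        conn.executemany(
            "INSERT OR IGNORE INTO blocking_edges VALUES (?, ?)",
            [(caller_id, rid) for rid in result_ids],
        )


def release_waiters(waited_id: str) -> None:
    """Drop every edge that touches waited_id, in either direction."""
    with conn:
        conn.execute(
            "DELETE FROM blocking_edges WHERE waited_id = ? OR waiter_id = ?",
            (waited_id, waited_id),
        )


def get_blocking_invocations(max_num: int) -> list[str]:
    """Return IDs that others wait on but that wait on nothing themselves."""
    rows = conn.execute(
        "SELECT DISTINCT waited_id FROM blocking_edges"
        " WHERE waited_id NOT IN (SELECT waiter_id FROM blocking_edges)"
        " ORDER BY waited_id LIMIT ?",
        (max_num,),
    ).fetchall()
    return [row[0] for row in rows]


# inv-a waits on inv-b, and inv-b waits on inv-c.
waiting_for_results("inv-a", ["inv-b"])
waiting_for_results("inv-b", ["inv-c"])
first = get_blocking_invocations(10)   # only inv-c is not waiting itself
release_waiters("inv-c")
second = get_blocking_invocations(10)  # inv-b is now unblocked
```

In this sketch, releasing inv-c removes its edges, which is what lets inv-b surface as the next blocking invocation.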

class pynenc.orchestrator.sqlite_orchestrator.SQLiteOrchestrator(app: pynenc.app.Pynenc)[source]

Bases: pynenc.orchestrator.base_orchestrator.BaseOrchestrator

A SQLite-based implementation of the orchestrator for cross-process testing.

This orchestrator uses SQLite for cross-process coordination and implements all required abstract methods from BaseOrchestrator. It’s designed specifically for testing process runners.

Warning

The SQLiteOrchestrator class is designed for testing purposes only and should not be used in production systems. It uses temporary SQLite files for state.

Initialization

_init_tables() None[source]

Initialize SQLite tables for orchestrator state.

conf() pynenc.conf.config_orchestrator.ConfigOrchestratorSQLite
property blocking_control: pynenc.orchestrator.base_orchestrator.BaseBlockingControl

Return blocking control.

_register_new_invocations(invocations: list[DistributedInvocation[Params, Result]], runner_id: str | None = None) pynenc.invocation.status.InvocationStatusRecord[source]

Register new invocations with status REGISTERED if they do not exist yet.

get_existing_invocations(task: Task[Params, Result], key_serialized_arguments: dict[str, str] | None = None, statuses: list[pynenc.invocation.status.InvocationStatus] | None = None) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Get existing invocation IDs for a task, optionally filtered by arguments and statuses.

Parameters:
  • task (Task[Params, Result]) – The task for which to retrieve invocation IDs.

  • key_serialized_arguments (dict[str, str] | None) – Serialized arguments to filter invocations.

  • statuses (list[InvocationStatus] | None) – The statuses to filter invocations.

Returns:

An iterator over matching invocation IDs.

get_task_invocation_ids(task_id: pynenc.task.TaskId) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieves all invocation ids for a given task id.

get_invocation_ids_paginated(task_id: TaskId | None = None, statuses: list[pynenc.invocation.status.InvocationStatus] | None = None, limit: int = 100, offset: int = 0) list[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieves invocation IDs with pagination support.

Parameters:
  • task_id (TaskId | None) – Optional task ID to filter by.

  • statuses (list[InvocationStatus] | None) – Optional statuses to filter by.

  • limit (int) – Maximum number of results to return.

  • offset (int) – Number of results to skip.

Returns:

List of matching invocation IDs.
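
To illustrate how the optional filters combine with limit and offset, here is a hedged sketch of a paginated query built with the standard sqlite3 module. The table layout and the helper name get_ids_paginated are hypothetical; the real method works with InvocationId and InvocationStatus objects rather than plain strings.

```python
import sqlite3

# Hypothetical table holding one row per invocation.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE invocations ("
    " invocation_id TEXT PRIMARY KEY, task_id TEXT, status TEXT)"
)
rows = [
    (f"inv-{i:02d}", "task-a" if i % 2 == 0 else "task-b",
     "PENDING" if i < 6 else "SUCCESS")
    for i in range(10)
]
conn.executemany("INSERT INTO invocations VALUES (?, ?, ?)", rows)


def get_ids_paginated(task_id=None, statuses=None, limit=100, offset=0):
    """Return one page of invocation IDs matching the optional filters."""
    clauses, params = [], []
    if task_id is not None:
        clauses.append("task_id = ?")
        params.append(task_id)
    if statuses:
        # One placeholder per status keeps the query fully parameterized.
        placeholders = ", ".join("?" for _ in statuses)
        clauses.append(f"status IN ({placeholders})")
        params.extend(statuses)
    where = " WHERE " + " AND ".join(clauses) if clauses else ""
    sql = (
        "SELECT invocation_id FROM invocations" + where
        + " ORDER BY invocation_id LIMIT ? OFFSET ?"
    )
    return [row[0] for row in conn.execute(sql, (*params, limit, offset))]


page_1 = get_ids_paginated(task_id="task-a", limit=2, offset=0)
page_2 = get_ids_paginated(task_id="task-a", limit=2, offset=2)
```

A stable ORDER BY is what makes consecutive pages non-overlapping; without it, SQLite gives no guarantee about row order between calls.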

count_invocations(task_id: TaskId | None = None, statuses: list[pynenc.invocation.status.InvocationStatus] | None = None) int[source]

Counts invocations matching the given filters.

Parameters:
  • task_id (TaskId | None) – Optional task ID to filter by.

  • statuses (list[InvocationStatus] | None) – Optional statuses to filter by.

Returns:

The total count of matching invocations.

get_call_invocation_ids(call_id: pynenc.identifiers.call_id.CallId) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieves all invocation ids for a given call id.

_atomic_status_transition(invocation_id: pynenc.identifiers.invocation_id.InvocationId, status: pynenc.invocation.status.InvocationStatus, runner_id: str | None = None) pynenc.invocation.status.InvocationStatusRecord[source]

Atomically read, validate, and write invocation status.

Uses BEGIN IMMEDIATE to acquire a write lock before reading. Without that lock, two processes could concurrently observe the same “from” status, compute independent (valid) transitions, and both commit. That would produce duplicate history records and could let forbidden status regressions (e.g. RUNNING → PENDING) slip through.
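
The read-validate-write pattern described here can be sketched with the standard sqlite3 module. This is an illustration under stated assumptions, not the pynenc code: the ALLOWED transition map, the table names, and the string statuses are all hypothetical.

```python
import sqlite3

# Hypothetical transition rules, for illustration only.
ALLOWED = {
    "REGISTERED": {"PENDING"},
    "PENDING": {"RUNNING"},
    "RUNNING": {"SUCCESS", "FAILED"},
}

# isolation_level=None turns off the module's implicit transactions so that
# BEGIN IMMEDIATE, COMMIT, and ROLLBACK can be issued explicitly.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE invocations ("
    " invocation_id TEXT PRIMARY KEY, status TEXT NOT NULL)"
)
conn.execute("CREATE TABLE status_history (invocation_id TEXT, status TEXT)")
conn.execute("INSERT INTO invocations VALUES ('inv-1', 'PENDING')")


def atomic_status_transition(invocation_id: str, new_status: str) -> str:
    """Read, validate, and write a status inside one write-locked transaction."""
    # The write lock is taken here, before the SELECT, so no other writer
    # can change the row between the read and the update.
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            "SELECT status FROM invocations WHERE invocation_id = ?",
            (invocation_id,),
        ).fetchone()
        if row is None:
            raise KeyError(invocation_id)
        current = row[0]
        if new_status not in ALLOWED.get(current, set()):
            raise ValueError(f"forbidden transition {current} -> {new_status}")
        conn.execute(
            "UPDATE invocations SET status = ? WHERE invocation_id = ?",
            (new_status, invocation_id),
        )
        conn.execute(
            "INSERT INTO status_history VALUES (?, ?)",
            (invocation_id, new_status),
        )
        conn.execute("COMMIT")
        return new_status
    except Exception:
        conn.execute("ROLLBACK")
        raise


result = atomic_status_transition("inv-1", "RUNNING")
```

With a plain BEGIN, SQLite defers locking until the first write, so two processes could both read PENDING before either one updates the row. BEGIN IMMEDIATE closes that window.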

index_arguments_for_concurrency_control(invocation: DistributedInvocation[Params, Result]) None[source]
set_up_invocation_auto_purge(invocation_id: str) None[source]

Set up invocation for auto-purging by setting the auto_purge_timestamp.

auto_purge() None[source]

Auto-purge old invocations based on auto_purge_timestamp.
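
A minimal sketch of timestamp-based purging follows, assuming a hypothetical invocations table and an explicit max_age_seconds threshold. The source only states that purging is based on auto_purge_timestamp, so the threshold parameter and the injected now value are assumptions added to keep the example deterministic.

```python
import sqlite3

# Hypothetical table; a NULL timestamp means "not yet eligible for purge".
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE invocations ("
    " invocation_id TEXT PRIMARY KEY, auto_purge_timestamp REAL)"
)
conn.executemany(
    "INSERT INTO invocations (invocation_id) VALUES (?)",
    [("inv-old",), ("inv-new",), ("inv-active",)],
)


def set_up_auto_purge(invocation_id: str, now: float) -> None:
    """Mark an invocation as purgeable by stamping it with the current time."""
    with conn:
        conn.execute(
            "UPDATE invocations SET auto_purge_timestamp = ?"
            " WHERE invocation_id = ?",
            (now, invocation_id),
        )


def auto_purge(max_age_seconds: float, now: float) -> int:
    """Delete invocations stamped at or before the cutoff; return the count."""
    cutoff = now - max_age_seconds
    with conn:
        cursor = conn.execute(
            "DELETE FROM invocations"
            " WHERE auto_purge_timestamp IS NOT NULL"
            " AND auto_purge_timestamp <= ?",
            (cutoff,),
        )
    return cursor.rowcount


set_up_auto_purge("inv-old", now=1000.0)
set_up_auto_purge("inv-new", now=1900.0)
deleted = auto_purge(max_age_seconds=600.0, now=2000.0)  # cutoff is 1400.0
remaining = [
    row[0]
    for row in conn.execute(
        "SELECT invocation_id FROM invocations ORDER BY invocation_id"
    )
]
```

Invocations that were never stamped keep a NULL timestamp and are left alone, which matches the two-step design of marking first and purging later.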

get_invocation_status_record(invocation_id: str) pynenc.invocation.status.InvocationStatusRecord[source]

Get the current status of an invocation by ID, handling pending timeouts.

Parameters:

invocation_id (str) – The invocation ID

Returns:

The current status record

increment_invocation_retries(invocation_id: str) None[source]

Increment the retry count for an invocation by ID.

Parameters:

invocation_id (str) – The invocation ID

get_invocation_retries(invocation_id: str) int[source]

Get the number of retries for an invocation by ID.

Parameters:

invocation_id (str) – The invocation ID

Returns:

The number of retries

filter_by_status(invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], status_filter: frozenset[pynenc.invocation.status.InvocationStatus]) list[pynenc.identifiers.invocation_id.InvocationId][source]

Filter a list of invocation IDs, keeping only those whose current status is in the given set.

Parameters:
  • invocation_ids (list[InvocationId]) – The invocation IDs to filter

  • status_filter (frozenset[InvocationStatus]) – The statuses to filter by

Returns:

List of invocation IDs matching the status filter

register_runner_heartbeats(runner_ids: list[str], can_run_atomic_service: bool = False) None[source]

Register or update heartbeat timestamps for one or more runners.

_get_active_runners(timeout_seconds: float, can_run_atomic_service: bool | None = None) list[pynenc.orchestrator.atomic_service.ActiveRunnerInfo][source]

Retrieve all active runners with heartbeat information and atomic service eligibility.
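
Heartbeat registration and the active-runner lookup can be sketched as an upsert plus a cutoff query. The example below is illustrative only: the table layout, the helper names, and the injected now timestamps are assumptions, and it returns bare runner IDs rather than ActiveRunnerInfo objects.

```python
import sqlite3

# Hypothetical heartbeat table: one row per runner.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE runner_heartbeats ("
    " runner_id TEXT PRIMARY KEY,"
    " last_heartbeat REAL NOT NULL,"
    " can_run_atomic_service INTEGER NOT NULL)"
)


def register_heartbeats(runner_ids, can_run_atomic_service, now):
    """Insert a heartbeat row per runner, or refresh the existing one."""
    with conn:
        conn.executemany(
            "INSERT OR REPLACE INTO runner_heartbeats VALUES (?, ?, ?)",
            [(rid, now, int(can_run_atomic_service)) for rid in runner_ids],
        )


def get_active_runners(timeout_seconds, now, can_run_atomic_service=None):
    """Return runners whose last heartbeat is within timeout_seconds of now."""
    sql = "SELECT runner_id FROM runner_heartbeats WHERE last_heartbeat >= ?"
    params = [now - timeout_seconds]
    if can_run_atomic_service is not None:
        sql += " AND can_run_atomic_service = ?"
        params.append(int(can_run_atomic_service))
    sql += " ORDER BY runner_id"
    return [row[0] for row in conn.execute(sql, params)]


register_heartbeats(["r1", "r2"], can_run_atomic_service=True, now=100.0)
register_heartbeats(["r3"], can_run_atomic_service=False, now=150.0)
register_heartbeats(["r1"], can_run_atomic_service=True, now=160.0)  # refresh

active = get_active_runners(timeout_seconds=30.0, now=170.0)
eligible = get_active_runners(30.0, now=170.0, can_run_atomic_service=True)
```

Runner r2 last reported at 100.0, which is older than the cutoff of 140.0, so it drops out of the active set. The same cutoff idea would let a recovery pass find RUNNING invocations owned by inactive runners.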

record_atomic_service_execution(runner_id: str, start_time: datetime.datetime, end_time: datetime.datetime) None[source]

Record the latest atomic service execution window for a runner.

get_pending_invocations_for_recovery() collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieve invocation IDs stuck in PENDING status beyond the allowed time.

_get_running_invocations_for_recovery(timeout_seconds: float) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieve RUNNING invocation IDs owned by inactive runners.

purge() None[source]

Clear all orchestrator state.