pynenc.orchestrator.sqlite_orchestrator
SQLite-based orchestrator for cross-process testing.
This module provides a SQLite-based orchestrator that enables true cross-process coordination when testing process runners. Unlike shared memory, SQLite provides ACID transactions and coordinates concurrent access from multiple processes through its own file locking.
Module Contents
Classes
Tables | Table names for orchestrator, scoped by app_id.
SQLiteBlockingControl | Blocking control for SQLiteOrchestrator using SQLite for cross-process invocation dependencies.
SQLiteOrchestrator | A SQLite-based implementation of the orchestrator for cross-process testing.
API
- class pynenc.orchestrator.sqlite_orchestrator.Tables(app_id: str)
Bases: pynenc.util.sqlite_utils.TableNames
Table names for orchestrator, scoped by app_id.
Initialization
- class pynenc.orchestrator.sqlite_orchestrator.SQLiteBlockingControl(app: pynenc.app.Pynenc, sqlite_db_path: str, tables: pynenc.orchestrator.sqlite_orchestrator.Tables)
Bases: pynenc.orchestrator.base_orchestrator.BaseBlockingControl
Blocking control for SQLiteOrchestrator using SQLite for cross-process invocation dependencies.
This class manages dependencies between task invocations, ensuring that invocations waiting for others are properly tracked and released. It stores the dependency graph in persistent SQLite tables so that every process sees the same state.
Key components:
waiting_for: Tracks which invocations are waiting for results from others
waited_by: Tracks which invocations are being waited on by others
Initialization
- waiting_for_results(caller_invocation_id: pynenc.identifiers.invocation_id.InvocationId, result_invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId]) → None
Notifies the system that an invocation is waiting for the results of other invocations.
- release_waiters(waited_invocation_id: str) → None
Removes an invocation from the graph, along with any dependencies related to it.
- get_blocking_invocations(max_num_invocations: int) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieves invocations that are blocking others but are not themselves waiting for any results.
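The three operations above can be illustrated with a small standalone sketch. It is not the pynenc implementation: the single `waiting_edges` table, its column names, and the helper names are assumptions made for illustration, and an in-memory database stands in for the SQLite file. Each row records one "waiter waits for waited" edge, so the `waiting_for` and `waited_by` views are simply the two lookup directions over the same rows.

```python
import sqlite3


def create_tables(conn: sqlite3.Connection) -> None:
    # Hypothetical schema: one row per dependency edge.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS waiting_edges ("
        "waiter_id TEXT NOT NULL, waited_id TEXT NOT NULL, "
        "PRIMARY KEY (waiter_id, waited_id))"
    )


def waiting_for_results(conn: sqlite3.Connection, caller_id: str, result_ids: list[str]) -> None:
    # Record that caller_id is waiting for every invocation in result_ids.
    with conn:  # commits on success, rolls back on error
        conn.executemany(
            "INSERT OR IGNORE INTO waiting_edges (waiter_id, waited_id) VALUES (?, ?)",
            [(caller_id, rid) for rid in result_ids],
        )


def release_waiters(conn: sqlite3.Connection, waited_id: str) -> None:
    # Remove the invocation from the graph together with every edge touching it.
    with conn:
        conn.execute(
            "DELETE FROM waiting_edges WHERE waited_id = ? OR waiter_id = ?",
            (waited_id, waited_id),
        )


def get_blocking_invocations(conn: sqlite3.Connection, max_num: int) -> list[str]:
    # Invocations that others wait on, but that wait on nothing themselves.
    # ORDER BY is added only to make the sketch deterministic.
    rows = conn.execute(
        "SELECT DISTINCT waited_id FROM waiting_edges "
        "WHERE waited_id NOT IN (SELECT waiter_id FROM waiting_edges) "
        "ORDER BY waited_id LIMIT ?",
        (max_num,),
    ).fetchall()
    return [row[0] for row in rows]
```

In this sketch, if `a` waits for `b` and `c`, and `b` waits for `c`, only `c` is reported as blocking; once `c` is released, `b` becomes the blocking invocation.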
- class pynenc.orchestrator.sqlite_orchestrator.SQLiteOrchestrator(app: pynenc.app.Pynenc)
Bases: pynenc.orchestrator.base_orchestrator.BaseOrchestrator
A SQLite-based implementation of the orchestrator for cross-process testing.
This orchestrator uses SQLite for cross-process coordination and implements all required abstract methods from BaseOrchestrator. It’s designed specifically for testing process runners.
Warning
The SQLiteOrchestrator class is designed for testing purposes only and should not be used in production systems. It uses temporary SQLite files for state.
Initialization
- property blocking_control: pynenc.orchestrator.base_orchestrator.BaseBlockingControl
Return blocking control.
- _register_new_invocations(invocations: list[DistributedInvocation[Params, Result]], runner_id: str | None = None) → pynenc.invocation.status.InvocationStatusRecord
Register new invocations with status REGISTERED if they do not exist yet.
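- get_existing_invocations(task: Task[Params, Result], key_serialized_arguments: dict[str, str] | None = None, statuses: list[pynenc.invocation.status.InvocationStatus] | None = None) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Get existing invocation IDs for a task, optionally filtered by arguments and statuses.
- Parameters:
task (Task[Params, Result]) – The task whose invocations are looked up.
key_serialized_arguments (dict[str, str] | None) – Optional serialized arguments to filter by.
statuses (list[InvocationStatus] | None) – Optional statuses to filter by.
- Returns:
An iterator over matching invocation IDs.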
- get_task_invocation_ids(task_id: pynenc.task.TaskId) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieves all invocation IDs for a given task ID.
- get_invocation_ids_paginated(task_id: TaskId | None = None, statuses: list[pynenc.invocation.status.InvocationStatus] | None = None, limit: int = 100, offset: int = 0) → list[pynenc.identifiers.invocation_id.InvocationId]
Retrieves invocation IDs with pagination support.
- Parameters:
task_id (TaskId | None) – Optional task ID to filter by.
statuses (list[InvocationStatus] | None) – Optional statuses to filter by.
limit (int) – Maximum number of results to return.
offset (int) – Number of results to skip.
- Returns:
List of matching invocation IDs.
- count_invocations(task_id: TaskId | None = None, statuses: list[pynenc.invocation.status.InvocationStatus] | None = None) → int
Counts invocations matching the given filters.
- Parameters:
task_id (TaskId | None) – Optional task ID to filter by.
statuses (list[InvocationStatus] | None) – Optional statuses to filter by.
- Returns:
The total count of matching invocations.
- get_call_invocation_ids(call_id: pynenc.identifiers.call_id.CallId) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieves all invocation IDs for a given call ID.
- _atomic_status_transition(invocation_id: pynenc.identifiers.invocation_id.InvocationId, status: pynenc.invocation.status.InvocationStatus, runner_id: str | None = None) → pynenc.invocation.status.InvocationStatusRecord
Atomically read, validate, and write invocation status.
Uses BEGIN IMMEDIATE to acquire a write lock before reading, so that no two processes can concurrently observe the same “from” status, compute independent (valid) transitions, and both commit. Such a race would produce duplicate history records and could allow forbidden status regressions (e.g. RUNNING → PENDING) to slip through.
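The read-validate-write pattern can be sketched with Python's standard `sqlite3` module. This is an illustration under stated assumptions, not the pynenc code: the `invocation_status` and `status_history` tables, the `ALLOWED_TRANSITIONS` map, and the function names are all hypothetical. The connection is expected to be opened with `isolation_level=None` so that the module does not start transactions implicitly and the explicit `BEGIN IMMEDIATE` takes effect.

```python
import sqlite3

# Hypothetical transition rules, for illustration only.
ALLOWED_TRANSITIONS = {
    "REGISTERED": {"PENDING"},
    "PENDING": {"RUNNING"},
    "RUNNING": {"SUCCESS", "FAILED"},
}


def create_schema(conn: sqlite3.Connection) -> None:
    # Hypothetical tables: current status plus an append-only history.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS invocation_status ("
        "invocation_id TEXT PRIMARY KEY, status TEXT NOT NULL)"
    )
    conn.execute(
        "CREATE TABLE IF NOT EXISTS status_history ("
        "invocation_id TEXT NOT NULL, status TEXT NOT NULL)"
    )


def atomic_status_transition(conn: sqlite3.Connection, invocation_id: str, new_status: str) -> str:
    # Take the write lock BEFORE reading, so no other writer can change
    # the row between our read and our write.
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            "SELECT status FROM invocation_status WHERE invocation_id = ?",
            (invocation_id,),
        ).fetchone()
        if row is None:
            raise KeyError(invocation_id)
        current = row[0]
        if new_status not in ALLOWED_TRANSITIONS.get(current, set()):
            raise ValueError(f"forbidden transition {current} -> {new_status}")
        conn.execute(
            "UPDATE invocation_status SET status = ? WHERE invocation_id = ?",
            (new_status, invocation_id),
        )
        conn.execute(
            "INSERT INTO status_history (invocation_id, status) VALUES (?, ?)",
            (invocation_id, new_status),
        )
        conn.execute("COMMIT")
    except Exception:
        # Undo any partial work and release the lock before re-raising.
        conn.execute("ROLLBACK")
        raise
    return new_status
```

With a plain deferred `BEGIN`, two processes could both read the same status under shared locks and only conflict at write time; starting with `BEGIN IMMEDIATE` makes the second process wait (or fail with a busy error) before it reads anything.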
- index_arguments_for_concurrency_control(invocation: DistributedInvocation[Params, Result]) → None
- set_up_invocation_auto_purge(invocation_id: str) → None
Set up invocation for auto-purging by setting the auto_purge_timestamp.
- get_invocation_status_record(invocation_id: str) → pynenc.invocation.status.InvocationStatusRecord
Get the current status of an invocation by ID, handling pending timeouts.
- Parameters:
invocation_id (str) – The invocation ID
- Returns:
The current status record
- increment_invocation_retries(invocation_id: str) → None
Increment the retry count for an invocation by ID.
- Parameters:
invocation_id (str) – The invocation ID
- get_invocation_retries(invocation_id: str) → int
Get the number of retries for an invocation by ID.
- Parameters:
invocation_id (str) – The invocation ID
- Returns:
The number of retries
- filter_by_status(invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], status_filter: frozenset[pynenc.invocation.status.InvocationStatus]) → list[pynenc.identifiers.invocation_id.InvocationId]
Filter the given invocation IDs by their current status.
- register_runner_heartbeats(runner_ids: list[str], can_run_atomic_service: bool = False) → None
Register or update heartbeat timestamps for one or more runners.
- _get_active_runners(timeout_seconds: float, can_run_atomic_service: bool | None = None) → list[pynenc.orchestrator.atomic_service.ActiveRunnerInfo]
Retrieve all active runners with heartbeat information and atomic service eligibility.
- record_atomic_service_execution(runner_id: str, start_time: datetime.datetime, end_time: datetime.datetime) → None
Record the latest atomic service execution window for a runner.
- get_pending_invocations_for_recovery() → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieve invocation IDs stuck in PENDING status beyond the allowed time.
- _get_running_invocations_for_recovery(timeout_seconds: float) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieve RUNNING invocation IDs owned by inactive runners.