pynenc.orchestrator.atomic_service

Atomic service coordination for distributed Pynenc runners.

This module provides functions to coordinate the execution of atomic global services (like trigger processing and invocation recovery) across multiple distributed runners. It ensures that only one runner executes these services at any given time, preventing race conditions and duplicate work.

Key components:

  • ActiveRunnerInfo: Runner metadata including heartbeat and execution history

  • Time slot calculation with execution history awareness

  • Execution time validation and warnings

Module Contents

Classes

ActiveRunnerInfo

Information about an active runner including heartbeat and execution tracking.

Functions

calculate_runner_position

Find the position of a runner in the ordered list of active runners.

get_max_execution_duration

Find the maximum execution duration among all runners with history.

validate_execution_time

Validate that execution fits within allocated time slot and warn if not.

calculate_time_slot

Calculate the time slot (start and end) for a runner’s service execution window.

is_runner_in_time_slot

Check if the current time falls within a runner’s execution time slot.

can_run_atomic_service

Determine if a runner should execute atomic global services now.

API

class pynenc.orchestrator.atomic_service.ActiveRunnerInfo[source]

Bases: typing.NamedTuple

Information about an active runner including heartbeat and execution tracking.

Parameters:
  • runner_id (str) – The unique identifier for the runner

  • creation_time (datetime) – When the runner was first registered

  • last_heartbeat (datetime) – When the last heartbeat was received

  • allow_to_run_atomic_service (bool) – Whether this runner can run atomic services

  • last_service_start (datetime | None) – When the last atomic service execution started

  • last_service_end (datetime | None) – When the last atomic service execution ended

runner_id: str

None

creation_time: datetime.datetime

None

last_heartbeat: datetime.datetime

None

allow_to_run_atomic_service: bool

False

last_service_start: datetime.datetime | None

None

last_service_end: datetime.datetime | None

None

get_last_execution_duration_seconds() float | None[source]

Calculate the duration of the last service execution.

Returns:

Duration in seconds, or None if no execution recorded

Return type:

float | None

pynenc.orchestrator.atomic_service.calculate_runner_position(runner_id: str, active_runners: list[pynenc.orchestrator.atomic_service.ActiveRunnerInfo]) int | None[source]

Find the position of a runner in the ordered list of active runners.

Runners are ordered by creation time (oldest first), providing stable ordering for time slot assignment.

Parameters:
  • runner_id (str) – The runner ID to find

  • active_runners (list[ActiveRunnerInfo]) – Ordered list of active runners

Returns:

Zero-based position index, or None if runner not found

Return type:

int | None

pynenc.orchestrator.atomic_service.get_max_execution_duration(active_runners: list[pynenc.orchestrator.atomic_service.ActiveRunnerInfo]) float[source]

Find the maximum execution duration among all runners with history.

This helps determine realistic time slot sizes based on actual execution behavior.

Parameters:

active_runners (list[ActiveRunnerInfo]) – List of active runners

Returns:

Maximum execution duration in seconds, or 0 if no history

Return type:

float

pynenc.orchestrator.atomic_service.validate_execution_time(execution_duration: float, allocated_slot_size: float, runner_id: str) None[source]

Validate that execution fits within allocated time slot and warn if not.

This detects configuration issues where service execution exceeds the time window, potentially causing overlapping executions and breaking atomicity guarantees.

Parameters:
  • execution_duration (float) – Actual execution time in seconds

  • allocated_slot_size (float) – Allocated time window in seconds

  • runner_id (str) – ID of the runner for logging context

pynenc.orchestrator.atomic_service.calculate_time_slot(runner_position: int, total_runners: int, service_interval_minutes: float, spread_margin_minutes: float, active_runners: list[pynenc.orchestrator.atomic_service.ActiveRunnerInfo] | None = None) tuple[float, float][source]

Calculate the time slot (start and end) for a runner’s service execution window.

The service interval is divided equally among runners, with a spread margin subtracted from each slot to prevent overlapping executions. If execution history is available, validates that previous executions fit within allocated slots.

Parameters:
  • runner_position (int) – Zero-based position of the runner

  • total_runners (int) – Total number of active runners

  • service_interval_minutes (float) – Total service cycle duration in minutes

  • spread_margin_minutes (float) – Safety margin in minutes

  • active_runners (list[ActiveRunnerInfo] | None) – Optional runner history for validation

Returns:

Tuple of (start_time, end_time) in seconds within the cycle

Return type:

tuple[float, float]

pynenc.orchestrator.atomic_service.is_runner_in_time_slot(current_time: float, service_interval_minutes: float, start_time: float, end_time: float) bool[source]

Check if the current time falls within a runner’s execution time slot.

Uses modulo arithmetic to map current time into the repeating service cycle.

Parameters:
  • current_time (float) – Current Unix timestamp

  • service_interval_minutes (float) – Service cycle duration in minutes

  • start_time (float) – Slot start time in seconds (within cycle)

  • end_time (float) – Slot end time in seconds (within cycle)

Returns:

True if current time is within the slot

Return type:

bool

pynenc.orchestrator.atomic_service.can_run_atomic_service(runner_id: str, active_runners: list[pynenc.orchestrator.atomic_service.ActiveRunnerInfo], current_time: float, service_interval_minutes: float, spread_margin_minutes: float) bool[source]

Determine if a runner should execute atomic global services now.

This function implements a distributed coordination algorithm that:

  1. Orders runners by creation time

  2. Divides the service interval into equal time slots

  3. Assigns each runner an exclusive time window

  4. Validates execution times against allocated slots

  5. Checks if current time falls within this runner’s window

Single-runner optimization: If only one runner exists, it always runs services.

Parameters:
  • runner_id (str) – The ID of the runner to check

  • active_runners (list[ActiveRunnerInfo]) – All currently active runners

  • current_time (float) – Current Unix timestamp

  • service_interval_minutes (float) – How often services should run

  • spread_margin_minutes (float) – Safety margin to prevent overlaps

Returns:

True if this runner should execute services now

Return type:

bool

Example: With 3 runners and a 6-minute interval: - Runner 0: executes in minutes 0-2 - Runner 1: executes in minutes 2-4 - Runner 2: executes in minutes 4-6