pynenc.orchestrator.atomic_service
Atomic service coordination for distributed Pynenc runners.
This module provides functions to coordinate the execution of atomic global services (like trigger processing and invocation recovery) across multiple distributed runners. It ensures that only one runner executes these services at any given time, preventing race conditions and duplicate work.
Key components:
ActiveRunnerInfo: Runner metadata including heartbeat and execution history
Time slot calculation with execution history awareness
Execution time validation and warnings
Module Contents
Classes
ActiveRunnerInfo: Information about an active runner including heartbeat and execution tracking.
Functions
calculate_runner_position: Find the position of a runner in the ordered list of active runners.
get_max_execution_duration: Find the maximum execution duration among all runners with history.
validate_execution_time: Validate that execution fits within allocated time slot and warn if not.
calculate_time_slot: Calculate the time slot (start and end) for a runner’s service execution window.
is_runner_in_time_slot: Check if the current time falls within a runner’s execution time slot.
can_run_atomic_service: Determine if a runner should execute atomic global services now.
API
- class pynenc.orchestrator.atomic_service.ActiveRunnerInfo
Bases: typing.NamedTuple
Information about an active runner including heartbeat and execution tracking.
- Parameters:
runner_id (str) – The unique identifier for the runner
creation_time (datetime) – When the runner was first registered
last_heartbeat (datetime) – When the last heartbeat was received
allow_to_run_atomic_service (bool) – Whether this runner can run atomic services
last_service_start (datetime | None) – When the last atomic service execution started
last_service_end (datetime | None) – When the last atomic service execution ended
- creation_time: datetime.datetime
- last_heartbeat: datetime.datetime
- last_service_start: datetime.datetime | None
- last_service_end: datetime.datetime | None
- pynenc.orchestrator.atomic_service.calculate_runner_position(runner_id: str, active_runners: list[pynenc.orchestrator.atomic_service.ActiveRunnerInfo]) -> int | None
Find the position of a runner in the ordered list of active runners.
Runners are ordered by creation time (oldest first), providing stable ordering for time slot assignment.
- Parameters:
runner_id (str) – The runner ID to find
active_runners (list[ActiveRunnerInfo]) – Ordered list of active runners
- Returns:
Zero-based position index, or None if runner not found
- Return type:
int | None
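The lookup described above can be sketched as follows. This is a minimal illustration, not the library's code: `RunnerSketch` and `sketch_runner_position` are hypothetical names, and the sketch assumes the caller has already sorted the list by creation time, oldest first.

```python
from __future__ import annotations

from datetime import datetime, timedelta
from typing import NamedTuple, Optional


class RunnerSketch(NamedTuple):
    """Hypothetical stand-in for ActiveRunnerInfo (only the fields used here)."""

    runner_id: str
    creation_time: datetime


def sketch_runner_position(
    runner_id: str, runners: list[RunnerSketch]
) -> Optional[int]:
    """Return the zero-based index of runner_id in an already ordered list."""
    for position, runner in enumerate(runners):
        if runner.runner_id == runner_id:
            return position
    return None  # the runner is not among the active runners


base = datetime(2024, 1, 1)
runners = sorted(
    [
        RunnerSketch("runner-b", base + timedelta(minutes=5)),
        RunnerSketch("runner-a", base),
    ],
    key=lambda r: r.creation_time,  # oldest first, as the docs describe
)
```

Because the ordering depends only on creation time, every runner that sees the same list computes the same positions without further communication.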
- pynenc.orchestrator.atomic_service.get_max_execution_duration(active_runners: list[pynenc.orchestrator.atomic_service.ActiveRunnerInfo]) -> float
Find the maximum execution duration among all runners with history.
This helps determine realistic time slot sizes based on actual execution behavior.
- Parameters:
active_runners (list[ActiveRunnerInfo]) – List of active runners
- Returns:
Maximum execution duration in seconds, or 0 if no history
- Return type:
float
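A minimal sketch of how such a maximum could be derived from the history fields. It assumes a duration is the difference between `last_service_end` and `last_service_start`, and that runners missing either timestamp are skipped. `RunnerHistorySketch` and `sketch_max_execution_duration` are hypothetical names.

```python
from __future__ import annotations

from datetime import datetime, timedelta
from typing import NamedTuple, Optional


class RunnerHistorySketch(NamedTuple):
    """Hypothetical stand-in holding only the execution-history fields."""

    runner_id: str
    last_service_start: Optional[datetime]
    last_service_end: Optional[datetime]


def sketch_max_execution_duration(runners: list[RunnerHistorySketch]) -> float:
    """Longest recorded execution in seconds, or 0.0 when there is no history."""
    durations = [
        (r.last_service_end - r.last_service_start).total_seconds()
        for r in runners
        # Assumption: a runner counts only if both timestamps are present.
        if r.last_service_start is not None and r.last_service_end is not None
    ]
    return max(durations, default=0.0)


start = datetime(2024, 1, 1, 12, 0, 0)
history = [
    RunnerHistorySketch("runner-a", start, start + timedelta(seconds=45)),
    RunnerHistorySketch("runner-b", start, start + timedelta(seconds=90)),
    RunnerHistorySketch("runner-c", None, None),  # has not run a service yet
]
```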
- pynenc.orchestrator.atomic_service.validate_execution_time(execution_duration: float, allocated_slot_size: float, runner_id: str) -> None
Validate that execution fits within allocated time slot and warn if not.
This detects configuration issues where service execution exceeds the time window, potentially causing overlapping executions and breaking atomicity guarantees.
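The check can be pictured with the sketch below. It assumes both values are in seconds and that the warning goes through the standard `logging` module; how the library actually emits it is not stated here. The logger name and `sketch_validate_execution_time` are hypothetical.

```python
import logging

# Hypothetical logger name, used only for this illustration.
logger = logging.getLogger("atomic_service_sketch")


def sketch_validate_execution_time(
    execution_duration: float, allocated_slot_size: float, runner_id: str
) -> None:
    """Warn when a recorded execution took longer than the allocated slot."""
    if execution_duration > allocated_slot_size:
        logger.warning(
            "Runner %s: execution took %.1fs but its slot is only %.1fs; "
            "executions may overlap",
            runner_id,
            execution_duration,
            allocated_slot_size,
        )
```

A warning of this kind usually means the service interval should be lengthened or the number of eligible runners reduced.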
- pynenc.orchestrator.atomic_service.calculate_time_slot(runner_position: int, total_runners: int, service_interval_minutes: float, spread_margin_minutes: float, active_runners: list[pynenc.orchestrator.atomic_service.ActiveRunnerInfo] | None = None) -> tuple[float, float]
Calculate the time slot (start and end) for a runner’s service execution window.
The service interval is divided equally among runners, with a spread margin subtracted from each slot to prevent overlapping executions. If execution history is available, validates that previous executions fit within allocated slots.
- Parameters:
runner_position (int) – Zero-based position of the runner
total_runners (int) – Total number of active runners
service_interval_minutes (float) – Total service cycle duration in minutes
spread_margin_minutes (float) – Safety margin in minutes
active_runners (list[ActiveRunnerInfo] | None) – Optional runner history for validation
- Returns:
Tuple of (start_time, end_time) in seconds within the cycle
- Return type:
tuple[float, float]
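One plausible reading of the slot arithmetic is sketched below. It assumes the margin is removed from the end of each slot; the library may place it differently. The history validation step is omitted, and `sketch_time_slot` is a hypothetical name.

```python
def sketch_time_slot(
    runner_position: int,
    total_runners: int,
    service_interval_minutes: float,
    spread_margin_minutes: float,
) -> tuple[float, float]:
    """Return (start, end) in seconds within the cycle for one runner."""
    cycle_seconds = service_interval_minutes * 60.0
    slot_size = cycle_seconds / total_runners  # equal share per runner
    start = runner_position * slot_size
    # Assumption: the safety margin shortens the slot at its end.
    end = start + slot_size - spread_margin_minutes * 60.0
    return start, end
```

With 3 runners, a 6-minute interval, and a 30-second margin, this sketch gives the runner at position 1 the window from 120 s to 210 s, leaving a 30-second gap before the next slot begins.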
- pynenc.orchestrator.atomic_service.is_runner_in_time_slot(current_time: float, service_interval_minutes: float, start_time: float, end_time: float) -> bool
Check if the current time falls within a runner’s execution time slot.
Uses modulo arithmetic to map current time into the repeating service cycle.
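- Parameters:
current_time (float) – The current time
service_interval_minutes (float) – Total service cycle duration in minutes
start_time (float) – Start of the slot in seconds within the cycle
end_time (float) – End of the slot in seconds within the cycle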
- Returns:
True if current time is within the slot
- Return type:
bool
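The modulo mapping can be sketched as follows. It assumes `current_time` is expressed in seconds (for example a Unix timestamp) and that the slot is half-open, so a boundary instant belongs to only one runner. Both are assumptions of this illustration, and `sketch_in_time_slot` is a hypothetical name.

```python
def sketch_in_time_slot(
    current_time: float,
    service_interval_minutes: float,
    start_time: float,
    end_time: float,
) -> bool:
    """Check whether current_time, folded into the cycle, lies in the slot."""
    cycle_seconds = service_interval_minutes * 60.0
    position_in_cycle = current_time % cycle_seconds  # repeat every cycle
    # Assumption: start is inclusive and end is exclusive.
    return start_time <= position_in_cycle < end_time
```

For a 6-minute cycle, a `current_time` of 1000 seconds folds to 280 seconds, which falls in the slot (240, 360) but not in (0, 120).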
- pynenc.orchestrator.atomic_service.can_run_atomic_service(runner_id: str, active_runners: list[pynenc.orchestrator.atomic_service.ActiveRunnerInfo], current_time: float, service_interval_minutes: float, spread_margin_minutes: float) -> bool
Determine if a runner should execute atomic global services now.
This function implements a distributed coordination algorithm that:
Orders runners by creation time
Divides the service interval into equal time slots
Assigns each runner an exclusive time window
Validates execution times against allocated slots
Checks if current time falls within this runner’s window
Single-runner optimization: If only one runner exists, it always runs services.
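- Parameters:
runner_id (str) – The runner ID to check
active_runners (list[ActiveRunnerInfo]) – List of active runners
current_time (float) – The current time
service_interval_minutes (float) – Total service cycle duration in minutes
spread_margin_minutes (float) – Safety margin in minutes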
- Returns:
True if this runner should execute services now
- Return type:
bool
Example: With 3 runners and a 6-minute interval:
- Runner 0: executes in minutes 0-2
- Runner 1: executes in minutes 2-4
- Runner 2: executes in minutes 4-6
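The coordination steps can be put together in one self-contained sketch. It is an illustration under stated assumptions, not the library's implementation: `current_time` is taken to be in seconds, an unknown runner is refused, the margin shortens the end of each slot, and the history validation step is left out. `RunnerSketch` and `sketch_can_run` are hypothetical names.

```python
from __future__ import annotations

from datetime import datetime, timedelta
from typing import NamedTuple


class RunnerSketch(NamedTuple):
    """Hypothetical stand-in for ActiveRunnerInfo (only the fields used here)."""

    runner_id: str
    creation_time: datetime


def sketch_can_run(
    runner_id: str,
    runners: list[RunnerSketch],
    current_time: float,
    service_interval_minutes: float,
    spread_margin_minutes: float,
) -> bool:
    """Decide whether runner_id owns the current time window."""
    ordered = sorted(runners, key=lambda r: r.creation_time)  # oldest first
    ids = [r.runner_id for r in ordered]
    if runner_id not in ids:
        return False  # assumption: unknown runners never run services
    if len(ordered) == 1:
        return True  # single-runner case: no coordination needed
    position = ids.index(runner_id)
    cycle_seconds = service_interval_minutes * 60.0
    slot_size = cycle_seconds / len(ordered)
    start = position * slot_size
    end = start + slot_size - spread_margin_minutes * 60.0  # assumed margin placement
    return start <= current_time % cycle_seconds < end


base = datetime(2024, 1, 1)
three = [RunnerSketch(f"runner-{i}", base + timedelta(minutes=i)) for i in range(3)]
```

With the three runners above, a 6-minute interval, and no margin, a `current_time` of 130 seconds falls in minutes 2-4, so only `runner-1` is allowed to run.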