"""
Invocation status management.
This module defines the status lifecycle of task invocations, including state transitions,
ownership rules, and the state machine that enforces them.
Key components:
- InvocationStatus: Enum of all possible invocation states
- InvocationStatusRecord: Immutable record combining status with ownership
- StatusDefinition: Declarative rules for status behavior
- State machine functions: Validation and transition logic
"""
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import StrEnum
from functools import cached_property
from typing import Final
from pynenc.exceptions import (
InvocationStatusTransitionError,
InvocationStatusOwnershipError,
)
[docs]
class InvocationStatus(StrEnum):
"""
An enumeration representing the status of a task invocation.
The PENDING status will expire after the time specified in
:attr:`~pynenc.conf.config_pynenc.ConfigPynenc.max_pending_seconds`.
```{note}
SUCCESS, FAILED, and CONCURRENCY_CONTROLLED_FINAL are final statuses that terminate the invocation lifecycle.
```
:cvar REGISTERED: The task call has been routed and is registered
:cvar CONCURRENCY_CONTROLLED: The task call is not allowed to run due to concurrency control
:cvar CONCURRENCY_CONTROLLED_FINAL: The task call was blocked by concurrency control and will not be retried
:cvar REROUTED: The task call has been re-routed and is registered
:cvar PENDING: The task call was picked by a runner but is not yet executed
:cvar PENDING_RECOVERY: The task call exceeded PENDING timeout and is being recovered
:cvar RUNNING: The task call is currently running
:cvar RUNNING_RECOVERY: The task call is being recovered because the owner runner is inactive
:cvar PAUSED: The task call execution is paused
:cvar RESUMED: The task call execution has been resumed
:cvar KILLED: The task call execution has been killed
:cvar SUCCESS: The task call finished without errors
:cvar FAILED: The task call finished with exceptions
:cvar RETRY: The task call finished with a retriable exception
"""
REGISTERED = "registered"
CONCURRENCY_CONTROLLED = "concurrency_controlled"
CONCURRENCY_CONTROLLED_FINAL = "concurrency_controlled_final"
REROUTED = "rerouted"
PENDING = "pending"
PENDING_RECOVERY = "pending_recovery"
RUNNING = "running"
RUNNING_RECOVERY = "running_recovery"
PAUSED = "paused"
RESUMED = "resumed"
KILLED = "killed"
SUCCESS = "success"
FAILED = "failed"
RETRY = "retry"
[docs]
def is_final(self) -> bool:
"""Check if the status terminates the invocation lifecycle."""
return self in _CONFIG.final_statuses
[docs]
def is_available_for_run(self) -> bool:
"""Check if the task can be picked up and run by any broker."""
return self in _CONFIG.available_for_run_statuses
[docs]
def can_transition_to(self, target: "InvocationStatus") -> bool:
"""Check if this status has a valid transition to the target status."""
return target in _CONFIG.get_definition(self).allowed_transitions
[docs]
@classmethod
def get_final_statuses(cls) -> frozenset["InvocationStatus"]:
"""Return all statuses that terminate the invocation lifecycle."""
return _CONFIG.final_statuses
[docs]
@classmethod
def get_available_for_run_statuses(cls) -> frozenset["InvocationStatus"]:
"""Return all statuses where invocations can be picked up by runners."""
return _CONFIG.available_for_run_statuses
[docs]
@dataclass(frozen=True)
class InvocationStatusRecord:
    """
    Frozen snapshot of an invocation's status together with its owner.

    :ivar InvocationStatus status: The current invocation status
    :ivar str | None runner_id: Runner ID that owns this invocation (None if no owner)
    :ivar datetime timestamp: When this status was set; defaults to the current UTC time
    """

    status: InvocationStatus
    runner_id: str | None = None
    timestamp: datetime = field(default_factory=lambda: datetime.now(tz=UTC))

    def to_json(self) -> dict:
        """
        Build a JSON-compatible dictionary for this record.

        :return: Dict with the status value, the runner id and the ISO-8601 timestamp
        """
        return dict(
            status=self.status.value,
            runner_id=self.runner_id,
            timestamp=self.timestamp.isoformat(),
        )

    @classmethod
    def from_json(cls, json_dict: dict) -> "InvocationStatusRecord":
        """
        Rebuild a status record from a dictionary shaped like :meth:`to_json` output.

        :param dict json_dict: Dict with ``status`` and ``timestamp`` keys and an optional ``runner_id``
        :return: The deserialized record
        """
        parsed = datetime.fromisoformat(json_dict["timestamp"])
        # A timestamp stored without timezone information is treated as UTC
        aware = parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=UTC)
        status = InvocationStatus(json_dict["status"])
        owner = json_dict.get("runner_id")
        return cls(status=status, runner_id=owner, timestamp=aware)
[docs]
@dataclass(frozen=True)
class StatusDefinition:
    """
    Declarative rules describing how a single status behaves.

    :ivar frozenset[InvocationStatus] allowed_transitions: Valid next statuses
    :ivar bool is_final: Terminates invocation lifecycle
    :ivar bool available_for_run: Can be picked up by runners
    :ivar bool requires_ownership: Only owner can modify
    :ivar bool acquires_ownership: Claims ownership on entry
    :ivar bool releases_ownership: Releases ownership on entry
    :ivar bool overrides_ownership: Bypasses ownership validation (for recovery scenarios)
    """

    allowed_transitions: frozenset[InvocationStatus] = field(default_factory=frozenset)
    is_final: bool = False
    available_for_run: bool = False
    requires_ownership: bool = False
    acquires_ownership: bool = False
    releases_ownership: bool = False
    overrides_ownership: bool = False

    def __post_init__(self) -> None:
        """
        Reject contradictory flag combinations.

        :raises ValueError: If the definition both acquires and releases ownership,
            is final but lists transitions, or both overrides and requires ownership
        """
        # Each entry pairs a "this combination is contradictory" condition with its
        # error message; they are checked in order and the first hit is reported.
        contradictions = (
            (
                self.acquires_ownership and self.releases_ownership,
                "Status cannot both acquire and release ownership",
            ),
            (
                self.is_final and self.allowed_transitions,
                "Final statuses cannot have allowed transitions",
            ),
            (
                self.overrides_ownership and self.requires_ownership,
                "Status cannot both override and require ownership",
            ),
        )
        for violated, message in contradictions:
            if violated:
                raise ValueError(message)
[docs]
@dataclass(frozen=True)
class StatusConfiguration:
    """
    Complete configuration for invocation status behavior.

    The ``None`` key describes an invocation that has no status yet; it is
    excluded from every derived status set.

    :ivar dict[InvocationStatus | None, StatusDefinition] definitions: Behavior rules per status
    """

    definitions: dict[InvocationStatus | None, StatusDefinition]

    def __post_init__(self) -> None:
        """
        Check that the configuration is complete and self-consistent.

        :raises ValueError: If an :class:`InvocationStatus` member has no definition,
            or a definition lists a transition target that is not a valid status
        """
        defined = {status for status in self.definitions if status is not None}
        if missing := set(InvocationStatus) - defined:
            raise ValueError(f"Missing definitions for: {missing}")
        for status, definition in self.definitions.items():
            for target in definition.allowed_transitions:
                # Look the target up by value instead of using ``in``: on Python 3.11
                # ``x in EnumClass`` raises TypeError for non-Enum ``x``, which would
                # mask the ValueError this validation is meant to produce.
                try:
                    InvocationStatus(target)
                except ValueError:
                    raise ValueError(
                        f"{status} references undefined transition: {target}"
                    ) from None

    # The derived sets below are computed on first access and then cached on the
    # instance, so later mutation of ``definitions`` is not reflected in them.
    # The ``None`` key is filtered with an identity check rather than truthiness,
    # because statuses are strings and truthiness would also drop an empty value.

    @cached_property
    def final_statuses(self) -> frozenset[InvocationStatus]:
        """Statuses whose definition has ``is_final`` set."""
        return frozenset(
            s for s, d in self.definitions.items() if s is not None and d.is_final
        )

    @cached_property
    def available_for_run_statuses(self) -> frozenset[InvocationStatus]:
        """Statuses whose definition has ``available_for_run`` set."""
        return frozenset(
            s
            for s, d in self.definitions.items()
            if s is not None and d.available_for_run
        )

    @cached_property
    def ownership_required_statuses(self) -> frozenset[InvocationStatus]:
        """Statuses whose definition has ``requires_ownership`` set."""
        return frozenset(
            s
            for s, d in self.definitions.items()
            if s is not None and d.requires_ownership
        )

    @cached_property
    def ownership_acquire_statuses(self) -> frozenset[InvocationStatus]:
        """Statuses whose definition has ``acquires_ownership`` set."""
        return frozenset(
            s
            for s, d in self.definitions.items()
            if s is not None and d.acquires_ownership
        )

    @cached_property
    def ownership_release_statuses(self) -> frozenset[InvocationStatus]:
        """Statuses whose definition has ``releases_ownership`` set."""
        return frozenset(
            s
            for s, d in self.definitions.items()
            if s is not None and d.releases_ownership
        )

    def get_definition(self, status: InvocationStatus | None) -> StatusDefinition:
        """
        Return the definition registered for ``status``.

        :param InvocationStatus | None status: Status to look up (None for "no status yet")
        :return: The matching status definition
        :raises KeyError: If ``status`` has no entry in ``definitions``
        """
        return self.definitions[status]
# ============================================================================
# Status Configuration
# ============================================================================
_CONFIG: Final[StatusConfiguration] = StatusConfiguration(
    definitions={
        # No status yet: the only possible first step is registration
        None: StatusDefinition(
            allowed_transitions=frozenset((InvocationStatus.REGISTERED,)),
        ),
        # Registered invocations have no owner and can be picked up
        InvocationStatus.REGISTERED: StatusDefinition(
            releases_ownership=True,
            available_for_run=True,
            allowed_transitions=frozenset(
                (
                    InvocationStatus.PENDING,
                    InvocationStatus.CONCURRENCY_CONTROLLED,
                    InvocationStatus.CONCURRENCY_CONTROLLED_FINAL,
                )
            ),
        ),
        # Blocked by concurrency control: not runnable until it is rerouted
        InvocationStatus.CONCURRENCY_CONTROLLED: StatusDefinition(
            releases_ownership=True,
            allowed_transitions=frozenset((InvocationStatus.REROUTED,)),
        ),
        # Rerouted invocations have no owner and can be picked up again
        InvocationStatus.REROUTED: StatusDefinition(
            releases_ownership=True,
            available_for_run=True,
            allowed_transitions=frozenset(
                (InvocationStatus.PENDING, InvocationStatus.CONCURRENCY_CONTROLLED)
            ),
        ),
        # Entering PENDING claims ownership; leaving it requires being the owner,
        # except via PENDING_RECOVERY, which skips ownership validation on timeout
        InvocationStatus.PENDING: StatusDefinition(
            acquires_ownership=True,
            requires_ownership=True,
            allowed_transitions=frozenset(
                (
                    InvocationStatus.RUNNING,
                    InvocationStatus.KILLED,
                    InvocationStatus.REROUTED,
                    InvocationStatus.PENDING_RECOVERY,
                )
            ),
        ),
        # Recovery of PENDING invocations that exceeded the timeout; ownership
        # validation is overridden because the original owner is unresponsive
        InvocationStatus.PENDING_RECOVERY: StatusDefinition(
            overrides_ownership=True,
            releases_ownership=True,
            allowed_transitions=frozenset((InvocationStatus.REROUTED,)),
        ),
        # Only the owning runner may move a RUNNING invocation forward,
        # except via RUNNING_RECOVERY
        InvocationStatus.RUNNING: StatusDefinition(
            requires_ownership=True,
            allowed_transitions=frozenset(
                (
                    InvocationStatus.PAUSED,
                    InvocationStatus.KILLED,
                    InvocationStatus.RETRY,
                    InvocationStatus.SUCCESS,
                    InvocationStatus.FAILED,
                    InvocationStatus.RUNNING_RECOVERY,
                )
            ),
        ),
        # Recovery of RUNNING invocations owned by inactive runners; ownership
        # validation is overridden because the original owner is unresponsive
        InvocationStatus.RUNNING_RECOVERY: StatusDefinition(
            overrides_ownership=True,
            releases_ownership=True,
            allowed_transitions=frozenset((InvocationStatus.REROUTED,)),
        ),
        InvocationStatus.PAUSED: StatusDefinition(
            requires_ownership=True,
            allowed_transitions=frozenset(
                (InvocationStatus.RESUMED, InvocationStatus.KILLED)
            ),
        ),
        InvocationStatus.RESUMED: StatusDefinition(
            requires_ownership=True,
            allowed_transitions=frozenset(
                (
                    InvocationStatus.PAUSED,
                    InvocationStatus.KILLED,
                    InvocationStatus.RETRY,
                    InvocationStatus.SUCCESS,
                    InvocationStatus.FAILED,
                )
            ),
        ),
        # A killed invocation drops its owner and can only be rerouted
        InvocationStatus.KILLED: StatusDefinition(
            releases_ownership=True,
            allowed_transitions=frozenset((InvocationStatus.REROUTED,)),
        ),
        # A retry drops its owner and can be picked up again
        InvocationStatus.RETRY: StatusDefinition(
            releases_ownership=True,
            available_for_run=True,
            allowed_transitions=frozenset((InvocationStatus.PENDING,)),
        ),
        # Final statuses: no outgoing transitions, ownership released
        InvocationStatus.SUCCESS: StatusDefinition(
            releases_ownership=True,
            is_final=True,
        ),
        InvocationStatus.FAILED: StatusDefinition(
            releases_ownership=True,
            is_final=True,
        ),
        # Final status for invocations blocked by concurrency control that should not retry
        InvocationStatus.CONCURRENCY_CONTROLLED_FINAL: StatusDefinition(
            releases_ownership=True,
            is_final=True,
        ),
    }
)
[docs]
def get_status_definition(status: InvocationStatus) -> StatusDefinition:
    """
    Look up the behavior rules configured for an invocation status.

    :param InvocationStatus status: The invocation status to look up
    :return: The status definition registered for ``status`` in the module configuration
    """
    definition = _CONFIG.get_definition(status)
    return definition
# ============================================================================
# State Machine Functions
# ============================================================================
[docs]
def validate_transition(
    from_status: InvocationStatus | None,
    to_status: InvocationStatus,
) -> None:
    """
    Ensure that moving from ``from_status`` to ``to_status`` is permitted.

    :param InvocationStatus | None from_status: Current status (None for new invocations)
    :param InvocationStatus to_status: Target status
    :raises InvocationStatusTransitionError: If ``to_status`` is not among the
        transitions allowed from ``from_status``
    """
    permitted = _CONFIG.get_definition(from_status).allowed_transitions
    if to_status in permitted:
        return
    raise InvocationStatusTransitionError(
        from_status=from_status,
        to_status=to_status,
        allowed_statuses=permitted,
    )
[docs]
def validate_ownership(
    current_record: InvocationStatusRecord | None,
    new_status: InvocationStatus,
    runner_id: str | None,
) -> None:
    """
    Check the ownership rules that apply to a status change.

    Nothing is checked when there is no current record, or when the target
    status overrides ownership validation.

    :param InvocationStatusRecord | None current_record: Current status record
    :param InvocationStatus new_status: Target status
    :param str | None runner_id: Runner attempting the transition
    :raises InvocationStatusOwnershipError: If the current status requires ownership and
        ``runner_id`` is not the current owner, or the target status acquires ownership
        and no ``runner_id`` was given
    """
    if not current_record:
        return

    target_rules = _CONFIG.get_definition(new_status)
    if target_rules.overrides_ownership:
        # Recovery-style statuses skip ownership validation entirely
        return

    source_rules = _CONFIG.get_definition(current_record.status)
    if source_rules.requires_ownership and runner_id != current_record.runner_id:
        reason = f"Status requires ownership by runner:'{current_record.runner_id}'"
        attempted = runner_id
    elif target_rules.acquires_ownership and not runner_id:
        reason = f"Status {new_status} requires a runner_id to acquire ownership"
        # No usable runner id was supplied, so report the attempted owner as None
        attempted = None
    else:
        return

    raise InvocationStatusOwnershipError(
        from_status=current_record.status,
        to_status=new_status,
        current_owner=current_record.runner_id,
        attempted_owner=attempted,
        reason=reason,
    )
[docs]
def compute_new_owner(
    current_record: InvocationStatusRecord | None,
    new_status: InvocationStatus,
    runner_id: str | None,
) -> str | None:
    """
    Determine who owns the invocation after it enters ``new_status``.

    :param InvocationStatusRecord | None current_record: Current status record, if any
    :param InvocationStatus new_status: Status being entered
    :param str | None runner_id: Runner performing the transition
    :return: None if the new status releases ownership, ``runner_id`` if it acquires
        ownership, otherwise the current owner (None when there is no current record)
    """
    target_rules = _CONFIG.get_definition(new_status)
    # Release takes precedence over acquire, which takes precedence over keeping
    # the existing owner.
    if target_rules.releases_ownership:
        return None
    if target_rules.acquires_ownership:
        return runner_id
    if current_record:
        return current_record.runner_id
    return None
[docs]
def status_record_transition(
    current_record: InvocationStatusRecord | None,
    new_status: InvocationStatus,
    runner_id: str | None,
) -> InvocationStatusRecord:
    """
    Validate a status change and build the resulting record.

    The transition is validated first, then the ownership rules, and only then
    is the new owner computed.

    :param InvocationStatusRecord | None current_record: Current state (None for new invocations)
    :param InvocationStatus new_status: Desired new status
    :param str | None runner_id: ID of runner making the change
    :return: New status record carrying ``new_status`` and the computed owner
    :raises InvocationStatusTransitionError: If transition is not allowed
    :raises InvocationStatusOwnershipError: If ownership rules are violated
    """
    previous_status = None if not current_record else current_record.status
    validate_transition(previous_status, new_status)
    validate_ownership(current_record, new_status, runner_id)
    return InvocationStatusRecord(
        status=new_status,
        runner_id=compute_new_owner(current_record, new_status, runner_id),
    )