"""
Global Pynenc exception and warning classes.
"""
import json
from typing import TYPE_CHECKING, Any, Optional
from .util.subclasses import get_all_subclasses
if TYPE_CHECKING:
from .call import Call
from .invocation import BaseInvocation
[docs]
class PynencError(Exception):
"""Base class for all Pynenc related errors."""
[docs]
def _to_json_dict(self) -> dict[str, Any]:
""":return: a json serializable dictionary"""
return self.__dict__
[docs]
@classmethod
def _from_json_dict(cls, json_dict: dict[str, Any]) -> "PynencError":
""":return: a new error from the serialized json compatible dictionary"""
return cls(**json_dict)
[docs]
def to_json(self) -> str:
""":return: the serialized error"""
return json.dumps(self._to_json_dict())
[docs]
@classmethod
def from_json(cls, error_name: str, serialized: str) -> "PynencError":
""":return: the child class from a serialized error"""
for subcls in [cls] + get_all_subclasses(cls):
if subcls.__name__ == error_name:
return subcls._from_json_dict(json_dict=json.loads(serialized))
raise ValueError(f"Unknown error type: {error_name}")
[docs]
class RetryError(PynencError):
"""Error raised when a task should be retried."""
[docs]
class ConcurrencyRetryError(RetryError):
"""Error raised when a task should be retried due to concurrency control."""
[docs]
class PendingInvocationLockError(PynencError):
"""Error raised when two processes try to set the same invocation as pending concurrently"""
def __init__(self, invocation_id: str) -> None:
self.invocation_id = invocation_id
[docs]
class TaskError(PynencError):
"""Base class for all Task related errors."""
def __init__(self, task_id: str, message: Optional[str] = None) -> None:
self.task_id = task_id
self.message = message
[docs]
def __str__(self) -> str:
if self.message:
return f"{self.__class__.__name__}({self.task_id}): {self.message}"
else:
return f"{self.__class__.__name__}({self.task_id})"
[docs]
def _to_json_dict(self) -> dict[str, Any]:
""":return: the serialized error"""
return {"task_id": self.task_id, "message": self.message}
[docs]
@classmethod
def _from_json_dict(cls, json_dict: dict[str, Any]) -> "TaskError":
""":return: the serialized error"""
return cls(json_dict["task_id"], json_dict["message"])
[docs]
class InvalidTaskOptionsError(TaskError):
"""Error raised when the task options are invalid."""
[docs]
class TaskRoutingError(TaskError):
"""Error raised when a task will not be routed."""
[docs]
class InvocationConcurrencyWithDifferentArgumentsError(TaskRoutingError):
"""
Error raised when there is a task with different arguments
than the current task.
"""
def __init__(
self,
task_id: str,
existing_invocation_id: str,
new_call_id: str,
diff: str,
message: Optional[str] = None,
) -> None:
self.existing_invocation_id = existing_invocation_id
self.new_call_id = new_call_id
self.diff = diff
super().__init__(task_id, message)
[docs]
@classmethod
def from_call_mismatch(
cls,
existing_invocation: "BaseInvocation",
new_call: "Call",
message: Optional[str] = None,
) -> "InvocationConcurrencyWithDifferentArgumentsError":
return cls(
existing_invocation.task.task_id,
existing_invocation.invocation_id,
new_call.call_id,
cls.format_difference(existing_invocation.call, new_call),
message,
)
[docs]
def __str__(self) -> str:
if self.message:
return f"InvocationConcurrencyWithDifferentArgumentsError({self.task_id}) {self.message}\n{self.diff}"
return f"InvocationConcurrencyWithDifferentArgumentsError({self.task_id})\n{self.diff}"
[docs]
def _to_json_dict(self) -> dict[str, Any]:
""":return: the serialized error"""
return {
**super()._to_json_dict(),
"existing_invocation_id": self.existing_invocation_id,
"new_call_id": self.new_call_id,
"diff": self.diff,
}
[docs]
@classmethod
def _from_json_dict(
cls, json_dict: dict[str, str]
) -> "InvocationConcurrencyWithDifferentArgumentsError":
""":return: a new error from a serialized error"""
return cls(
json_dict["task_id"],
json_dict["existing_invocation_id"],
json_dict["new_call_id"],
json_dict["diff"],
json_dict["message"],
)
[docs]
class InvocationError(PynencError):
"""Base class for all Task related errors."""
def __init__(self, invocation_id: str, message: Optional[str] = None) -> None:
self.invocation_id = invocation_id
self.message = message
[docs]
def __str__(self) -> str:
if self.message:
return f"InvocationError({self.invocation_id}): {self.message}"
else:
return f"InvocationError({self.invocation_id})"
[docs]
def _to_json_dict(self) -> dict[str, Any]:
return {"invocation_id": self.invocation_id, "message": self.message}
[docs]
@classmethod
def _from_json_dict(cls, json_dict: dict[str, Any]) -> "InvocationError":
return cls(json_dict["invocation_id"], json_dict["message"])
[docs]
class StateBackendError(PynencError):
"""Error raised when a task will not be routed."""
[docs]
class InvocationNotFoundError(StateBackendError):
"""Error raised when the invocation is not present in the State Backend."""
def __init__(self, invocation_id: str, message: Optional[str] = None) -> None:
self.invocation_id = invocation_id
self.message = message
[docs]
def __str__(self) -> str:
if self.message:
return f"InvocationNotFoundError({self.invocation_id}): {self.message}"
else:
return f"InvocationNotFoundError({self.invocation_id})"
[docs]
def _to_json_dict(self) -> dict[str, Any]:
return {"invocation_id": self.invocation_id, "message": self.message}
[docs]
@classmethod
def _from_json_dict(cls, json_dict: dict[str, Any]) -> "InvocationNotFoundError":
return cls(json_dict["invocation_id"], json_dict["message"])
[docs]
class RunnerNotExecutableError(PynencError):
"""Raised when trying to execute a runner that is not meant to be executed."""
[docs]
class CycleDetectedError(PynencError):
"""Raised when a cycle is detected in the DependencyGraph"""
def __init__(self, call_ids: list[str], message: str) -> None:
self.call_ids = call_ids
self.message = message
super().__init__(message)
[docs]
@classmethod
def from_cycle(cls, cycle: list["Call"]) -> "CycleDetectedError":
call_ids = [call.call_id for call in cycle]
message = f"A cycle was detected: {cls._format_cycle(cycle)}"
return cls(call_ids, message)
[docs]
def _to_json_dict(self) -> dict[str, Any]:
return {"call_ids": self.call_ids, "message": self.message}
[docs]
@classmethod
def _from_json_dict(cls, json_dict: dict[str, Any]) -> "CycleDetectedError":
return cls(json_dict["call_ids"], json_dict["message"])
[docs]
class RunnerError(PynencError):
"""Base class for all Runner related errors."""
[docs]
class ConfigError(PynencError):
"""Base class for all the config related errors"""
[docs]
class ConfigMultiInheritanceError(ConfigError):
"""Error related with multiinheritance of config fields"""
[docs]
class AlreadyInitializedError(PynencError):
"""Error raised when trying to change the class of a component after it was initialized"""