from __future__ import annotations
import json
from collections.abc import AsyncGenerator, Iterator
from dataclasses import dataclass
from functools import cached_property
from typing import TYPE_CHECKING, Any
from pynenc import context, exceptions
from pynenc.arguments import Arguments
from pynenc.call import Call
from pynenc.invocation.base_invocation import BaseInvocation, BaseInvocationGroup
from pynenc.invocation.status import InvocationStatus
from pynenc.types import Params, Result
from pynenc.util.asyncio_helper import run_task_sync
if TYPE_CHECKING:
from ..app import Pynenc
# Create a context variable to store current invocation
@dataclass(frozen=True, eq=False)
class DistributedInvocation(BaseInvocation[Params, Result]):
    """
    Represents a distributed invocation of a task call in the system.

    This class is a specific implementation of `BaseInvocation` for use in distributed environments.
    On creation it registers itself in the application's state backend, and it delegates status,
    retry count and result handling to the application's orchestrator, runner and state backend.

    :param Call[Params, Result] call:
        Inherits the `call` attribute from `BaseInvocation`, representing the specific task call
        that this invocation executes.
    :param DistributedInvocation | None parent_invocation:
        A reference to a parent invocation, if this invocation is part of a nested call structure.
    :param Optional[str] _invocation_id:
        An explicit identifier for the invocation. When it is `None` (or empty), the identifier
        provided by `BaseInvocation` is used instead. It is set explicitly on deserialization
        and when reusing an existing invocation.
    """

    parent_invocation: DistributedInvocation | None
    _invocation_id: str | None = None

    def __post_init__(self) -> None:
        """Run the base initialisation and store this invocation in the state backend."""
        super().__post_init__()
        self.app.state_backend.upsert_invocation(self)

    @cached_property
    def invocation_id(self) -> str:
        """
        :return: the explicit `_invocation_id` when set (e.g. on deserialization),
            otherwise the identifier computed by the base class
        """
        return self._invocation_id or super().invocation_id

    @property
    def num_retries(self) -> int:
        """:return: number of times the invocation got retried"""
        return self.app.orchestrator.get_invocation_retries(self)

    @property
    def status(self) -> InvocationStatus:
        """:return: status of the invocation from the orchestrator"""
        return self.app.orchestrator.get_invocation_status(self)

    def to_json(self) -> str:
        """
        Serialize the invocation to a JSON string.

        The document contains `invocation_id`, the serialized `call` and, only when a parent
        invocation exists, `parent_invocation_id`.

        :return: The serialized invocation
        """
        inv_dict = {"invocation_id": self.invocation_id, "call": self.call.to_json()}
        if self.parent_invocation:
            inv_dict["parent_invocation_id"] = self.parent_invocation.invocation_id
        return json.dumps(inv_dict)

    def __getstate__(self) -> dict[str, Any]:
        """
        Build the state used for pickling.

        :return: a copy of the instance `__dict__` with `invocation_id` resolved and included
        """
        state = self.__dict__.copy()
        # Force the cached property so the resolved id always travels with the pickled state.
        state["invocation_id"] = self.invocation_id
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        """
        Restore the instance attributes from a pickled state.

        :param dict state: the mapping produced by `__getstate__`
        """
        # The dataclass is frozen, so regular attribute assignment is blocked;
        # go through object.__setattr__ instead.
        for key, value in state.items():
            object.__setattr__(self, key, value)

    @classmethod
    def from_json(cls, app: Pynenc, serialized: str) -> DistributedInvocation:
        """
        Rebuild an invocation from the output of `to_json`.

        :param Pynenc app: the application used to deserialize the call and to look up the parent
            invocation in its state backend
        :param str serialized: the JSON string produced by `to_json`
        :return: a new invocation from a serialized invocation
        """
        inv_dict = json.loads(serialized)
        call = Call.from_json(app, inv_dict["call"])
        parent_invocation = None
        if "parent_invocation_id" in inv_dict:
            parent_invocation = app.state_backend.get_invocation(
                inv_dict["parent_invocation_id"]
            )
        return cls(call, parent_invocation, inv_dict["invocation_id"])

    def swap_context(self) -> DistributedInvocation | None:
        """
        Swap the current invocation context with this invocation.

        Delegates to `context.swap_dist_invocation_context` using the application id.

        :return: the value returned by `context.swap_dist_invocation_context`; `run` passes it
            back to `reset_context` once the execution is over
        """
        return context.swap_dist_invocation_context(self.app.app_id, self)

    def reset_context(
        self, previous_invocation_context: DistributedInvocation | None
    ) -> None:
        """
        Reset the invocation context to a previous state.

        :param DistributedInvocation | None previous_invocation_context:
            The previous invocation context to restore. This value is returned by the
            `swap_context` method.
        """
        context.swap_dist_invocation_context(
            self.app.app_id, previous_invocation_context
        )

    def run(self, runner_args: dict[str, Any] | None = None) -> None:
        """
        Execute the task associated with this invocation in a distributed environment.

        The method swaps the invocation context, asks the orchestrator whether concurrency
        control authorizes the run, marks the invocation as running, executes the task function
        with the invocation's keyword arguments and stores the result in the orchestrator.

        :param dict[str, Any] | None runner_args:
            Optional arguments passed from/to the runner. They are stored in
            `context.runner_args`. Default is None.

        ```{note}
        - If concurrency control does not authorize the run, the invocation is handed to
          `orchestrator.reroute_invocations` and the method returns without executing the task.
        - If a retriable exception occurs and the number of retries has not reached
          `task.conf.max_retries`, the invocation is set for retry and a warning is logged;
          the exception is not re-raised.
        - If the maximum retries are reached or a non-retriable exception occurs, the exception
          is stored in the orchestrator and re-raised.
        ```

        The previous invocation context is always restored once it has been swapped.

        :raises Exception:
            Re-raises the original exception if a non-retriable exception occurs or the maximum
            retries are reached for a retriable exception.
        """
        # runner_args are passed from/to the runner (e.g. used to sync subprocesses)
        context.runner_args = runner_args
        # Only restore the context if the swap really happened; otherwise the
        # `finally` clause would fail on an unbound name and hide the real error.
        context_swapped = False
        previous_invocation_context: DistributedInvocation | None = None
        try:
            self.task.logger.info(f"Invocation {self.invocation_id} started")
            previous_invocation_context = self.swap_context()
            context_swapped = True
            if not self.app.orchestrator.is_authorize_to_run_by_concurrency_control(
                self
            ):
                # The invocation has been handed back for rerouting: do not also
                # execute it here.
                self.app.orchestrator.reroute_invocations({self})
                self.task.logger.info(
                    f"Invocation {self.invocation_id} rerouted by concurrency control"
                )
                return
            self.app.orchestrator.set_invocation_run(self.parent_invocation, self)
            result = run_task_sync(self.task.func, **self.arguments.kwargs)
            self.app.orchestrator.set_invocation_result(self, result)
            self.task.logger.info(f"Invocation {self.invocation_id} finished")
        except self.task.retriable_exceptions as ex:
            if self.num_retries >= self.task.conf.max_retries:
                self.task.logger.exception("Max retries reached")
                self.app.orchestrator.set_invocation_exception(self, ex)
                raise
            self.app.orchestrator.set_invocation_retry(self, ex)
            self.task.logger.warning(
                f"Invocation {self.invocation_id} marked for retry {ex=}"
            )
        except Exception as ex:
            self.app.logger.exception(f"Invocation {self.invocation_id} exception")
            self.app.orchestrator.set_invocation_exception(self, ex)
            raise
        finally:
            if context_swapped:
                self.reset_context(previous_invocation_context)

    @property
    def result(self) -> Result:
        """
        Retrieve the result of the task execution.

        If the invocation status is not final, the orchestrator is notified once that the parent
        invocation is waiting for this one, and then the runner's `waiting_for_results` is called
        repeatedly until the status becomes final.

        :return: The result of the task execution, as returned by `get_final_result`.

        ```{note}
        - This method will block until the task execution is complete and the result is available.
        - If called on an invocation that is not yet in a final state, it will wait (potentially
          indefinitely) for the task to complete.
        ```
        """
        self.app.logger.info(f"ini waiting for invocation {self.invocation_id} result")
        if not self.status.is_final():
            self.app.orchestrator.waiting_for_results(self.parent_invocation, [self])
        while not self.status.is_final():
            self.app.runner.waiting_for_results(
                self.parent_invocation, [self], context.runner_args
            )
        self.app.logger.info(f"end waiting for invocation {self.invocation_id} result")
        return self.get_final_result()

    async def async_result(self) -> Result:
        """
        Asynchronous counterpart of `result`.

        Notifies the orchestrator once if the invocation is not final and then awaits the
        runner's `async_waiting_for_results` until the status becomes final.

        :return: The result of the task execution, as returned by `get_final_result`.
        """
        # NOTE(review): relies on the runner providing `async_waiting_for_results`.
        if not self.status.is_final():
            self.app.orchestrator.waiting_for_results(self.parent_invocation, [self])
        while not self.status.is_final():
            await self.app.runner.async_waiting_for_results(
                self.parent_invocation, [self], context.runner_args
            )
        return self.get_final_result()

    def get_final_result(self) -> Result:
        """
        Retrieve the final result of the task execution if the invocation is in a final state.

        When the status is `InvocationStatus.FAILED`, the exception stored in the state backend
        is raised; otherwise the result stored in the state backend is returned.

        :return: The final result of the task execution.
        :rtype: Result
        :raises exceptions.InvocationError: If the invocation is not in a final state when this
            method is called.
        """
        # Read the status once: each access is a call to the orchestrator.
        status = self.status
        if not status.is_final():
            raise exceptions.InvocationError(
                self.invocation_id, "Invocation is not final"
            )
        if status == InvocationStatus.FAILED:
            raise self.app.state_backend.get_exception(self)
        return self.app.state_backend.get_result(self)
class DistributedInvocationGroup(
    BaseInvocationGroup[Params, Result, DistributedInvocation]
):
    """
    A group of distributed invocations for a specific task in a distributed environment.

    This class extends `BaseInvocationGroup` to handle groups of `DistributedInvocation` instances.

    :param Task task:
        The task associated with these invocations.
    :param list[DistributedInvocation] invocations:
        A list of distributed invocations, each an instance of `DistributedInvocation`.
    """

    @property
    def results(self) -> Iterator[Result]:
        """
        An iterator over the results of the invocations in the group.

        Each pass over the pending invocations yields the result of every invocation whose status
        is final. If some invocations are still pending after a pass, the orchestrator is notified
        (only once) and the runner's `waiting_for_results` is called before the next pass.
        The parent invocation of the first invocation is used for those notifications.

        :return:
            An iterator over the results of each invocation in the group, in completion order
            as observed by the polling passes.
        :rtype:
            Iterator[Result]

        ```{note}
        Consuming the whole iterator blocks until all invocations in the group have completed.
        ```
        """
        waiting_invocations = self.invocations.copy()
        if not waiting_invocations:
            return
        parent_invocation = waiting_invocations[0].parent_invocation
        notified_orchestrator = False
        while waiting_invocations:
            # Build the list of still-pending invocations instead of removing items
            # from the list being iterated, which would skip the element that follows
            # every removed one and report already final invocations as pending.
            still_waiting = []
            for invocation in waiting_invocations:
                if invocation.status.is_final():
                    yield invocation.result
                else:
                    still_waiting.append(invocation)
            waiting_invocations = still_waiting
            if not waiting_invocations:
                break
            if not notified_orchestrator:
                self.app.orchestrator.waiting_for_results(
                    parent_invocation, waiting_invocations
                )
                notified_orchestrator = True
            self.app.runner.waiting_for_results(
                parent_invocation, waiting_invocations, context.runner_args
            )

    async def async_results(self) -> AsyncGenerator[Result, None]:
        """
        Asynchronous counterpart of `results`.

        Yields the result of every invocation whose status is final and awaits the runner's
        `async_waiting_for_results` between polling passes while invocations are still pending.
        The orchestrator is notified only once about the pending invocations.

        :return:
            An asynchronous generator over the results of each invocation in the group.
        :rtype:
            AsyncGenerator[Result, None]
        """
        waiting_invocations = self.invocations.copy()
        if not waiting_invocations:
            return
        parent_invocation = waiting_invocations[0].parent_invocation
        notified_orchestrator = False
        while waiting_invocations:
            # Same approach as in `results`: never mutate the list while iterating it.
            still_waiting = []
            for invocation in waiting_invocations:
                if invocation.status.is_final():
                    yield invocation.result
                else:
                    still_waiting.append(invocation)
            waiting_invocations = still_waiting
            if not waiting_invocations:
                break
            if not notified_orchestrator:
                self.app.orchestrator.waiting_for_results(
                    parent_invocation, waiting_invocations
                )
                notified_orchestrator = True
            await self.app.runner.async_waiting_for_results(
                parent_invocation, waiting_invocations, context.runner_args
            )
@dataclass(frozen=True)
class ReusedInvocation(DistributedInvocation):
    """
    A `DistributedInvocation` built on top of an already existing invocation.

    It keeps the identifier of the original invocation and additionally records the arguments
    that differ from the original ones, if any.

    :ivar Arguments | None diff_arg:
        An optional `Arguments` object with the argument differences compared to the original
        invocation. `None` when no differences were provided.
    """

    # NOTE(review): unlike the parent class this decorator does not pass `eq=False`;
    # confirm that the dataclass-generated comparison is the intended one.

    # Used by the single invocation functionality:
    # the existing invocation is kept together with the changed arguments, if any.
    diff_arg: Arguments | None = None

    @classmethod
    def from_existing(
        cls, invocation: DistributedInvocation, diff_arg: Arguments | None = None
    ) -> ReusedInvocation:
        """
        Create a `ReusedInvocation` instance from an existing `DistributedInvocation`.

        A new `Call` is built from the task and arguments of the given invocation; the parent
        invocation and the invocation id are taken over from it unchanged.

        :param DistributedInvocation invocation:
            The existing invocation to reuse.
        :param Arguments | None diff_arg:
            Optional argument differences, stored as-is on the new instance. Default is None.
        :return:
            A new instance of `ReusedInvocation` based on the provided existing invocation.
        :rtype:
            ReusedInvocation
        """
        original_call = Call(invocation.task, invocation.arguments)
        return cls(
            call=original_call,
            parent_invocation=invocation.parent_invocation,
            # the id of the original invocation is kept on purpose
            _invocation_id=invocation.invocation_id,
            diff_arg=diff_arg,
        )