Source code for pynenc.invocation.base_invocation

from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator, Iterator
from dataclasses import dataclass
from functools import cached_property
from typing import TYPE_CHECKING, Any, Generic, TypeVar

from pynenc.call import Call
from pynenc.identifiers.invocation_id import InvocationId, generate_invocation_id
from pynenc.types import Params, Result

if TYPE_CHECKING:
    from pynenc.app import Pynenc
    from pynenc.arguments import Arguments
    from pynenc.task import Task
    from pynenc.invocation.status import InvocationStatus
    from pynenc.workflow.workflow_identity import WorkflowIdentity


T = TypeVar("T", bound="BaseInvocation")


[docs] class BaseInvocation(ABC, Generic[Params, Result]): """ Base class for representing an invocation of a task call in a distributed system. In the context of the system, the following concepts are key: - Function: A standard Python function. - Task: A Pynenc object encapsulating a function, enabling it to run in a distributed environment. Tasks are unique by module and function name and cannot be nested. - Call: A specific call to a task with a set of arguments, unique per function and argument set. - **Invocation**: A specific execution instance of a call. A single task can be called with different arguments, and each call can be executed multiple times. The `BaseInvocation` class serves as a template for two key types of invocations: - `DistributedInvocation`: The primary invocation type used in the system for distributed execution. - `ConcurrentInvocation`: Used for local execution, primarily in testing environments without a runner. ```{important} Sync invocations cannot be used in production environments, only for testing in sync mode. ``` :param Call[Params, Result] call: The specific call instance that this invocation represents. """ def __init__( self, call: Call[Params, Result], invocation_id: InvocationId | None = None, ): """Initialize the invocation with its identity.""" self._call = call self._invocation_id = invocation_id or generate_invocation_id() @property def call(self) -> Call[Params, Result]: """Get the call associated with this invocation.""" return self._call @property def invocation_id(self) -> InvocationId: return self._invocation_id @property def app(self) -> Pynenc: return self.call.app @property def task(self) -> Task[Params, Result]: return self.call.task @property def arguments(self) -> Arguments: return self.call.arguments @property @abstractmethod def workflow(self) -> WorkflowIdentity: """""" @property @abstractmethod def status(self) -> InvocationStatus: """""" @property @abstractmethod def result(self) -> Result: """"""
[docs] @abstractmethod async def async_result(self) -> Result: """"""
@property @abstractmethod def num_retries(self) -> int: """"""
[docs] def __str__(self) -> str: return f"{self.__class__.__name__}(invocation_id={self.invocation_id}, {self.call})"
[docs] def __repr__(self) -> str: return self.__str__()
[docs] def __hash__(self) -> int: return hash(self.invocation_id)
[docs] def __eq__(self, other: Any) -> bool: if not isinstance(other, BaseInvocation): return False return ( self.invocation_id == other.invocation_id and self.call.call_id == other.call.call_id )
[docs] @dataclass(frozen=True) class BaseInvocationGroup(ABC, Generic[Params, Result, T]): """ Abstract base class for grouping multiple invocations of a specific task. This class is designed to aggregate a collection of invocations, each represented by a `BaseInvocation` or its subclasses. It is useful in scenarios where multiple invocations of a task need to be managed or processed together. Subclasses of `BaseInvocationGroup`, such as `ConcurrentInvocationGroup` and `DistributedInvocationGroup`, provide specific implementations for synchronous and distributed environments, respectively. :param Task task: The task associated with the invocations. :param dict[InvocationId, BaseInvocation] invocations: A dictionary of invocations, each an instance of a `BaseInvocation` subclass. """ task: Task invocations: list[T] @cached_property def invocation_map(self) -> dict[InvocationId, T]: return {inv.invocation_id: inv for inv in self.invocations} @property def app(self) -> Pynenc: return self.task.app
[docs] def __iter__(self) -> Iterator[T]: return iter(self.invocations)
@property @abstractmethod def results(self) -> Iterator[Result]: """ Provide an iterator over the results. :return Iterator[Result]: An iterator over the results of the invocations. """
[docs] @abstractmethod def async_results(self) -> AsyncGenerator[Result, None]: """ An async iterator over the results of the invocations in the group. This method asynchronously iterates over the `ConcurrentInvocation` instances, yielding the result of each invocation using their async_result method. :return: An async iterator over the results of each invocation in the group. :rtype: AsyncIterator[Result] """