Source code for pynenc.invocation.conc_invocation
from __future__ import annotations
import asyncio
from collections.abc import AsyncGenerator, Iterator
from typing import TYPE_CHECKING
from pynenc import context
from pynenc.invocation.base_invocation import BaseInvocation, BaseInvocationGroup
from pynenc.invocation.status import InvocationStatus
from pynenc.types import Params, Result
from pynenc.util.asyncio_helper import run_task_sync
if TYPE_CHECKING:
from pynenc.call import Call
from pynenc.workflow.workflow_identity import WorkflowIdentity
[docs]
class ConcurrentInvocation(BaseInvocation[Params, Result]):
"""
A synchronous implementation of a task invocation.
This class represents an invocation of a task in a synchronous context.
:param Call[Params, Result] call: The specific call instance that this invocation represents.
```{danger}
Use only for testing purposes where distributed processing is not required.
```
"""
def __init__(self, call: Call[Params, Result]) -> None:
super().__init__(call)
self._num_retries = 0
self._status = InvocationStatus.REGISTERED
self._cached_result: Result
self._is_result_cached = False
[docs]
def _cache_and_return_result(self, result: Result) -> Result:
"""Cache the result and return it."""
self._cached_result = result
self._is_result_cached = True
return result
@property
def workflow(self) -> WorkflowIdentity:
raise NotImplementedError(
"ConcurrentInvocation does not support workflow identity"
)
@property
def status(self) -> InvocationStatus:
"""
Get the status of the invocation.
:return: The current status of the invocation.
:rtype: InvocationStatus
"""
return self._status
@property
def result(self) -> Result:
"""
Execute the task call and return its result.
This method runs the task synchronously and returns the result.
It handles retries for retriable exceptions as per the task's configuration.
:return: The result of the task execution.
:rtype: Result
:raises Exception: Raised if the task execution results in an unhandled exception.
"""
if self._is_result_cached:
return self._cached_result
previous_invocation_context = context._get_sync_inv_context_storage().get(
self.app.app_id
)
try:
self.task.logger.info(f"Sync invocation:{self.invocation_id} started")
self._status = InvocationStatus.RUNNING
context._get_sync_inv_context_storage()[self.app.app_id] = self
result = run_task_sync(self.task.func, **self.arguments.kwargs)
self._status = InvocationStatus.SUCCESS
self.task.logger.info(f"Sync invocation:{self.invocation_id} finished")
# Cache the result to avoid re-execution on subsequent calls
return self._cache_and_return_result(result)
except self.task.retriable_exceptions as exc:
if self._num_retries >= self.task.conf.max_retries:
self.task.logger.exception(
f"invocation:{self.invocation_id} Max retries reached"
)
self._status = InvocationStatus.FAILED
raise exc
self._status = InvocationStatus.RETRY
self._num_retries += 1
self.task.logger.warning(f"Retrying invocation:{self.invocation_id}")
return self.result
except Exception as exc:
self._status = InvocationStatus.FAILED
raise exc
finally:
context._get_sync_inv_context_storage()[self.app.app_id] = (
previous_invocation_context
)
[docs]
async def async_result(self) -> Result:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, lambda: self.result)
@property
def num_retries(self) -> int:
"""
Get the number of retries for the invocation.
:return: The number of times the invocation has been retried.
:rtype: int
"""
return self._num_retries
[docs]
class ConcurrentInvocationGroup(
BaseInvocationGroup[Params, Result, ConcurrentInvocation]
):
"""
A group of synchronous invocations for a specific task.
This class extends `BaseInvocationGroup` to handle groups of `ConcurrentInvocation` instances.
It is designed for scenarios where multiple synchronous invocations of a task are managed or processed together.
:param Task task: The task associated with these invocations.
:param list[ConcurrentInvocation] invocations: A list of synchronous invocations.
```{danger}
Use only for testing purposes where distributed processing is not required.
```
"""
@property
def results(self) -> Iterator[Result]:
"""
An iterator over the results of the invocations in the group.
This property method iterates over the `ConcurrentInvocation` instances in the group,
yielding the result of each invocation.
:return: An iterator over the results of each invocation in the group.
:rtype: Iterator[Result]
"""
for invocation in self.invocations:
yield invocation.result
[docs]
async def async_results(self) -> AsyncGenerator[Result, None]:
"""
An async iterator over the results of the invocations in the group.
This method asynchronously iterates over the `ConcurrentInvocation` instances,
yielding the result of each invocation using their async_result method.
:return: An async iterator over the results of each invocation in the group.
:rtype: AsyncIterator[Result]
"""
for invocation in self.invocations:
yield await invocation.async_result()