Source code for pynenc.core_tasks
from typing import Any, TYPE_CHECKING, NamedTuple
from collections.abc import Callable
from pynenc import context
from pynenc.conf.config_task import ConcurrencyControlType
from pynenc.invocation.status import InvocationStatus
if TYPE_CHECKING:
from pynenc.app import Pynenc
from pynenc.identifiers.invocation_id import InvocationId
from pynenc.runner.runner_context import RunnerContext
[docs]
class CoreTaskFunction:
"""Wrapper for core task functions that will be bound to an app at runtime.
This wrapper allows the function to be identified as a core task during
deserialization, enabling proper function extraction in Task._from_json.
"""
def __init__(self, func: Callable) -> None:
self.func = func
# Preserve function metadata for proper module import resolution
self.__name__ = func.__name__
self.__module__ = func.__module__
self.__doc__ = func.__doc__
[docs]
def __call__(self, *args: Any, **kwargs: Any) -> Any:
return self.func(*args, **kwargs)
[docs]
class CoreTaskDefinition(NamedTuple):
func: Callable
options: dict[str, Any]
config_cron: str | None = None
[docs]
class CoreTaskRegistry:
"""Registry for core tasks definitions that should run in any Pynenc instance."""
def __init__(self) -> None:
self.definitions: list[CoreTaskDefinition] = []
[docs]
def task(
self,
config_cron: str | None = None,
**options: Any,
) -> Callable[[Callable], CoreTaskFunction]:
"""Deferred decorator that stores the function and options"""
if config_cron and options.get("triggers"):
raise ValueError(
"Cannot specify both 'config_cron' and 'triggers' in core task definition"
)
def decorator(func: Callable) -> CoreTaskFunction:
wrapped = CoreTaskFunction(func)
self.definitions.append(CoreTaskDefinition(func, options, config_cron))
return wrapped
return decorator
[docs]
def get_app_and_runner_ctx() -> tuple["Pynenc", "RunnerContext"]:
"""Get the app and runner context from the current execution context"""
app = context.get_current_app()
if not app:
raise RuntimeError("No app context available")
runner_ctx = context.get_runner_context(app.app_id)
if not runner_ctx:
raise RuntimeError("No runner context available")
return app, runner_ctx
core_tasks_registry = CoreTaskRegistry()
[docs]
@core_tasks_registry.task(
running_concurrency=ConcurrencyControlType.TASK,
config_cron="recover_pending_invocations_cron",
)
def recover_pending_invocations() -> None:
"""Recovers PENDING invocations that exceeded the allowed pending time"""
# It will run as a Pynenc tasks
app, runner_ctx = get_app_and_runner_ctx()
invocations_to_reroute: set[InvocationId] = set()
# Recover PENDING invocations that exceeded timeout
for invocation_id in app.orchestrator.get_pending_invocations_for_recovery():
invocations_to_reroute.add(invocation_id)
app.logger.info(f"Recovering timed-out pending invocation:{invocation_id}")
app.orchestrator.set_invocation_status(
invocation_id, InvocationStatus.PENDING_RECOVERY, runner_ctx
)
app.orchestrator.reroute_invocations(invocations_to_reroute, runner_ctx)
[docs]
@core_tasks_registry.task(
running_concurrency=ConcurrencyControlType.TASK,
config_cron="recover_running_invocations_cron",
)
def recover_running_invocations() -> None:
"""Recovers PENDING invocations that exceeded the allowed pending time"""
app, runner_ctx = get_app_and_runner_ctx()
invocations_to_reroute: set[InvocationId] = set()
# Recover RUNNING invocations owned by inactive runners
for invocation_id in app.orchestrator.get_running_invocations_for_recovery():
invocations_to_reroute.add(invocation_id)
app.logger.info(
f"Recovering running invocation:{invocation_id} from inactive runner"
)
app.orchestrator.set_invocation_status(
invocation_id, InvocationStatus.RUNNING_RECOVERY, runner_ctx
)
app.orchestrator.reroute_invocations(invocations_to_reroute, runner_ctx)