pynenc.invocation.dist_invocation¶
Module Contents¶
Classes¶
Represents a distributed invocation of a task call in the system. |
|
A group of distributed invocations for a specific task in a distributed environment. |
|
A specialized invocation that reuses an existing |
API¶
- class pynenc.invocation.dist_invocation.DistributedInvocation(call: pynenc.call.Call[pynenc.types.Params, pynenc.types.Result], parent_invocation: pynenc.invocation.dist_invocation.DistributedInvocation | None = None, invocation_id: str | None = None, stored_in_backend: bool = False)[source]¶
Bases:
pynenc.invocation.base_invocation.BaseInvocation[pynenc.types.Params,pynenc.types.Result]Represents a distributed invocation of a task call in the system.
This class is a specific implementation of
BaseInvocationfor use in distributed environments. It extends the base invocation with additional features necessary for handling task executions in a distributed manner.In the distributed system,
DistributedInvocationhandles the execution of a task across different nodes or processes. It includes mechanisms for tracking and managing the state of an invocation in a distributed context.- Parameters:
call (Call[Params, Result]) – Inherits the
callattribute fromBaseInvocation, representing the specific task call that this invocation executes.parent_invocation (DistributedInvocation | None) – A reference to a parent invocation, if this invocation is part of a nested call structure. This attribute is used to maintain the invocation hierarchy in complex task workflows.
_invocation_id (Optional[str]) – A unique identifier for the invocation. This ID is crucial for tracking and orchestrating the invocation across the distributed system. It’s assigned internally and used by the orchestration mechanism.
_disable_upsert (bool) – A flag to disable the upsert operation during initialization. This flag is used to prevent unnecessary updates to the state backend when creating invocations in certain scenarios.
Initialization
Initialize the invocation with its identity.
- property parent_invocation: pynenc.invocation.dist_invocation.DistributedInvocation | None¶
- Returns:
the parent invocation of this invocation
- update_status_cache(status: pynenc.invocation.status.InvocationStatus) None[source]¶
Update the cached status and timestamp.
- property status: pynenc.invocation.status.InvocationStatus¶
- Returns:
status of the invocation from the orchestrator
- classmethod from_json(app: pynenc.app.Pynenc, serialized: str) pynenc.invocation.dist_invocation.DistributedInvocation[source]¶
- Returns:
a new invocation from a serialized invocation
- swap_context() pynenc.invocation.dist_invocation.DistributedInvocation | None[source]¶
Swap the current invocation context with this invocation.
This method is responsible for setting the current invocation context to this invocation. It uses the
contextmodule to manage the invocation context and ensure that the current invocation is tracked correctly.The method is used to manage the invocation context when executing the task associated with this invocation. It ensures that the current invocation is correctly set and tracked in the distributed environment.
- reset_context(previous_invocation_context: pynenc.invocation.dist_invocation.DistributedInvocation | None) None[source]¶
Reset the invocation context to a previous state.
This method is responsible for resetting the current invocation context to a previous state. It uses the
contextmodule to manage the invocation context and ensures that the previous invocation context is restored correctly.- Parameters:
previous_invocation_context (Any) – The previous invocation context to restore. This value is returned by the
swap_contextmethod.
- run(runner_args: dict[str, Any] | None = None) None[source]¶
Execute the task associated with this invocation in a distributed environment.
This method is responsible for running the task’s function with its arguments, handling retriable exceptions, and updating the task’s state in the orchestrator. It manages the invocation context and communicates with the orchestrator to set the invocation’s run state and result.
- Parameters:
runner_args (dict[str, Any] | None) – Optional arguments passed from/to the runner. These arguments can be used for synchronization in subprocesses or other runner-specific tasks. Default is None.
The method updates the orchestrator with the status of the invocation (
InvocationStatus) and logs the execution process. In case of exceptions, especially retriable ones, it follows the defined retry logic as per the task’s configuration.The return value or any raised exception is stored in the orchestrator and can be retrieved by the caller.
Note
- If a retriable exception occurs and the number of retries has not been exceeded, the method will set the invocation for a retry and log a warning. - If the maximum retries are reached or a non-retriable exception occurs, the exception will be raised after updating the orchestrator.
The invocation’s context is managed to ensure the correct tracking of the current and parent invocations.
- Raises:
Exception – Raises the original exception if a non-retriable exception occurs or the maximum retries are reached for a retriable exception.
- property result: pynenc.types.Result¶
Retrieve the result of the task execution.
This property method is responsible for obtaining the final result of the task associated with this invocation. If the task is not yet completed (i.e., its status is not final), it enters a waiting state. The method ensures that it waits for the task to reach a final state before returning the result.
The waiting mechanism involves communicating with the orchestrator and potentially the runner, depending on the task’s current state and execution context. If the task is part of a nested call structure, it also considers the parent invocation’s state.
Once the task reaches a final state, the method retrieves and returns the final result of the task execution.
- Returns:
The result of the task execution. The exact type of
Resultdepends on the task’s implementation.
Note
- This method will block until the task execution is complete and the result is available. - If called on an invocation that is not yet in a final state, it will wait (potentially indefinitely) for the task to complete.
- get_final_result() pynenc.types.Result[source]¶
Retrieve the final result of the task execution if the invocation is in a final state.
This method checks if the invocation has reached a final state. If it has, the method then retrieves and returns the result of the task execution from the state backend. In case the invocation is in a failed state, it raises an exception with the details of the failure.
- Returns:
The final result of the task execution.
- Return type:
Result
- Raises:
exceptions.InvocationError – If the invocation is not in a final state when this method is called.
- class pynenc.invocation.dist_invocation.DistributedInvocationGroup[source]¶
Bases:
pynenc.invocation.base_invocation.BaseInvocationGroup[pynenc.types.Params,pynenc.types.Result,pynenc.invocation.dist_invocation.DistributedInvocation]A group of distributed invocations for a specific task in a distributed environment.
This class extends
BaseInvocationGroupto handle groups ofDistributedInvocationinstances.- Parameters:
task (Task) – The task associated with these invocations.
invocations (list[DistributedInvocation]) – A list of distributed invocations, each an instance of
DistributedInvocation.
The
DistributedInvocationGroupis specifically designed for use in distributed environments, where task executions are spread across multiple nodes or processes.- property results: collections.abc.Iterator[pynenc.types.Result]¶
An iterator over the results of the invocations in the group.
This property method iterates over the
DistributedInvocationinstances in the group, yielding the result of each invocation once it reaches a final state. If an invocation has not yet completed, it waits for the result to become available.The method ensures that the orchestrator is notified about the waiting invocations and communicates with the runner to handle the waiting state efficiently.
- Returns:
An iterator over the results of each invocation in the group.
- Return type:
Iterator[Result]
Note
This method will block until all invocations in the group have completed and their results are available.
- async async_results() collections.abc.AsyncGenerator[pynenc.types.Result, None][source]¶
- class pynenc.invocation.dist_invocation.ReusedInvocation(call: pynenc.call.Call[pynenc.types.Params, pynenc.types.Result], parent_invocation: pynenc.invocation.dist_invocation.DistributedInvocation | None = None, invocation_id: str | None = None, diff_arg: pynenc.arguments.Arguments | None = None)[source]¶
Bases:
pynenc.invocation.dist_invocation.DistributedInvocationA specialized invocation that reuses an existing
DistributedInvocation.This class is used for scenarios where an existing invocation is reused, possibly with some differences in arguments. It adds an attribute to track these argument differences.
- Variables:
diff_arg (Arguments | None) – An optional
Argumentsobject representing any differences in arguments compared to the original invocation. IfNone, it indicates no differences in arguments.
Initialization
Initialize the invocation with its identity.
- classmethod from_existing(invocation: pynenc.invocation.dist_invocation.DistributedInvocation, diff_arg: pynenc.arguments.Arguments | None = None) pynenc.invocation.dist_invocation.ReusedInvocation[source]¶
Create a
ReusedInvocationinstance from an existingDistributedInvocation.This method constructs a new
ReusedInvocationbased on an existing distributed invocation. It reuses the invocation ID and other relevant attributes from the original invocation and allows specifying differences in arguments.- Parameters:
invocation (DistributedInvocation) – The existing invocation to reuse.
diff_arg (Arguments | None) – Optional argument differences for the new invocation. If provided, these arguments will be used to distinguish the new invocation from the original. Default is None.
- Returns:
A new instance of
ReusedInvocationbased on the provided existing invocation.- Return type: