Concurrency Control¶
Overview¶
This guide provides a detailed look at the concurrency_control sample, showcasing concurrency control mechanisms within Pynenc for task processing. It demonstrates how tasks can be managed and executed according to specific concurrency requirements.
The full source code is available on GitHub: concurrency_control.
Scenario¶
This example explores various concurrency control methods, including disabling concurrency for task registration and execution, and enforcing task-level concurrency control during registration and runtime. It illustrates configuring Pynenc for handling concurrency effectively.
Setup¶
Requirements¶
Python 3.11 or higher.
Installed Pynenc library.
Project Files¶
tasks.py: Defines tasks with different concurrency control settings.sample.py: Demonstrates how concurrency control settings impact task execution.
Demonstration¶
Defining Tasks with Concurrency Control¶
In tasks.py, tasks are defined with concurrency control settings. You can specify concurrency directly per task or globally using PynencBuilder.
Option 1: Direct Initialization (Task-Specific Controls)¶
from pynenc import Pynenc, ConcurrencyControlType
from typing import NamedTuple
import time
app = Pynenc()
@app.task(registration_concurrency=ConcurrencyControlType.DISABLED)
def get_own_invocation_id() -> str:
return get_own_invocation_id.invocation.invocation_id
@app.task(registration_concurrency=ConcurrencyControlType.TASK)
def get_own_invocation_id_registration_concurrency() -> str:
return get_own_invocation_id_registration_concurrency.invocation.invocation_id
class SleepResult(NamedTuple):
start: float
end: float
@app.task(running_concurrency=ConcurrencyControlType.DISABLED)
def sleep_without_running_concurrency(seconds: float) -> SleepResult:
start = time.time()
time.sleep(seconds)
return SleepResult(start=start, end=time.time())
@app.task(running_concurrency=ConcurrencyControlType.TASK)
def sleep_with_running_concurrency(seconds: float) -> SleepResult:
start = time.time()
time.sleep(seconds)
return SleepResult(start=start, end=time.time())
Option 2: Using PynencBuilder (Default Concurrency Controls)¶
Alternatively, use PynencBuilder to configure default concurrency controls for all tasks, with the option to override them individually. This builder configuration can coexist with other configuration methods such as pyproject.toml, environment variables, YAML, and JSON.
from pynenc import Pynenc, ConcurrencyControlType
from pynenc.builder import PynencBuilder
from typing import NamedTuple
import time
app = (
PynencBuilder()
.concurrency_control(
running_concurrency=ConcurrencyControlType.DISABLED, # Default running concurrency
registration_concurrency=ConcurrencyControlType.DISABLED # Default registration concurrency
)
.build()
)
@app.task # Inherits DISABLED concurrency for both running and registration
def get_own_invocation_id() -> str:
return get_own_invocation_id.invocation.invocation_id
@app.task(registration_concurrency=ConcurrencyControlType.TASK) # Overrides default registration concurrency
def get_own_invocation_id_registration_concurrency() -> str:
return get_own_invocation_id_registration_concurrency.invocation.invocation_id
class SleepResult(NamedTuple):
start: float
end: float
@app.task # Inherits DISABLED running concurrency
def sleep_without_running_concurrency(seconds: float) -> SleepResult:
start = time.time()
time.sleep(seconds)
return SleepResult(start=start, end=time.time())
@app.task(running_concurrency=ConcurrencyControlType.TASK) # Overrides default running concurrency
def sleep_with_running_concurrency(seconds: float) -> SleepResult:
start = time.time()
time.sleep(seconds)
return SleepResult(start=start, end=time.time())
get_own_invocation_iduses default (DISABLED) concurrency.get_own_invocation_id_registration_concurrencyexplicitly overrides default registration concurrency toTASK.sleep_without_running_concurrencyandsleep_with_running_concurrencyhighlight inherited versus overridden concurrency controls for running tasks.
Using PynencBuilder.concurrency_control(), you define global defaults easily, applying consistency across tasks while retaining flexibility.
Executing Tasks with Concurrency Controls¶
The sample.py script demonstrates how concurrency settings influence task execution:
Running Without Concurrency Control¶
Illustrates execution without enforced concurrency, creating separate invocation IDs per call.
def run_without_concurrency_control() -> None:
invocations = [tasks.get_own_invocation_id() for _ in range(10)]
logger.info(f"Invocation ids: " + ", ".join(i.invocation_id for i in invocations))
Running with Registration Concurrency Control¶
Demonstrates that registration concurrency control (TASK) routes multiple calls to a single invocation.
def run_with_registration_concurrency_control() -> None:
invocations = [tasks.get_own_invocation_id_registration_concurrency() for _ in range(3)]
unique_invocation_ids = set(i.invocation_id for i in invocations)
logger.info(f"Unique invocation_id: {unique_invocation_ids}")
Running with Execution (Running) Concurrency Control¶
Demonstrates the difference between parallel and sequential execution based on running concurrency settings.
def run_with_running_concurrency_control() -> None:
# Without concurrency control: parallel execution
no_control_invocations = [
tasks.sleep_without_running_concurrency(0.1) for _ in range(10)
]
no_control_results = [i.result for i in no_control_invocations]
if not any_run_in_parallel(no_control_results):
raise ValueError(f"Expected parallel execution, got {no_control_results}")
# With concurrency control: sequential execution
controlled_invocations = [
tasks.sleep_with_running_concurrency(0.1) for _ in range(10)
]
controlled_results = [i.result for i in controlled_invocations]
if any_run_in_parallel(controlled_results):
raise ValueError(f"Expected sequential execution, got {controlled_results}")
Each demonstration section aims to clearly illustrate how different concurrency configurations affect task execution within Pynenc.
Conclusion¶
The concurrency_control sample introduces concurrency management within Pynenc clearly and practically. By using task-specific settings or global defaults via PynencBuilder, developers gain powerful and flexible options for controlling concurrent task execution.