Concurrency Control¶
Overview¶
Pynenc lets you cap how many invocations of a task can be registered or running at the same time, without an external lock service. The orchestrator already tracks every invocation it accepts; checking whether a new one would conflict with an existing one is the same kind of lookup.
This guide covers:
- The four concurrency scopes (DISABLED, TASK, ARGUMENTS, KEYS).
- The two flags that apply them (registration_concurrency at enqueue time, running_concurrency at run time).
- key_arguments for narrowing the conflict check to a subset of arguments.
- reroute_on_concurrency_control for choosing what happens to blocked invocations.
- A runnable demo, concurrency_demo, that visualises all of this on a pynmon timeline.
Concurrency scopes¶
ConcurrencyControlType defines what counts as a duplicate:
| Scope | What blocks what |
|---|---|
| DISABLED | Nothing. The default. Every invocation is independent. |
| TASK | At most one invocation of this task at a time. Arguments are ignored. |
| ARGUMENTS | At most one invocation per full argument tuple. Same args = duplicate; any difference = independent. |
| KEYS | At most one invocation per subset of arguments. The subset is declared with key_arguments. |
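One way to picture the four scopes is as a function that maps an invocation to a conflict key: two invocations conflict when their keys are equal. The sketch below is illustrative only. `Scope` and `conflict_key` are hypothetical names, not pynenc's internals, and argument values are assumed to be hashable.

```python
from enum import Enum, auto
from typing import Any, Hashable, Optional


class Scope(Enum):
    """Stand-in for the four scopes; not pynenc's ConcurrencyControlType."""

    DISABLED = auto()
    TASK = auto()
    ARGUMENTS = auto()
    KEYS = auto()


def conflict_key(
    scope: Scope,
    task_name: str,
    arguments: dict[str, Any],
    key_arguments: tuple[str, ...] = (),
) -> Optional[Hashable]:
    """Return a key that is equal for invocations that would conflict.

    None means "never conflicts" (the DISABLED case).
    """
    if scope is Scope.DISABLED:
        return None
    if scope is Scope.TASK:
        return (task_name,)
    if scope is Scope.ARGUMENTS:
        # Every argument takes part; sort by name so ordering does not matter.
        return (task_name, tuple(sorted(arguments.items())))
    # KEYS: only the declared subset of arguments takes part in the check.
    subset = tuple((name, arguments[name]) for name in key_arguments)
    return (task_name, subset)


a = {"account_id": "acme", "op": "read"}
b = {"account_id": "acme", "op": "write"}

# TASK ignores arguments, so a and b conflict.
print(conflict_key(Scope.TASK, "call_api", a) == conflict_key(Scope.TASK, "call_api", b))  # True
# ARGUMENTS compares everything, so the differing `op` makes them independent.
print(conflict_key(Scope.ARGUMENTS, "call_api", a) == conflict_key(Scope.ARGUMENTS, "call_api", b))  # False
# KEYS on account_id ignores `op`, so they conflict again.
print(
    conflict_key(Scope.KEYS, "call_api", a, ("account_id",))
    == conflict_key(Scope.KEYS, "call_api", b, ("account_id",))
)  # True
```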
The same enum value goes on both flags. The flag decides when the check happens:
- registration_concurrency: checked when an invocation is enqueued. A duplicate is collapsed into a ReusedInvocation pointing to the existing one. The producer never gets a separate invocation back.
- running_concurrency: checked when a worker tries to execute an invocation. A blocked invocation transitions to CONCURRENCY_CONTROLLED; what happens next depends on reroute_on_concurrency_control.
reroute_on_concurrency_control (default True):
- True: the blocked invocation is requeued (REROUTED) and retried later, so it eventually runs once the slot frees up.
- False: the blocked invocation is discarded permanently (CONCURRENCY_CONTROLLED_FINAL); inv.result raises KeyError.
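The two policies can be compared with a small single-threaded simulation. This is a toy model under stated assumptions, not pynenc's orchestrator: `simulate` is a hypothetical helper, each invocation is reduced to its conflict key, time advances in ticks, and every running invocation is assumed to finish at the end of its tick.

```python
from collections import deque


def simulate(keys: list[str], reroute: bool) -> tuple[list[str], list[str]]:
    """Toy model of a running-concurrency check.

    Each tick, every queued invocation is offered to a worker once. An
    invocation starts only if no other invocation with the same key is
    running in that tick. Blocked invocations are requeued when `reroute`
    is True and dropped otherwise. Returns (executed, dropped).
    """
    queue = deque(keys)
    executed: list[str] = []
    dropped: list[str] = []
    while queue:
        running: set[str] = set()       # keys occupied during this tick
        requeued: deque[str] = deque()
        while queue:
            key = queue.popleft()
            if key not in running:
                running.add(key)
                executed.append(key)
            elif reroute:
                requeued.append(key)    # retried on a later tick
            else:
                dropped.append(key)     # final: never runs
        queue = requeued                # all running invocations finish here
    return executed, dropped


calls = ["acme", "globex", "initech"] * 4   # 12 invocations, 4 per account

ran, lost = simulate(calls, reroute=True)
print(len(ran), len(lost))    # 12 0

ran, lost = simulate(calls, reroute=False)
print(len(ran), len(lost))    # 3 9
```

In this model, rerouting trades latency for completeness (every invocation runs, spread over more ticks), while dropping keeps the queue short at the cost of losing the blocked work.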
Choosing a scope¶
from pynenc import Pynenc
from pynenc.conf.config_task import ConcurrencyControlType as Mode
app = Pynenc()
# TASK — at most one nightly cleanup running, regardless of arguments.
@app.task(running_concurrency=Mode.TASK)
def nightly_cleanup(target: str) -> None: ...
# ARGUMENTS — same export request collapses; different exports run in parallel.
@app.task(registration_concurrency=Mode.ARGUMENTS)
def export_report(report_id: str, format: str) -> str: ...
# KEYS — serialise on account_id; the `op` argument is ignored when checking.
@app.task(
    running_concurrency=Mode.KEYS,
    key_arguments=("account_id",),
)
def call_external_api(account_id: str, op: str) -> str: ...
Pick the scope that matches the rule you actually need:
- “Only one of these tasks ever runs at a time.” → TASK.
- “Don’t run the same job twice.” → ARGUMENTS.
- “Don’t run two jobs that share this key.” → KEYS + key_arguments.
Per-key concurrency: the common case¶
The most common production need is per-tenant or per-account
concurrency: parallelism across tenants, serialisation within each
tenant. That is exactly what KEYS is for.
@app.task(
    running_concurrency=Mode.KEYS,
    key_arguments=("account_id",),
    reroute_on_concurrency_control=True,
)
def call_external_api(account_id: str, op: str, payload: dict) -> str:
    ...
With this:
- At most one running invocation per account_id at any time.
- Different account_id values run in parallel across all your workers.
- Blocked invocations are requeued and complete once the slot opens.
Set reroute_on_concurrency_control=False instead when the right policy
is “if a call for this account is already running, drop the new one”.
Queue depth stays flat and no retry storm builds up.
Add registration_concurrency=Mode.KEYS when duplicate enqueues for the
same key should collapse before they ever reach a worker — useful when an
event bus or scheduler may fire the same logical job many times.
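The collapse at registration time can be pictured as a lookup keyed on the selected arguments. The sketch below is a toy registry, not pynenc's API: `ToyRegistry`, `register`, and `finish` are hypothetical names, the returned id merely stands in for the reused invocation, and releasing the key when the invocation finishes is an assumption of the model.

```python
import itertools


class ToyRegistry:
    """Hypothetical in-memory model of a registration-time KEYS check."""

    def __init__(self, key_arguments: tuple[str, ...]) -> None:
        self.key_arguments = key_arguments
        self._pending: dict[tuple, int] = {}   # conflict key -> invocation id
        self._ids = itertools.count(1)

    def register(self, **arguments: object) -> tuple[int, bool]:
        """Return (invocation_id, reused).

        A second registration with the same key values gets the id of the
        invocation that is already pending instead of a new one.
        """
        key = tuple(arguments[name] for name in self.key_arguments)
        if key in self._pending:
            return self._pending[key], True
        invocation_id = next(self._ids)
        self._pending[key] = invocation_id
        return invocation_id, False

    def finish(self, **arguments: object) -> None:
        """Release the key so that later registrations create new work."""
        key = tuple(arguments[name] for name in self.key_arguments)
        self._pending.pop(key, None)


registry = ToyRegistry(key_arguments=("account_id",))
first = registry.register(account_id="acme", op="sync")
second = registry.register(account_id="acme", op="sync-again")   # same key
other = registry.register(account_id="globex", op="sync")

print(first, second, other)   # (1, False) (1, True) (2, False)
```

However many times the same key is registered while one invocation is pending, the model hands back the same id, which is the behaviour wanted when an upstream source fires duplicates.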
Worked example: concurrency_demo¶
The concurrency_demo
sample runs four scenarios against a tiny FastAPI server that records a
COLLISION whenever two requests for the same account overlap. Run it
with:
git clone https://github.com/pynenc/samples
cd samples/concurrency_demo
uv sync
uv run python sample.py
[Figure: all four scenarios on a single pynmon timeline.]
A — no concurrency control¶
Baseline. Eight worker threads pick up four invocations per account in parallel; the provider records nine collisions.
=== A. unsafe — no concurrency control ===
12 enqueued -> 12 calls, 9 collisions, 1.42s
X acme calls=4 collisions=3
X globex calls=4 collisions=3
X initech calls=4 collisions=3
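The demo server's code is not reproduced here. As a rough idea of how a provider could detect overlapping requests per account, the sketch below tracks in-flight requests behind a lock and counts a collision whenever a request arrives while another one for the same account is still open. `CollisionRecorder` and its methods are hypothetical names, not taken from the sample.

```python
import threading
from collections import Counter


class CollisionRecorder:
    """Counts requests that arrive while another one for the same account is in flight."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._in_flight: Counter[str] = Counter()
        self.calls: Counter[str] = Counter()
        self.collisions: Counter[str] = Counter()

    def enter(self, account_id: str) -> None:
        """Call when a request starts."""
        with self._lock:
            self.calls[account_id] += 1
            if self._in_flight[account_id] > 0:
                self.collisions[account_id] += 1   # overlap detected
            self._in_flight[account_id] += 1

    def leave(self, account_id: str) -> None:
        """Call when a request ends."""
        with self._lock:
            self._in_flight[account_id] -= 1


recorder = CollisionRecorder()

# Four fully overlapping requests for one account: the first is clean,
# each of the other three overlaps with a request already in flight.
for _ in range(4):
    recorder.enter("acme")
for _ in range(4):
    recorder.leave("acme")

# Four sequential requests for another account never overlap.
for _ in range(4):
    recorder.enter("globex")
    recorder.leave("globex")

print(recorder.calls["acme"], recorder.collisions["acme"])       # 4 3
print(recorder.calls["globex"], recorder.collisions["globex"])   # 4 0
```

Under this counting rule, four overlapping calls per account yield three collisions each, which is consistent with the figures in the output above.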
B — running_concurrency=KEYS, reroute=True¶
Same 12 calls, zero collisions. The orchestrator refuses to start a second
invocation while one with the same account_id is running; blocked
invocations are rerouted until their slot opens.
=== B. keyed — running_concurrency=KEYS, reroute=True ===
12 enqueued -> 12 calls, 0 collisions, 2.14s
OK acme calls=4 collisions=0
OK globex calls=4 collisions=0
OK initech calls=4 collisions=0
C — running_concurrency=KEYS, reroute=False¶
Same guard, opposite policy. Blocked invocations land in
CONCURRENCY_CONTROLLED_FINAL and are dropped. Only the first call per
account ever reaches the provider.
=== C. drop — running_concurrency=KEYS, reroute=False ===
12 enqueued -> 3 calls (9 dropped), 0 collisions, 0.67s
OK acme calls=1 collisions=0
OK globex calls=1 collisions=0
OK initech calls=1 collisions=0
D — registration_concurrency=KEYS + running_concurrency=KEYS¶
Dedupe at the door. 24 enqueues collapse to 3 invocations before a
worker ever sees them, because every duplicate registration returns a
ReusedInvocation pointing at the first.
=== D. dedupe — registration + running KEYS ===
24 enqueued -> 3 calls (21 deduped), 0 collisions, 0.57s
OK acme calls=1 collisions=0
OK globex calls=1 collisions=0
OK initech calls=1 collisions=0
Configuring defaults with PynencBuilder¶
Concurrency settings can also be set as app-wide defaults and overridden per task:
from pynenc import ConcurrencyControlType as Mode
from pynenc.builder import PynencBuilder
app = (
    PynencBuilder()
    .concurrency_control(
        running_concurrency=Mode.DISABLED,
        registration_concurrency=Mode.DISABLED,
    )
    .build()
)
@app.task # inherits app defaults
def fast_path() -> None: ...
@app.task(running_concurrency=Mode.TASK) # overrides only running
def heavy_singleton() -> None: ...
Builder defaults compose with all other configuration sources
(pyproject.toml, environment variables, YAML, JSON).
Roadmap¶
The current primitive enforces exactly one in-flight or registered invocation per scope. Two extensions that build on the same orchestrator machinery are on the roadmap:
- Multi-slot concurrency: “up to N in flight per key”.
- Time-window rate limits: “at most M per minute per key”.
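To make the two roadmap semantics concrete, here is a minimal sketch of what each rule means in isolation. It does not describe a planned pynenc API or design: `SlotLimiter` and `WindowLimiter` are hypothetical names, and the current time is passed in explicitly so the example stays deterministic.

```python
from collections import defaultdict, deque


class SlotLimiter:
    """Up to `limit` concurrent holders per key; limit=1 is the single-slot rule."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self._held: defaultdict[str, int] = defaultdict(int)

    def try_acquire(self, key: str) -> bool:
        if self._held[key] >= self.limit:
            return False
        self._held[key] += 1
        return True

    def release(self, key: str) -> None:
        if self._held[key] > 0:
            self._held[key] -= 1


class WindowLimiter:
    """At most `limit` starts per key within any `window` seconds."""

    def __init__(self, limit: int, window: float) -> None:
        self.limit = limit
        self.window = window
        self._starts: defaultdict[str, deque[float]] = defaultdict(deque)

    def try_start(self, key: str, now: float) -> bool:
        starts = self._starts[key]
        # Forget starts that have left the window.
        while starts and now - starts[0] >= self.window:
            starts.popleft()
        if len(starts) >= self.limit:
            return False
        starts.append(now)
        return True


slots = SlotLimiter(limit=2)
print([slots.try_acquire("acme") for _ in range(3)])   # [True, True, False]

window = WindowLimiter(limit=2, window=60.0)
print([window.try_start("acme", t) for t in (0.0, 10.0, 20.0, 61.0)])
# [True, True, False, True]
```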