Basic Local Threaded¶
Overview¶
This guide provides a simple demonstration of using Pynenc in a non-distributed, local-threaded environment. The full source code for this demonstration is available on GitHub: basic_local_threaded_example.
Note
This demonstration is intended for educational and testing purposes and is not suitable for a production environment.
Scenario¶
This scenario illustrates the simplest use case of Pynenc, where tasks are defined and executed within the same Python process using threading. It demonstrates how tasks can be managed and processed using the Pynenc library in a controlled environment.
Setup¶
Requirements¶
Python 3.11 or higher.
Pynenc library installed.
Project Files¶
tasks.py: Contains the definition of a simpleaddtask.sample.py: Demonstrates executing theaddtask in various modes.
Demonstration¶
Defining a Task¶
In tasks.py, we define a basic task using the Pynenc decorator:
from pynenc import Pynenc
app = Pynenc()
@app.task
def add(x: int, y: int) -> int:
add.logger.info(f"{add.task_id=} Adding {x} + {y}")
return x + y
This add function is a simple task that logs its operation and returns the sum of two integers.
Executing Tasks¶
In sample.py, we demonstrate different ways to execute the add task:
Running on a Separate Thread¶
import threading
import tasks
def run_add_on_thread() -> None:
def run_in_thread() -> None:
tasks.app.conf.runner_cls = "ThreadRunner"
tasks.app.runner.run()
thread = threading.Thread(target=run_in_thread, daemon=True)
thread.start()
invocation = tasks.add(1, 2)
This function runs the add task on a separate thread, illustrating basic parallel processing within a single process.
Synchronous Execution¶
def run_sync() -> None:
tasks.app.conf.dev_mode_force_sync_tasks = True
invocation = tasks.add(1, 2)
Here, the task is forced to run synchronously on the main thread.
Handling Timeouts¶
def run_without_worker_add() -> None:
"""Run add task on the main thread"""
results = []
def run_task_with_timeout() -> None:
logger.info("Running task with timeout")
invocation = tasks.add(1, 2)
results.append(invocation.result)
logger.info(f"Result: {invocation.result}")
thread = threading.Thread(target=run_task_with_timeout, daemon=True)
thread.start()
thread.join(timeout=2)
if results != []:
raise ValueError(f"Expected [], got {results}")
logger.info(f"Task timeout, there was no worker to run the task")
This function demonstrates how to handle scenarios where the task execution might hang indefinitely due to the lack of an external worker.
Conclusion¶
This basic demonstration provides an introductory understanding of how Pynenc can be used for task processing in a local, threaded environment. It highlights the versatility of Pynenc in handling tasks and serves as a foundational guide for more complex implementations.