"""
Builder pattern implementation for configuring Pynenc applications with plugin support.
This module provides a fluent, chainable builder interface that can be extended by plugins
through Python's entry points system. Plugins automatically register their builder methods
when installed, enabling seamless integration of backend-specific functionality.
Key components:
- PynencBuilder: Core builder class with plugin support
- Plugin registration system via entry points
- Dynamic method resolution for plugin-provided methods
- Validation system for plugin configurations
"""
import warnings
from collections.abc import Callable, Iterable
from typing import TYPE_CHECKING, Any
from pynenc.client_data_store.mem_client_data_store import MemClientDataStore
from pynenc.conf.config_pynenc import ArgumentPrintMode
from pynenc.conf.config_task import ConcurrencyControlType
from pynenc.serializer import JsonPickleSerializer, JsonSerializer, PickleSerializer
from pynenc.trigger.mem_trigger import MemTrigger
# Entry points imports
try:
from importlib.metadata import entry_points
except ImportError:
entry_points = None # type: ignore[assignment]
try:
import pkg_resources # type: ignore[import-untyped]
except ImportError:
pkg_resources = None # type: ignore[assignment]
if TYPE_CHECKING:
from pynenc import Pynenc
[docs]
class PynencBuilder:
"""
A builder pattern implementation for creating and configuring Pynenc applications.
This builder simplifies the configuration process by providing intuitive method chaining
to set up various components of a Pynenc application. Backend-specific methods are
provided by installed plugins through Python's entry points system.
The plugin system automatically discovers and registers methods from installed plugins,
enabling backend-specific configuration methods to become available when plugins are installed.
:example:
```python
# With backend plugins installed
pynenc_app = (
PynencBuilder()
.app_id("my_application")
.serializer_pickle()
.multi_thread_runner(min_threads=1, max_threads=4)
.logging_level("info")
.build()
)
# Memory-based configuration for development
pynenc_app = (
PynencBuilder()
.app_id("my_application")
.memory() # Built-in memory backend
.mem_client_data_store(min_size_to_cache=1024)
.build()
)
```
"""
# Plugin registration system
_plugin_methods: dict[str, Callable] = {}
_plugin_validators: list[Callable] = []
_plugins_loaded = False
def __init__(self) -> None:
"""
Initialize a new PynencBuilder with plugin discovery and empty configuration.
Automatically discovers and loads plugin methods via entry points on first instantiation.
"""
self._config: dict[str, Any] = {}
self._using_memory_components = False
self._plugin_components: set[str] = set()
# Auto-discover plugins on first builder instantiation
self._load_plugins()
[docs]
@classmethod
def _load_plugins(cls) -> None:
"""
Load and register plugin methods via Python entry points system.
Discovers all plugins registered under 'pynenc.plugins' entry point group
and registers their builder methods for dynamic resolution.
"""
if cls._plugins_loaded:
return
cls._plugins_loaded = True
# Try to load plugins using available entry points mechanism
if entry_points is not None:
cls._load_plugins_importlib()
elif pkg_resources is not None:
cls._load_plugins_pkg_resources()
# If neither is available, continue without plugins
[docs]
@classmethod
def _load_plugins_importlib(cls) -> None:
"""Load plugins using importlib.metadata."""
if entry_points is None:
return
try:
# Try Python 3.10+ API first
try:
plugin_entries = list(entry_points(group="pynenc.plugins"))
except TypeError:
# Fallback to older API
eps = entry_points()
if hasattr(eps, "select"):
plugin_entries = list(eps.select(group="pynenc.plugins"))
else:
# For Python 3.8-3.9, entry_points() returns a dict-like object
plugin_entries = list(eps.get("pynenc.plugins", [])) # type: ignore[attr-defined]
for ep in plugin_entries:
cls._register_plugin_from_entry_point(ep)
except Exception:
# Silently continue if plugin loading fails
pass
[docs]
@classmethod
def _load_plugins_pkg_resources(cls) -> None:
"""Load plugins using pkg_resources."""
if pkg_resources is None:
return
try:
for entry_point in pkg_resources.iter_entry_points("pynenc.plugins"):
cls._register_plugin_from_entry_point(entry_point)
except Exception:
# Silently continue if plugin loading fails
pass
[docs]
@classmethod
def _register_plugin_from_entry_point(cls, entry_point: Any) -> None:
"""Register a single plugin from an entry point."""
try:
plugin_class = entry_point.load()
# Plugins should have a register_builder_methods class method
if hasattr(plugin_class, "register_builder_methods"):
plugin_class.register_builder_methods(cls)
except Exception as e:
# Log plugin loading errors but don't fail the builder
warnings.warn(
f"Failed to load plugin {entry_point.name}: {e}",
UserWarning,
stacklevel=2,
)
[docs]
@classmethod
def register_plugin_method(cls, method_name: str, method_func: Callable) -> None:
"""
Register a plugin method to be available on PynencBuilder instances.
This allows plugins to extend the builder with their own configuration methods.
Plugins should call this during their registration process.
:param str method_name: Name of the method to register (e.g., 'redis', 'mongodb')
:param Callable method_func: The method function that takes builder as first argument
:raises ValueError: If method name conflicts with existing core methods
"""
# Check for conflicts with core methods (but allow plugin method overrides)
if hasattr(cls, method_name) and method_name not in cls._plugin_methods:
raise ValueError(
f"Cannot register plugin method '{method_name}': conflicts with core method"
)
cls._plugin_methods[method_name] = method_func
[docs]
@classmethod
def register_plugin_validator(cls, validator_func: Callable) -> None:
"""
Register a plugin validator function that validates configuration before build().
Validators should raise ValueError if the configuration is invalid.
:param Callable validator_func: Function that takes (config: dict) and raises ValueError if invalid
"""
cls._plugin_validators.append(validator_func)
[docs]
def __getattr__(self, name: str) -> Callable:
"""
Dynamic method resolution for plugin-registered methods.
This enables plugins to add methods that are automatically
available when the plugin is installed.
:param str name: Method name being accessed
:return: Bound method from plugin
:raises AttributeError: If method not found in plugins with helpful error message
"""
if name in self._plugin_methods:
# Bind the plugin method to this instance
plugin_method = self._plugin_methods[name]
return lambda *args, **kwargs: plugin_method(self, *args, **kwargs)
# Provide helpful error message suggesting plugin installation
raise AttributeError(
f"'{self.__class__.__name__}' object has no attribute '{name}'. "
f"This method may be provided by a plugin. Check available plugins and ensure they are installed."
)
[docs]
def app_id(self, app_id: str) -> "PynencBuilder":
"""
Set the application ID for the Pynenc application.
The application ID uniquely identifies this Pynenc application instance
and is used in logging, monitoring, and component configuration.
:param str app_id: The unique identifier for this application
:return: The builder instance for method chaining
"""
self._config["app_id"] = app_id
return self
[docs]
def memory(self) -> "PynencBuilder":
"""
Configure in-memory components for the Pynenc application.
This sets up all components (orchestrator, broker, state backend,
and argument cache) to use in-memory backends. This is primarily
for testing and development purposes.
Note: In-memory components are only compatible with certain runners.
:return: The builder instance for method chaining
"""
self._config.update(
{
"orchestrator_cls": "MemOrchestrator",
"broker_cls": "MemBroker",
"state_backend_cls": "MemStateBackend",
"client_data_store_cls": MemClientDataStore.__name__,
"trigger_cls": MemTrigger.__name__,
}
)
self._using_memory_components = True
self._plugin_components.clear() # Clear any plugin components
return self
[docs]
def sqlite(self, sqlite_db_path: str | None = None) -> "PynencBuilder":
"""
Configure SQLite components for the Pynenc application.
This sets up all components (orchestrator, broker, state backend,
and argument cache) to use SQLite backends. This is primarily
for testing multiprocess scenarios on a single machine.
Note: SQLite will not work in distributed environments with independent DB files.
:param str | None sqlite_db_path: Path to the SQLite database file. If None, uses the default location.
:return: The builder instance for method chaining
"""
self._config.update(
{
"orchestrator_cls": "SQLiteOrchestrator",
"broker_cls": "SQLiteBroker",
"state_backend_cls": "SQLiteStateBackend",
"client_data_store_cls": "SQLiteClientDataStore",
"trigger_cls": "SQLiteTrigger",
}
)
self._using_memory_components = False
self._plugin_components.clear() # Clear any plugin components
if sqlite_db_path:
self._config["sqlite_db_path"] = sqlite_db_path
return self
[docs]
def mem_client_data_store(
self,
min_size_to_cache: int = 1024,
local_cache_size: int = 1024,
) -> "PynencBuilder":
"""Configure memory-based client data store.
This sets up in-memory argument caching for development and testing purposes.
Arguments larger than the specified threshold will be cached locally.
Also configures the ClientDataStore with the same settings.
:param int min_size_to_cache: Minimum string length required to cache an argument
:param int local_cache_size: Maximum number of items to cache locally
:return: The builder instance for method chaining
"""
self._config.update(
{
"client_data_store_cls": MemClientDataStore.__name__,
"min_size_to_cache": min_size_to_cache,
"local_cache_size": local_cache_size,
}
)
self._using_memory_components = True
return self
[docs]
def disable_client_data_store(self) -> "PynencBuilder":
"""Disable client data store completely.
This turns off all client data store functionality, which may be useful
for debugging or when caching is not desired.
:return: The builder instance for method chaining
"""
self._config["disable_client_data_store"] = True
return self
[docs]
def mem_trigger(
self,
scheduler_interval_seconds: int = 60,
enable_scheduler: bool = True,
) -> "PynencBuilder":
"""
Configure memory-based trigger system.
This sets up in-memory triggers for development and testing purposes.
Time-based triggers will be checked at the specified interval.
:param int scheduler_interval_seconds: Interval for the scheduler to check time-based triggers
:param bool enable_scheduler: Whether to enable the scheduler for time-based triggers
:return: The builder instance for method chaining
"""
self._config.update(
{
"trigger_cls": MemTrigger.__name__,
"scheduler_interval_seconds": scheduler_interval_seconds,
"enable_scheduler": enable_scheduler,
}
)
self._using_memory_components = True
return self
[docs]
def multi_thread_runner(
self,
min_threads: int = 1,
max_threads: int = 1,
enforce_max_processes: bool = False,
) -> "PynencBuilder":
"""
Configure the MultiThreadRunner for concurrent task execution.
The MultiThreadRunner uses threads to execute tasks concurrently within
the same process, providing efficient parallel execution with shared memory.
:param int min_threads: The minimum number of threads to keep in the thread pool
:param int max_threads: The maximum number of threads allowed in the thread pool
:param bool enforce_max_processes: If True, enforces the maximum number of processes
:return: The builder instance for method chaining
"""
self._config["runner_cls"] = "MultiThreadRunner"
self._config.update(
{
"min_threads": min_threads,
"max_threads": max_threads,
"enforce_max_processes": enforce_max_processes,
}
)
return self
[docs]
def persistent_process_runner(self, num_processes: int = 0) -> "PynencBuilder":
"""
Configure the PersistentProcessRunner for concurrent task execution.
The PersistentProcessRunner maintains a pool of persistent processes
for task execution, providing true parallel execution across multiple
CPU cores, with isolated memory spaces.
:param int num_processes: The number of processes to create in the process pool
:return: The builder instance for method chaining
"""
self._config["runner_cls"] = "PersistentProcessRunner"
self._config["num_processes"] = num_processes
return self
[docs]
def thread_runner(
self, min_threads: int = 1, max_threads: int = 0
) -> "PynencBuilder":
"""
Configure the ThreadRunner for task execution.
The ThreadRunner uses a thread pool to execute tasks concurrently within
the same process, providing efficient parallel execution with shared memory.
:param int min_threads: The minimum number of threads to keep in the thread pool
:param int max_threads: The maximum number of threads allowed in the thread pool
:return: The builder instance for method chaining
"""
self._config["runner_cls"] = "ThreadRunner"
self._config.update(
{
"min_threads": min_threads,
"max_threads": max_threads,
}
)
return self
[docs]
def process_runner(self) -> "PynencBuilder":
"""
Configure the ProcessRunner for task execution.
The ProcessRunner creates a new process for each task execution,
providing isolated execution context for each task.
:return: The builder instance for method chaining
"""
self._config["runner_cls"] = "ProcessRunner"
return self
[docs]
def dummy_runner(self) -> "PynencBuilder":
"""
Configure the DummyRunner for task execution.
The DummyRunner executes tasks in the main thread of the application.
This is useful for testing and debugging purposes.
:return: The builder instance for method chaining
"""
self._config["runner_cls"] = "DummyRunner"
return self
[docs]
def dev_mode(self, force_sync_tasks: bool = True) -> "PynencBuilder":
"""
Enable development mode for easier debugging.
In development mode, tasks can be forced to run synchronously,
making debugging and testing easier.
:param bool force_sync_tasks: If True, forces all tasks to run synchronously
:return: The builder instance for method chaining
"""
self._config["dev_mode_force_sync_tasks"] = force_sync_tasks
return self
[docs]
def logging(
self,
level: str | None = None,
*,
stream: str | None = None,
colors: bool | None = None,
fmt: str | None = None,
) -> "PynencBuilder":
"""
Configure all logging options in a single call.
Each parameter is optional — only non-``None`` values are applied,
so you can mix this with the individual ``logging_*`` methods freely.
:param str | None level: Log level — ``"debug"``, ``"info"``, ``"warning"``, ``"error"``, ``"critical"``
:param str | None stream: Output stream — ``"stdout"`` or ``"stderr"``
:param bool | None colors: ``True`` force, ``False`` disable, ``None`` auto-detect
:param str | None fmt: Log format — ``"text"`` or ``"json"``
:return: The builder instance for method chaining
:raises ValueError: If any provided value is invalid
"""
if level is not None:
self.logging_level(level)
if stream is not None:
self.logging_stream(stream)
if colors is not None:
self.logging_colors(colors)
if fmt is not None:
self.logging_format(fmt)
return self
[docs]
def logging_level(self, level: str) -> "PynencBuilder":
"""
Set the logging level for the application.
:param str level: Log level — ``"debug"``, ``"info"``, ``"warning"``, ``"error"``, ``"critical"``
:return: The builder instance for method chaining
:raises ValueError: If an invalid logging level is provided
"""
level = level.lower()
_VALID_LOG_LEVELS = ["debug", "info", "warning", "error", "critical"]
if level not in _VALID_LOG_LEVELS:
raise ValueError(
f"Invalid logging level: {level!r}. Valid options are: {', '.join(_VALID_LOG_LEVELS)}"
)
self._config["logging_level"] = level
return self
[docs]
def logging_colors(self, use_colors: bool | None = None) -> "PynencBuilder":
"""
Control ANSI colour output in logs.
Pass ``True`` to force colours, ``False`` to disable them, or ``None``
(the default) to let pynenc auto-detect based on whether the log stream
is an interactive TTY. Set to ``False`` when running in containers or
CI environments that do not support ANSI escape codes.
:param bool | None use_colors: True to force colours, False to disable, None for auto-detect
:return: The builder instance for method chaining
"""
self._config["log_use_colors"] = use_colors
return self
[docs]
def logging_stream(self, stream: str) -> "PynencBuilder":
"""
Set which output stream log messages are written to.
Use ``"stdout"`` when deploying in containers (e.g. Kubernetes / GKE)
so that log lines are not misclassified as errors by the container
runtime's default stderr-to-ERROR mapping.
:param str stream: Output stream — ``"stdout"`` or ``"stderr"``
:return: The builder instance for method chaining
:raises ValueError: If an invalid stream name is provided
"""
valid = ("stdout", "stderr")
if stream not in valid:
raise ValueError(
f"Invalid log stream: {stream!r}. Valid options are: {', '.join(valid)}"
)
self._config["log_stream"] = stream
return self
[docs]
def runner_tuning(
self,
runner_loop_sleep_time_sec: float = 0.01,
invocation_wait_results_sleep_time_sec: float = 0.01,
min_parallel_slots: int = 1,
) -> "PynencBuilder":
"""
Configure runner performance tuning parameters.
:param float runner_loop_sleep_time_sec: Sleep time between runner loop iterations
:param float invocation_wait_results_sleep_time_sec: Sleep time when waiting for results
:param int min_parallel_slots: Minimum number of parallel execution slots
:return: The builder instance for method chaining
"""
self._config.update(
{
"runner_loop_sleep_time_sec": runner_loop_sleep_time_sec,
"invocation_wait_results_sleep_time_sec": invocation_wait_results_sleep_time_sec,
"min_parallel_slots": min_parallel_slots,
}
)
return self
[docs]
def task_control(
self,
blocking_control: bool = False,
queue_timeout_sec: float = 0.1,
) -> "PynencBuilder":
"""
Configure task control parameters.
:param bool blocking_control: Whether to enable blocking control for concurrent tasks
:param float queue_timeout_sec: Timeout for queue operations in seconds
:return: The builder instance for method chaining
"""
self._config.update(
{
"blocking_control": blocking_control,
"queue_timeout_sec": queue_timeout_sec,
}
)
return self
[docs]
def serializer_json_pickle(self) -> "PynencBuilder":
"""
Configure the JSON-Pickle hybrid serializer
:return: The builder instance for method chaining
"""
self._config["serializer_cls"] = JsonPickleSerializer.__name__
return self
[docs]
def serializer_json(self) -> "PynencBuilder":
"""
Configure the JSON serializer
:return: The builder instance for method chaining
"""
self._config["serializer_cls"] = JsonSerializer.__name__
return self
[docs]
def serializer_pickle(self) -> "PynencBuilder":
"""
Configure the Pickle serializer
:return: The builder instance for method chaining
"""
self._config["serializer_cls"] = PickleSerializer.__name__
return self
[docs]
def concurrency_control(
self,
running_concurrency: str | ConcurrencyControlType | None = None,
registration_concurrency: str | ConcurrencyControlType | None = None,
) -> "PynencBuilder":
"""
Configure concurrency control default behaviors for all tasks.
:param Optional[Union[str, ConcurrencyControlType]] running_concurrency: Controls runtime concurrency behavior
:param Optional[Union[str, ConcurrencyControlType]] registration_concurrency: Controls registration concurrency behavior
:return: The builder instance for method chaining
"""
if running_concurrency is not None:
if isinstance(running_concurrency, ConcurrencyControlType):
self._config["running_concurrency"] = running_concurrency
else:
mode_upper = running_concurrency.upper()
self._config["running_concurrency"] = ConcurrencyControlType[mode_upper]
if registration_concurrency is not None:
if isinstance(registration_concurrency, ConcurrencyControlType):
self._config["registration_concurrency"] = registration_concurrency
else:
mode_upper = registration_concurrency.upper()
self._config["registration_concurrency"] = ConcurrencyControlType[
mode_upper
]
return self
[docs]
def max_pending_seconds(self, seconds: float) -> "PynencBuilder":
"""
Set the maximum time a task can remain in PENDING state.
:param float seconds: Maximum time in seconds a task can remain in PENDING state
:return: The builder instance for method chaining
"""
self._config["max_pending_seconds"] = seconds
return self
[docs]
def argument_print_mode(
self, mode: str | ArgumentPrintMode, truncate_length: int = 32
) -> "PynencBuilder":
"""
Configure how task arguments are printed in logs.
:param Union[str, ArgumentPrintMode] mode: The print mode to use
:param int truncate_length: Maximum length for printed argument values in TRUNCATED mode
:return: The builder instance for method chaining
:raises ValueError: If an invalid mode string is provided
"""
if isinstance(mode, ArgumentPrintMode):
arg_print_mode = mode
else:
mode_upper = mode.upper()
try:
arg_print_mode = ArgumentPrintMode[mode_upper]
except KeyError as ex:
valid_modes = [m.name for m in ArgumentPrintMode]
raise ValueError(
f"Invalid argument print mode: {mode}. Valid options are: {', '.join(valid_modes)}"
) from ex
self._config["argument_print_mode"] = arg_print_mode
# Configure related settings based on the mode
if arg_print_mode == ArgumentPrintMode.HIDDEN:
self._config["print_arguments"] = False
else:
self._config["print_arguments"] = True
if arg_print_mode == ArgumentPrintMode.TRUNCATED:
if truncate_length <= 0:
raise ValueError("truncate_length must be greater than 0")
self._config["truncate_arguments_length"] = truncate_length
return self
[docs]
def hide_arguments(self) -> "PynencBuilder":
"""
Configure logs to hide all task arguments.
:return: The builder instance for method chaining
"""
return self.argument_print_mode(ArgumentPrintMode.HIDDEN)
[docs]
def show_argument_keys(self) -> "PynencBuilder":
"""
Configure logs to show only argument names.
:return: The builder instance for method chaining
"""
return self.argument_print_mode(ArgumentPrintMode.KEYS)
[docs]
def show_full_arguments(self) -> "PynencBuilder":
"""
Configure logs to show complete argument values without truncation.
:return: The builder instance for method chaining
"""
return self.argument_print_mode(ArgumentPrintMode.FULL)
[docs]
def show_truncated_arguments(self, truncate_length: int = 32) -> "PynencBuilder":
"""
Configure logs to show truncated argument values.
:param int truncate_length: Maximum length for printed argument values
:return: The builder instance for method chaining
"""
return self.argument_print_mode(ArgumentPrintMode.TRUNCATED, truncate_length)
[docs]
def custom_config(self, **kwargs: Any) -> "PynencBuilder":
"""
Add arbitrary configuration values.
For common configuration values, prefer using the dedicated methods
instead of this generic method.
:param Any kwargs: Custom configuration values to add
:return: The builder instance for method chaining
"""
self._config.update(kwargs)
return self
[docs]
def trigger_task_modules(self, modules: Iterable[str]) -> "PynencBuilder":
"""
Declare modules that contain trigger-dependent tasks.
Runners will import these modules at startup to ensure tasks that
depend on triggers (cron, event, status, etc.) are registered. This
avoids importing all task modules during normal application import
time and prevents eager connections to external systems.
:param Iterable[str] modules: Iterable of module import paths (e.g. ["myapp.tasks.scheduled"]).
:return: The builder instance for method chaining
This method accepts any iterable of strings and stores the value as a
deduplicated `set[str]` in the configuration. Using a `set` ensures
uniqueness and avoids ordering assumptions.
"""
self._config["trigger_task_modules"] = set(modules)
return self
[docs]
def _validate_plugin_compatibility(self) -> None:
"""
Validate plugin configurations before building.
Runs all registered plugin validators to ensure the configuration is valid.
"""
for validator in self._plugin_validators:
try:
validator(self._config)
except Exception as e:
raise ValueError(f"Plugin configuration validation failed: {e}") from e
[docs]
def _validate_memory_compatibility(self) -> None:
"""
Validate that the selected runner is compatible with memory components.
:raises ValueError: If memory components are used with an incompatible runner
"""
if not self._using_memory_components:
return
runner_cls = self._config.get("runner_cls")
if not runner_cls:
return
memory_compatible_runners = ["DummyRunner", "ThreadRunner"]
if runner_cls not in memory_compatible_runners:
raise ValueError(
f"Runner '{runner_cls}' is not compatible with in-memory components. "
f"Use one of these runners instead: {', '.join(memory_compatible_runners)} "
f"or configure distributed components using an appropriate plugin."
)
[docs]
def build(self) -> "Pynenc":
"""
Build and return a configured Pynenc instance.
This method creates a new Pynenc instance using the configuration
values that have been set through the builder methods.
:return: A configured Pynenc instance ready for use
:raises ValueError: If the configuration is invalid
"""
from pynenc import Pynenc
self._validate_memory_compatibility()
self._validate_plugin_compatibility()
return Pynenc(config_values=self._config)