Source code for pynenc.builder

"""
Builder pattern implementation for configuring Pynenc applications with plugin support.

This module provides a fluent, chainable builder interface that can be extended by plugins
through Python's entry points system. Plugins automatically register their builder methods
when installed, enabling seamless integration of backend-specific functionality.

Key components:
- PynencBuilder: Core builder class with plugin support
- Plugin registration system via entry points
- Dynamic method resolution for plugin-provided methods
- Validation system for plugin configurations
"""

import logging
import warnings
from collections.abc import Callable, Iterable
from typing import TYPE_CHECKING, Any

from pynenc.client_data_store.mem_client_data_store import MemClientDataStore
from pynenc.conf.config_pynenc import ArgumentPrintMode
from pynenc.conf.config_task import ConcurrencyControlType
from pynenc.serializer import JsonPickleSerializer, JsonSerializer, PickleSerializer
from pynenc.trigger.mem_trigger import MemTrigger

# Entry points imports
try:
    from importlib.metadata import entry_points
except ImportError:
    entry_points = None  # type: ignore[assignment]

try:
    import pkg_resources  # type: ignore[import-untyped]
except ImportError:
    pkg_resources = None  # type: ignore[assignment]

if TYPE_CHECKING:
    from pynenc import Pynenc


[docs] class PynencBuilder: """ A builder pattern implementation for creating and configuring Pynenc applications. This builder simplifies the configuration process by providing intuitive method chaining to set up various components of a Pynenc application. Backend-specific methods are provided by installed plugins through Python's entry points system. The plugin system automatically discovers and registers methods from installed plugins, enabling backend-specific configuration methods to become available when plugins are installed. :example: ```python # With backend plugins installed pynenc_app = ( PynencBuilder() .app_id("my_application") .serializer_pickle() .multi_thread_runner(min_threads=1, max_threads=4) .logging_level("info") .build() ) # Memory-based configuration for development pynenc_app = ( PynencBuilder() .app_id("my_application") .memory() # Built-in memory backend .mem_client_data_store(min_size_to_cache=1024) .build() ) ``` """ # Plugin registration system — intentionally class-level so all instances # share discovered plugins. Mutation happens only during class-level # _load_plugins (guarded by _plugins_loaded), never per-instance. _plugin_methods: dict[str, Callable] = {} _plugin_validators: list[Callable] = [] _plugins_loaded = False def __init__(self) -> None: """ Initialize a new PynencBuilder with plugin discovery and empty configuration. Automatically discovers and loads plugin methods via entry points on first instantiation. """ self._config: dict[str, Any] = {} self._using_memory_components = False self._plugin_components: set[str] = set() # Auto-discover plugins on first builder instantiation self._load_plugins()
[docs] @classmethod def _load_plugins(cls) -> None: """ Load and register plugin methods via Python entry points system. Discovers all plugins registered under 'pynenc.plugins' entry point group and registers their builder methods for dynamic resolution. """ if cls._plugins_loaded: return cls._plugins_loaded = True # Try to load plugins using available entry points mechanism if entry_points is not None: cls._load_plugins_importlib() elif pkg_resources is not None: cls._load_plugins_pkg_resources()
# If neither is available, continue without plugins
[docs] @classmethod def _load_plugins_importlib(cls) -> None: """Load plugins using importlib.metadata.""" if entry_points is None: return try: # Try Python 3.10+ API first try: plugin_entries = list(entry_points(group="pynenc.plugins")) except TypeError: # Fallback to older API eps = entry_points() if hasattr(eps, "select"): plugin_entries = list(eps.select(group="pynenc.plugins")) else: # For Python 3.8-3.9, entry_points() returns a dict-like object plugin_entries = list(eps.get("pynenc.plugins", [])) # type: ignore[attr-defined] for ep in plugin_entries: cls._register_plugin_from_entry_point(ep) except Exception: logging.getLogger(__name__).debug( "Failed to discover plugins via importlib entry points", exc_info=True )
[docs] @classmethod def _load_plugins_pkg_resources(cls) -> None: """Load plugins using pkg_resources.""" if pkg_resources is None: return try: for entry_point in pkg_resources.iter_entry_points("pynenc.plugins"): cls._register_plugin_from_entry_point(entry_point) except Exception: logging.getLogger(__name__).debug( "Failed to discover plugins via pkg_resources", exc_info=True )
[docs] @classmethod def _register_plugin_from_entry_point(cls, entry_point: Any) -> None: """Register a single plugin from an entry point.""" try: plugin_class = entry_point.load() # Plugins should have a register_builder_methods class method if hasattr(plugin_class, "register_builder_methods"): plugin_class.register_builder_methods(cls) except Exception as e: # Log plugin loading errors but don't fail the builder warnings.warn( f"Failed to load plugin {entry_point.name}: {e}", UserWarning, stacklevel=2, )
[docs] @classmethod def register_plugin_method(cls, method_name: str, method_func: Callable) -> None: """ Register a plugin method to be available on PynencBuilder instances. This allows plugins to extend the builder with their own configuration methods. Plugins should call this during their registration process. :param str method_name: Name of the method to register (e.g., 'redis', 'mongodb') :param Callable method_func: The method function that takes builder as first argument :raises ValueError: If method name conflicts with existing core methods """ # Check for conflicts with core methods (but allow plugin method overrides) if hasattr(cls, method_name) and method_name not in cls._plugin_methods: raise ValueError( f"Cannot register plugin method '{method_name}': conflicts with core method" ) cls._plugin_methods[method_name] = method_func
[docs] @classmethod def register_plugin_validator(cls, validator_func: Callable) -> None: """ Register a plugin validator function that validates configuration before build(). Validators should raise ValueError if the configuration is invalid. :param Callable validator_func: Function that takes (config: dict) and raises ValueError if invalid """ cls._plugin_validators.append(validator_func)
[docs] def __getattr__(self, name: str) -> Callable: """ Dynamic method resolution for plugin-registered methods. This enables plugins to add methods that are automatically available when the plugin is installed. :param str name: Method name being accessed :return: Bound method from plugin :raises AttributeError: If method not found in plugins with helpful error message """ if name in self._plugin_methods: # Bind the plugin method to this instance plugin_method = self._plugin_methods[name] return lambda *args, **kwargs: plugin_method(self, *args, **kwargs) available = ( ", ".join(sorted(self._plugin_methods)) if self._plugin_methods else "none" ) raise AttributeError( f"'{self.__class__.__name__}' has no attribute '{name}'. " f"Available plugin methods: [{available}]. " f"Install a backend plugin (pynenc-redis, pynenc-mongodb, pynenc-rabbitmq) to add more." )
[docs] def __dir__(self) -> list[str]: """Include plugin methods in dir() for discoverability.""" return sorted(set(super().__dir__()) | set(self._plugin_methods))
[docs] def app_id(self, app_id: str) -> "PynencBuilder": """ Set the application ID for the Pynenc application. The application ID uniquely identifies this Pynenc application instance and is used in logging, monitoring, and component configuration. :param str app_id: The unique identifier for this application :return: The builder instance for method chaining """ self._config["app_id"] = app_id return self
[docs] def memory(self) -> "PynencBuilder": """ Configure in-memory components for the Pynenc application. This sets up all components (orchestrator, broker, state backend, and argument cache) to use in-memory backends. This is primarily for testing and development purposes. Note: In-memory components are only compatible with certain runners. :return: The builder instance for method chaining """ self._config.update( { "orchestrator_cls": "MemOrchestrator", "broker_cls": "MemBroker", "state_backend_cls": "MemStateBackend", "client_data_store_cls": MemClientDataStore.__name__, "trigger_cls": MemTrigger.__name__, } ) self._using_memory_components = True self._plugin_components.clear() # Clear any plugin components return self
[docs] def sqlite(self, sqlite_db_path: str | None = None) -> "PynencBuilder": """ Configure SQLite components for the Pynenc application. This sets up all components (orchestrator, broker, state backend, and argument cache) to use SQLite backends. This is primarily for testing multiprocess scenarios on a single machine. Note: SQLite will not work in distributed environments with independent DB files. :param str | None sqlite_db_path: Path to the SQLite database file. If None, uses the default location. :return: The builder instance for method chaining """ self._config.update( { "orchestrator_cls": "SQLiteOrchestrator", "broker_cls": "SQLiteBroker", "state_backend_cls": "SQLiteStateBackend", "client_data_store_cls": "SQLiteClientDataStore", "trigger_cls": "SQLiteTrigger", } ) self._using_memory_components = False self._plugin_components.clear() # Clear any plugin components if sqlite_db_path: self._config["sqlite_db_path"] = sqlite_db_path return self
[docs] def mem_client_data_store( self, min_size_to_cache: int = 1024, local_cache_size: int = 1024, ) -> "PynencBuilder": """Configure memory-based client data store. This sets up in-memory argument caching for development and testing purposes. Arguments larger than the specified threshold will be cached locally. Also configures the ClientDataStore with the same settings. :param int min_size_to_cache: Minimum string length required to cache an argument :param int local_cache_size: Maximum number of items to cache locally :return: The builder instance for method chaining """ self._config.update( { "client_data_store_cls": MemClientDataStore.__name__, "min_size_to_cache": min_size_to_cache, "local_cache_size": local_cache_size, } ) self._using_memory_components = True return self
[docs] def disable_client_data_store(self) -> "PynencBuilder": """Disable client data store completely. This turns off all client data store functionality, which may be useful for debugging or when caching is not desired. :return: The builder instance for method chaining """ self._config["disable_client_data_store"] = True return self
[docs] def mem_trigger( self, scheduler_interval_seconds: int = 60, enable_scheduler: bool = True, ) -> "PynencBuilder": """ Configure memory-based trigger system. This sets up in-memory triggers for development and testing purposes. Time-based triggers will be checked at the specified interval. :param int scheduler_interval_seconds: Interval for the scheduler to check time-based triggers :param bool enable_scheduler: Whether to enable the scheduler for time-based triggers :return: The builder instance for method chaining """ self._config.update( { "trigger_cls": MemTrigger.__name__, "scheduler_interval_seconds": scheduler_interval_seconds, "enable_scheduler": enable_scheduler, } ) self._using_memory_components = True return self
[docs] def multi_thread_runner( self, min_threads: int = 1, max_threads: int = 1, enforce_max_processes: bool = False, ) -> "PynencBuilder": """ Configure the MultiThreadRunner for concurrent task execution. The MultiThreadRunner uses threads to execute tasks concurrently within the same process, providing efficient parallel execution with shared memory. :param int min_threads: The minimum number of threads to keep in the thread pool :param int max_threads: The maximum number of threads allowed in the thread pool :param bool enforce_max_processes: If True, enforces the maximum number of processes :return: The builder instance for method chaining """ self._config["runner_cls"] = "MultiThreadRunner" self._config.update( { "min_threads": min_threads, "max_threads": max_threads, "enforce_max_processes": enforce_max_processes, } ) return self
[docs] def persistent_process_runner(self, num_processes: int = 0) -> "PynencBuilder": """ Configure the PersistentProcessRunner for concurrent task execution. The PersistentProcessRunner maintains a pool of persistent processes for task execution, providing true parallel execution across multiple CPU cores, with isolated memory spaces. :param int num_processes: The number of processes to create in the process pool :return: The builder instance for method chaining """ self._config["runner_cls"] = "PersistentProcessRunner" self._config["num_processes"] = num_processes return self
[docs] def thread_runner( self, min_threads: int = 1, max_threads: int = 0 ) -> "PynencBuilder": """ Configure the ThreadRunner for task execution. The ThreadRunner uses a thread pool to execute tasks concurrently within the same process, providing efficient parallel execution with shared memory. :param int min_threads: The minimum number of threads to keep in the thread pool :param int max_threads: The maximum number of threads allowed in the thread pool :return: The builder instance for method chaining """ self._config["runner_cls"] = "ThreadRunner" self._config.update( { "min_threads": min_threads, "max_threads": max_threads, } ) return self
[docs] def process_runner(self) -> "PynencBuilder": """ Configure the ProcessRunner for task execution. The ProcessRunner creates a new process for each task execution, providing isolated execution context for each task. :return: The builder instance for method chaining """ self._config["runner_cls"] = "ProcessRunner" return self
[docs] def dummy_runner(self) -> "PynencBuilder": """ Configure the DummyRunner for task execution. The DummyRunner executes tasks in the main thread of the application. This is useful for testing and debugging purposes. :return: The builder instance for method chaining """ self._config["runner_cls"] = "DummyRunner" return self
[docs] def dev_mode(self, force_sync_tasks: bool = True) -> "PynencBuilder": """ Enable development mode for easier debugging. In development mode, tasks can be forced to run synchronously, making debugging and testing easier. :param bool force_sync_tasks: If True, forces all tasks to run synchronously :return: The builder instance for method chaining """ self._config["dev_mode_force_sync_tasks"] = force_sync_tasks return self
[docs] def logging( self, level: str | None = None, *, stream: str | None = None, colors: bool | None = None, fmt: str | None = None, ) -> "PynencBuilder": """ Configure all logging options in a single call. Each parameter is optional — only non-``None`` values are applied, so you can mix this with the individual ``logging_*`` methods freely. :param str | None level: Log level — ``"debug"``, ``"info"``, ``"warning"``, ``"error"``, ``"critical"`` :param str | None stream: Output stream — ``"stdout"`` or ``"stderr"`` :param bool | None colors: ``True`` force, ``False`` disable, ``None`` auto-detect :param str | None fmt: Log format — ``"text"`` or ``"json"`` :return: The builder instance for method chaining :raises ValueError: If any provided value is invalid """ if level is not None: self.logging_level(level) if stream is not None: self.logging_stream(stream) if colors is not None: self.logging_colors(colors) if fmt is not None: self.logging_format(fmt) return self
[docs] def logging_level(self, level: str) -> "PynencBuilder": """ Set the logging level for the application. :param str level: Log level — ``"debug"``, ``"info"``, ``"warning"``, ``"error"``, ``"critical"`` :return: The builder instance for method chaining :raises ValueError: If an invalid logging level is provided """ level = level.lower() _VALID_LOG_LEVELS = ["debug", "info", "warning", "error", "critical"] if level not in _VALID_LOG_LEVELS: raise ValueError( f"Invalid logging level: {level!r}. Valid options are: {', '.join(_VALID_LOG_LEVELS)}" ) self._config["logging_level"] = level return self
[docs] def logging_colors(self, use_colors: bool | None = None) -> "PynencBuilder": """ Control ANSI colour output in logs. Pass ``True`` to force colours, ``False`` to disable them, or ``None`` (the default) to let pynenc auto-detect based on whether the log stream is an interactive TTY. Set to ``False`` when running in containers or CI environments that do not support ANSI escape codes. :param bool | None use_colors: True to force colours, False to disable, None for auto-detect :return: The builder instance for method chaining """ self._config["log_use_colors"] = use_colors return self
[docs] def logging_stream(self, stream: str) -> "PynencBuilder": """ Set which output stream log messages are written to. Use ``"stdout"`` when deploying in containers (e.g. Kubernetes / GKE) so that log lines are not misclassified as errors by the container runtime's default stderr-to-ERROR mapping. :param str stream: Output stream — ``"stdout"`` or ``"stderr"`` :return: The builder instance for method chaining :raises ValueError: If an invalid stream name is provided """ valid = ("stdout", "stderr") if stream not in valid: raise ValueError( f"Invalid log stream: {stream!r}. Valid options are: {', '.join(valid)}" ) self._config["log_stream"] = stream return self
[docs] def logging_format(self, fmt: str) -> "PynencBuilder": """ Set the log output format. ``"text"`` (default) emits human-readable lines compatible with pynmon Log Explorer. ``"json"`` emits structured JSON objects — ideal for log-aggregation pipelines such as GCP Cloud Logging, Datadog, or the ELK stack. The ``text`` field in the JSON payload preserves the human-readable representation so pynmon can still parse JSON logs. :param str fmt: Log format — ``"text"`` or ``"json"`` :return: The builder instance for method chaining :raises ValueError: If an invalid format name is provided """ from pynenc.conf.config_pynenc import LogFormat fmt_lower = fmt.lower() try: self._config["log_format"] = LogFormat(fmt_lower) except ValueError as ex: valid = [f.value for f in LogFormat] raise ValueError( f"Invalid log format: {fmt!r}. Valid options are: {', '.join(valid)}" ) from ex return self
[docs] def runner_tuning( self, runner_loop_sleep_time_sec: float = 0.01, invocation_wait_results_sleep_time_sec: float = 0.01, min_parallel_slots: int = 1, ) -> "PynencBuilder": """ Configure runner performance tuning parameters. :param float runner_loop_sleep_time_sec: Sleep time between runner loop iterations :param float invocation_wait_results_sleep_time_sec: Sleep time when waiting for results :param int min_parallel_slots: Minimum number of parallel execution slots :return: The builder instance for method chaining """ self._config.update( { "runner_loop_sleep_time_sec": runner_loop_sleep_time_sec, "invocation_wait_results_sleep_time_sec": invocation_wait_results_sleep_time_sec, "min_parallel_slots": min_parallel_slots, } ) return self
[docs] def task_control( self, blocking_control: bool = False, queue_timeout_sec: float = 0.1, ) -> "PynencBuilder": """ Configure task control parameters. :param bool blocking_control: Whether to enable blocking control for concurrent tasks :param float queue_timeout_sec: Timeout for queue operations in seconds :return: The builder instance for method chaining """ self._config.update( { "blocking_control": blocking_control, "queue_timeout_sec": queue_timeout_sec, } ) return self
[docs] def serializer_json_pickle(self) -> "PynencBuilder": """ Configure the JSON-Pickle hybrid serializer :return: The builder instance for method chaining """ self._config["serializer_cls"] = JsonPickleSerializer.__name__ return self
[docs] def serializer_json(self) -> "PynencBuilder": """ Configure the JSON serializer :return: The builder instance for method chaining """ self._config["serializer_cls"] = JsonSerializer.__name__ return self
[docs] def serializer_pickle(self) -> "PynencBuilder": """ Configure the Pickle serializer :return: The builder instance for method chaining """ self._config["serializer_cls"] = PickleSerializer.__name__ return self
[docs] def concurrency_control( self, running_concurrency: str | ConcurrencyControlType | None = None, registration_concurrency: str | ConcurrencyControlType | None = None, ) -> "PynencBuilder": """ Configure concurrency control default behaviors for all tasks. :param Optional[Union[str, ConcurrencyControlType]] running_concurrency: Controls runtime concurrency behavior :param Optional[Union[str, ConcurrencyControlType]] registration_concurrency: Controls registration concurrency behavior :return: The builder instance for method chaining """ if running_concurrency is not None: if isinstance(running_concurrency, ConcurrencyControlType): self._config["running_concurrency"] = running_concurrency else: mode_upper = running_concurrency.upper() self._config["running_concurrency"] = ConcurrencyControlType[mode_upper] if registration_concurrency is not None: if isinstance(registration_concurrency, ConcurrencyControlType): self._config["registration_concurrency"] = registration_concurrency else: mode_upper = registration_concurrency.upper() self._config["registration_concurrency"] = ConcurrencyControlType[ mode_upper ] return self
[docs] def max_pending_seconds(self, seconds: float) -> "PynencBuilder": """ Set the maximum time a task can remain in PENDING state. :param float seconds: Maximum time in seconds a task can remain in PENDING state :return: The builder instance for method chaining """ self._config["max_pending_seconds"] = seconds return self
[docs] def argument_print_mode( self, mode: str | ArgumentPrintMode, truncate_length: int = 32 ) -> "PynencBuilder": """ Configure how task arguments are printed in logs. :param Union[str, ArgumentPrintMode] mode: The print mode to use :param int truncate_length: Maximum length for printed argument values in TRUNCATED mode :return: The builder instance for method chaining :raises ValueError: If an invalid mode string is provided """ if isinstance(mode, ArgumentPrintMode): arg_print_mode = mode else: mode_upper = mode.upper() try: arg_print_mode = ArgumentPrintMode[mode_upper] except KeyError as ex: valid_modes = [m.name for m in ArgumentPrintMode] raise ValueError( f"Invalid argument print mode: {mode}. Valid options are: {', '.join(valid_modes)}" ) from ex self._config["argument_print_mode"] = arg_print_mode # Configure related settings based on the mode if arg_print_mode == ArgumentPrintMode.HIDDEN: self._config["print_arguments"] = False else: self._config["print_arguments"] = True if arg_print_mode == ArgumentPrintMode.TRUNCATED: if truncate_length <= 0: raise ValueError("truncate_length must be greater than 0") self._config["truncate_arguments_length"] = truncate_length return self
[docs] def hide_arguments(self) -> "PynencBuilder": """ Configure logs to hide all task arguments. :return: The builder instance for method chaining """ return self.argument_print_mode(ArgumentPrintMode.HIDDEN)
[docs] def show_argument_keys(self) -> "PynencBuilder": """ Configure logs to show only argument names. :return: The builder instance for method chaining """ return self.argument_print_mode(ArgumentPrintMode.KEYS)
[docs] def show_full_arguments(self) -> "PynencBuilder": """ Configure logs to show complete argument values without truncation. :return: The builder instance for method chaining """ return self.argument_print_mode(ArgumentPrintMode.FULL)
[docs] def show_truncated_arguments(self, truncate_length: int = 32) -> "PynencBuilder": """ Configure logs to show truncated argument values. :param int truncate_length: Maximum length for printed argument values :return: The builder instance for method chaining """ return self.argument_print_mode(ArgumentPrintMode.TRUNCATED, truncate_length)
[docs] def custom_config(self, **kwargs: Any) -> "PynencBuilder": """ Add arbitrary configuration values. For common configuration values, prefer using the dedicated methods instead of this generic method. :param Any kwargs: Custom configuration values to add :return: The builder instance for method chaining """ self._config.update(kwargs) return self
[docs] def trigger_task_modules(self, modules: Iterable[str]) -> "PynencBuilder": """ Declare modules that contain trigger-dependent tasks. Runners will import these modules at startup to ensure tasks that depend on triggers (cron, event, status, etc.) are registered. This avoids importing all task modules during normal application import time and prevents eager connections to external systems. :param Iterable[str] modules: Iterable of module import paths (e.g. ["myapp.tasks.scheduled"]). :return: The builder instance for method chaining This method accepts any iterable of strings and stores the value as a deduplicated `set[str]` in the configuration. Using a `set` ensures uniqueness and avoids ordering assumptions. """ self._config["trigger_task_modules"] = set(modules) return self
[docs] def _validate_plugin_compatibility(self) -> None: """ Validate plugin configurations before building. Runs all registered plugin validators to ensure the configuration is valid. """ for validator in self._plugin_validators: try: validator(self._config) except Exception as e: raise ValueError(f"Plugin configuration validation failed: {e}") from e
[docs] def _validate_memory_compatibility(self) -> None: """ Validate that the selected runner is compatible with memory components. :raises ValueError: If memory components are used with an incompatible runner """ if not self._using_memory_components: return runner_cls = self._config.get("runner_cls") if not runner_cls: return memory_compatible_runners = ["DummyRunner", "ThreadRunner"] if runner_cls not in memory_compatible_runners: raise ValueError( f"Runner '{runner_cls}' is not compatible with in-memory components. " f"Use one of these runners instead: {', '.join(memory_compatible_runners)} " f"or configure distributed components using an appropriate plugin." )
[docs] def build(self) -> "Pynenc": """ Build and return a configured Pynenc instance. This method creates a new Pynenc instance using the configuration values that have been set through the builder methods. :return: A configured Pynenc instance ready for use :raises ValueError: If the configuration is invalid """ from pynenc import Pynenc self._validate_memory_compatibility() self._validate_plugin_compatibility() return Pynenc(config_values=self._config)