pynenc.conf.config_task

Module Contents

Classes

ConcurrencyControlType

Type of concurrency control.

TaskOptionsJSONEncoder

ConfigTask

Provides task-specific configuration settings for the distributed task system.

Functions

Data

API

class pynenc.conf.config_task.ConcurrencyControlType[source]

Bases: enum.StrEnum

Type of concurrency control.

Variables:
  • DISABLED – Concurrency control is disabled. This means there are no concurrency checks.

  • TASK – Concurrency is checked per task. Only one instance of each task can be in the determined state at a time.

  • ARGUMENTS – Concurrency is checked for each task’s arguments. Only one task with the same arguments can be in the determined state at a time.

  • KEYS – Concurrency is checked for each task’s key arguments. Only one task with the same key arguments can be in the determined state at a time.

Initialization

Initialize self. See help(type(self)) for accurate signature.

DISABLED

‘auto(…)’

TASK

‘auto(…)’

ARGUMENTS

‘auto(…)’

KEYS

‘auto(…)’

pynenc.conf.config_task.DEFAULT_KEY_ARGS: cistell.ConfigField[tuple[str, ...]]

‘ConfigField(…)’

class pynenc.conf.config_task.TaskOptionsJSONEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]

Bases: json.JSONEncoder

default(obj: Any) Any[source]
pynenc.conf.config_task.options_deserializer(serialized_pairs: list[tuple[Any, Any]]) dict[str, Any][source]
pynenc.conf.config_task.exception_mapper(value: list[str] | list[type[Exception]]) tuple[type[Exception], ...][source]
pynenc.conf.config_task.T

‘TypeVar(…)’

pynenc.conf.config_task.exception_config_mapper(value: list[str], expected_type: type[pynenc.conf.config_task.T]) pynenc.conf.config_task.T[source]
class pynenc.conf.config_task.ConfigTask(task_id: str, config_values: Optional[dict[str, Any]] = None, config_filepath: Optional[str] = None, task_options: Optional[dict[str, Any]] = None)[source]

Bases: pynenc.conf.config_base.ConfigPynencBase

Provides task-specific configuration settings for the distributed task system.

This subclass of ConfigPynencBase adds task-level configuration options, allowing for fine-grained control over the behavior of individual tasks. Configuration can be specified globally for all tasks, or individually for each task using environment variables, configuration files, or the @task decorator.

Variables:
  • parallel_batch_size (ConfigField[int]) – If set to 100, when parallelizing a task, we will route batches of 100 tasks. 0 means that each parallel task will be routed individually.

  • retry_for (ConfigField[tuple]) – A tuple of exceptions for which the task should be retried.

  • max_retries (ConfigField[int]) – Defines the maximum number of retries for a task. This limit ensures that a task does not retry indefinitely.

  • running_concurrency (ConfigField[ConcurrencyControlType]) – Controls the concurrency behavior of the task. This option prevents the task from being in a running state, managing and limiting concurrent execution of the same task.

  • registration_concurrency (ConfigField[ConcurrencyControlType]) – Manages the registration concurrency for the task, ensuring unique task registration based on the configuration. Useful for tasks that should not execute multiple times in parallel or to avoid generating too much unnecessary tasks in the system.

  • key_arguments (ConfigField[str]) – Specifies key arguments for concurrency control, relevant when concurrency control is set to key-based. This option determines which arguments are used to identify unique task invocations.

  • on_diff_non_key_args_raise (ConfigField[bool]) – If set to True, raises an exception when a task invocation with matching key arguments but different non-key arguments is encountered. This option is used to handle concurrency at the key level.

  • call_result_cache (ConfigField[bool]) – If set to True, enables caching for the task. This option is useful for tasks that perform expensive computations and can benefit from caching results.

  • disable_cache_args (ConfigField[tuple[str, ]]) – Specifies arguments to exclude from caching. This option is useful for tasks that should not cache results based on certain arguments. It can be set to ("*",) to disable caching for all arguments.

Examples

Using environment variables to configure tasks:

… code-block:: python

# Set global auto parallel batch size
os.environ["PYNENC__CONFIGTASK__PARALLEL_BATCH_SIZE"] = "2"

# Set auto parallel batch size specifically for 'my_module.my_task'
os.environ["PYNENC__CONFIGTASK__MY_MODULE#MY_TASK__PARALLEL_BATCH_SIZE"] = "3"

Loading configuration from a YAML file:

… code-block:: yaml

task:
    parallel_batch_size: 4
    max_retries: 10
    module_name.task_name:
        max_retries: 5

… code-block:: python

# Create and load a config file for tasks
config = ConfigTask(task_id="my_module.my_task", config_filepath="path/to/config.yaml")

… note::

- When specifying task-specific settings using environment variables, the separator
  between the module name and task name is `#`, not `__`. For example, use
  `MY_MODULE#MY_TASK__AUTO_PARALLEL` to specify the task-specific setting.

The above examples demonstrate how to configure tasks both globally and on a per-task basis, offering flexibility and precise control over the behavior of tasks in the system.

Initialization

parallel_batch_size

‘ConfigField(…)’

retry_for

‘ConfigField(…)’

max_retries

‘ConfigField(…)’

running_concurrency

‘ConfigField(…)’

registration_concurrency

‘ConfigField(…)’

key_arguments

None

on_diff_non_key_args_raise

‘ConfigField(…)’

call_result_cache

‘ConfigField(…)’

disable_cache_args: cistell.ConfigField[tuple[str, ...]]

‘ConfigField(…)’

options_to_json() str[source]
Returns:

the serialized options

static options_from_json(options_json: str) dict[str, Any][source]
Returns:

a new options from a dictionary

init_config_value_key_from_mapping(source: str, config_id: str, key: str, mapping: dict, conf_mapping: dict) None[source]
init_config_value_from_env_vars(config_cls: type[pynenc.conf.config_base.ConfigPynencBase]) None[source]