Source code for pynenc.util.log
"""Logging utilities for Pynenc.
Context-aware logging with three output formats:
- **ColoredFormatter**: ANSI-colored terminal output with context prefixes
- **PlainTextFormatter**: Plain text for non-TTY / log files
- **JsonFormatter**: Structured JSON for containers and cloud log aggregators
Timestamps include the local timezone offset (e.g. ``2026-03-05 07:56:07.476+01:00``).
Key components:
- PynencContextFilter: Injects runner/invocation context into log records
- _PynencFormatterBase: Shared TZ-aware timestamps and context prefix logic
- create_logger: Factory that wires formatter, stream, and filter from config
"""
import json
import logging
import sys
import time as _time
from typing import TYPE_CHECKING, Any
from pynenc import context
from pynenc.conf.config_pynenc import LogFormat
if TYPE_CHECKING:
from logging import LogRecord
from pynenc.app import Pynenc
from pynenc.conf.config_pynenc import ConfigPynenc
from pynenc.runner.runner_context import RunnerContext
# Truncation length for IDs
_TRUNCATE_LENGTH = 8
# Define ANSI color codes
[docs]
class Colors:
"""ANSI color codes for terminal coloring."""
RESET = "\033[0m"
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
CYAN = "\033[36m"
RED_BG = "\033[41m"
WHITE = "\033[37m"
[docs]
class PynencContextFilter(logging.Filter):
"""Injects Pynenc execution context into every log record."""
def __init__(self, app_id: str, conf: "ConfigPynenc") -> None:
"""
:param str app_id: The application identifier.
:param ConfigPynenc conf: App configuration (read dynamically).
"""
super().__init__()
self.app_id = app_id
self.conf = conf
[docs]
def filter(self, record: "LogRecord") -> bool:
"""Populate record with runner_ctx, invocation_id, task_id, compact_log_context."""
record.runner_ctx = context.get_current_runner_context(self.app_id)
if invocation := context.get_dist_invocation_context(self.app_id):
record.invocation_id = invocation.invocation_id
record.task_id = invocation.task.task_id.key
record.compact_log_context = self.conf.compact_log_context
return True
[docs]
class _PynencFormatterBase(logging.Formatter):
"""Base formatter: TZ-aware timestamps, context prefix building, ID truncation."""
[docs]
def formatTime(self, record: "LogRecord", datefmt: str | None = None) -> str:
"""Produce ``YYYY-MM-DD HH:MM:SS.mmm±HH:MM`` timestamp."""
ct = self.converter(record.created)
base = _time.strftime("%Y-%m-%d %H:%M:%S", ct)
return f"{base}.{int(record.msecs):03d}{_format_tz_offset(ct)}"
[docs]
def _build_context_prefix(self, record: "LogRecord") -> str:
"""Build ``CLS(id)UUID:task.key`` prefix from record context attributes."""
runner_ctx = getattr(record, "runner_ctx", None)
task_id = getattr(record, "task_id", None)
invocation_id = getattr(record, "invocation_id", None)
truncate_ids = getattr(record, "compact_log_context", True)
prefix = ""
if context_display := self._format_runner_ctx_display(runner_ctx, truncate_ids):
prefix = context_display
if invocation_id:
prefix += invocation_id
if task_id:
if invocation_id:
prefix += ":"
prefix += task_id
return prefix
[docs]
def _build_context_dict(self, record: "LogRecord") -> dict[str, Any]:
"""Extract untruncated context fields as a dict for structured output."""
runner_ctx: RunnerContext | None = getattr(record, "runner_ctx", None)
invocation_id: str | None = getattr(record, "invocation_id", None)
task_id: str | None = getattr(record, "task_id", None)
result: dict[str, Any] = {}
if runner_ctx is not None:
result["runner_class"] = runner_ctx.runner_cls
result["runner_id"] = runner_ctx.runner_id
if runner_ctx.parent_ctx is not None:
result["parent_runner_class"] = runner_ctx.parent_ctx.runner_cls
result["parent_runner_id"] = runner_ctx.parent_ctx.runner_id
if invocation_id is not None:
result["invocation_id"] = invocation_id
if task_id is not None:
result["task_id"] = task_id
return result
[docs]
def _format_runner_ctx_display(
self,
runner_ctx: "RunnerContext | None",
compact_context: bool,
) -> str | None:
"""Format runner context as ``CLS(id)`` or ``Parent(id).Child(id)``."""
if runner_ctx is None:
return None
curr_cls = self._maybe_compact_class_name(
runner_ctx.runner_cls, compact_context
)
if "External" in runner_ctx.runner_cls:
# Don't truncate external runner ID (hostname+PID)
curr_id = runner_ctx.runner_id
else:
curr_id = self._maybe_truncate(runner_ctx.runner_id, compact_context)
if runner_ctx.parent_ctx is not None:
# Two levels: parent -> current
parent = runner_ctx.parent_ctx
parent_cls = self._maybe_compact_class_name(
parent.runner_cls, compact_context
)
parent_id = self._maybe_truncate(parent.runner_id, compact_context)
return f"{parent_cls}({parent_id}).{curr_cls}({curr_id})"
# Just current context
return f"{curr_cls}({curr_id})"
[docs]
def _maybe_truncate(self, value: str, truncate: bool) -> str:
"""Truncate to ``_TRUNCATE_LENGTH`` chars when *truncate* is True."""
if truncate and len(value) > _TRUNCATE_LENGTH:
return value[:_TRUNCATE_LENGTH]
return value
[docs]
def _maybe_compact_class_name(self, class_name: str, compact: bool) -> str:
"""Abbreviate to capital letters (e.g. PersistentProcessRunner → PPR)."""
if not compact:
return class_name
return "".join(char for char in class_name if char.isupper())
[docs]
class ColoredFormatter(_PynencFormatterBase):
"""ANSI-colored formatter with context prefixes for interactive terminals."""
LEVEL_COLORS = {
"DEBUG": Colors.CYAN,
"INFO": Colors.GREEN,
"WARNING": Colors.YELLOW,
"ERROR": Colors.RED,
"CRITICAL": f"{Colors.WHITE}{Colors.RED_BG}",
}
[docs]
def format(self, record: "LogRecord") -> str:
"""Format with ANSI colors and ``[context]`` prefix."""
levelname, name, msg = record.levelname, record.name, record.msg
prefix = self._build_context_prefix(record)
if color := self.LEVEL_COLORS.get(levelname):
record.levelname = f"{color}{levelname:<8}{Colors.RESET}"
record.name = f"{Colors.BLUE}{name}{Colors.RESET}"
record.msg = (
f"{color}[{prefix}]{Colors.RESET} {color}{msg}{Colors.RESET}"
if prefix
else f"{color}{msg}{Colors.RESET}"
)
result = super().format(record)
record.levelname, record.name, record.msg = levelname, name, msg
return result
[docs]
class PlainTextFormatter(_PynencFormatterBase):
"""Plain text formatter with context prefixes, compatible with pynmon log explorer."""
[docs]
def format(self, record: "LogRecord") -> str:
"""Format with ``[context]`` prefix, no ANSI codes."""
msg = record.msg
if prefix := self._build_context_prefix(record):
record.msg = f"[{prefix}] {msg}"
result = super().format(record)
record.msg = msg
return result
[docs]
class JsonFormatter(_PynencFormatterBase):
"""Structured JSON formatter for containers and cloud log aggregators.
Uses ``severity`` as the level key (Google Cloud Logging convention, also
recognized by Datadog, AWS CloudWatch, and ELK). Includes a ``text`` field
with the human-readable representation for pynmon log explorer compatibility.
"""
[docs]
def format(self, record: "LogRecord") -> str:
"""Emit one JSON object per line with structured context fields."""
timestamp = self.formatTime(record)
message = record.getMessage()
prefix = self._build_context_prefix(record)
# Human-readable text line for pynmon compatibility
text_parts = [timestamp, f"{record.levelname:<8}", record.name]
if prefix:
text_parts.append(f"[{prefix}]")
text_parts.append(message)
log_entry: dict[str, Any] = {
"timestamp": timestamp,
"severity": record.levelname,
"logger": record.name,
"message": message,
**self._build_context_dict(record),
"text": " ".join(text_parts),
}
if record.exc_info and not record.exc_text:
record.exc_text = self.formatException(record.exc_info)
if record.exc_text:
log_entry["exception"] = record.exc_text
if record.stack_info:
log_entry["stack_info"] = self.formatStack(record.stack_info)
return json.dumps(log_entry, default=str)
[docs]
def _format_tz_offset(ct: _time.struct_time) -> str:
"""Format timezone offset as ``±HH:MM`` from struct_time."""
tz = _time.strftime("%z", ct)
if len(tz) >= 5:
return f"{tz[:3]}:{tz[3:]}"
return ""
[docs]
def _auto_detect_colors(stream: Any) -> bool:
"""Return True if *stream* is an interactive TTY (uvicorn/click/rich convention)."""
return hasattr(stream, "isatty") and stream.isatty()
[docs]
def _resolve_stream(log_stream: str) -> Any:
"""Map ``"stdout"``/``"stderr"`` to the corresponding ``sys`` stream."""
if log_stream == "stdout":
return sys.stdout
return sys.stderr
[docs]
def create_logger(
app: "Pynenc",
use_colors: bool | None = None,
log_format: "LogFormat | str | None" = None,
stream: Any | None = None,
) -> logging.Logger:
"""
Create a configured logger for a Pynenc app.
Resolution order for each setting: explicit parameter → ``app.conf`` → auto-detect.
:param Pynenc app: The app instance for which the logger is created.
:param bool | None use_colors: Override color setting (None = auto-detect TTY).
:param LogFormat | str | None log_format: Override log format (None = from config).
:param Any | None stream: Override output stream (None = from config).
:return: The created logger.
:raises ValueError: If the logging level is invalid.
"""
logger = logging.getLogger(f"pynenc.{app.app_id}")
logger.addFilter(PynencContextFilter(app.app_id, app.conf))
# Resolve stream: parameter → config → stderr
if stream is None:
stream = _resolve_stream(app.conf.log_stream)
handler = logging.StreamHandler(stream=stream)
# Resolve format: parameter → config
effective_format = (
LogFormat(log_format)
if log_format is not None
else LogFormat(app.conf.log_format)
)
# Resolve colors: parameter → config → TTY auto-detect
if use_colors is None:
use_colors = app.conf.log_use_colors
if use_colors is None:
use_colors = _auto_detect_colors(stream)
formatter: logging.Formatter
match effective_format:
case LogFormat.JSON:
formatter = JsonFormatter()
case LogFormat.TEXT if use_colors:
formatter = ColoredFormatter(
fmt="%(asctime)s %(levelname)s %(name)s %(message)s",
)
case _:
formatter = PlainTextFormatter(
fmt="%(asctime)s %(levelname)-8s %(name)s %(message)s",
)
handler.setFormatter(formatter)
logger.handlers = [handler]
if level_name := app.conf.logging_level:
numeric_level = getattr(logging, level_name.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError(f"Invalid log level: {level_name}")
logger.setLevel(numeric_level)
logger.propagate = False
return logger