"""
Redis Debug Client
This module provides instrumentation for Redis commands to track performance
and identify bottlenecks. It captures detailed call stacks, timing information,
and aggregates statistics to help diagnose Redis performance issues.
"""
# mypy: ignore-errors
# Ignore mypy type checking for this file due to complex interactions with Redis internals
import inspect
import logging
import threading
import time
from collections import defaultdict
from functools import wraps
from typing import Any, Callable, NamedTuple, Optional, TypeVar
import redis
from pynenc.conf.config_redis import ConfigRedis
from pynenc.util.redis_client import _POOLS_LOCK
from pynenc.util.redis_client import get_redis_client as original_get_redis_client
logger = logging.getLogger(__name__)
# Define a type variable for function return types
T = TypeVar("T")
# Configuration constants
DEFAULT_STACK_DEPTH = 7 # Show up to 7 frames in stack traces
MAX_CALL_STACKS_PER_CALLER = 5 # Store up to 5 different call paths per caller
MAX_TOP_CALLERS = 10 # Show top 10 callers in the report
SLOW_COMMAND_THRESHOLD = 0.1 # Log commands taking > 0.1s at INFO level
MAX_ARG_LENGTH = 100 # Truncate arguments longer than this
# =============================================================================
# Data models for tracking Redis command performance
# =============================================================================
[docs]
class CommandStats(NamedTuple):
"""Statistics for a Redis command."""
count: int # Number of times command was executed
total_time: float # Total execution time in seconds
avg_time: float # Average execution time in seconds
max_time: float # Maximum execution time in seconds
[docs]
class CallStack(NamedTuple):
"""Information about a call stack."""
frames: list[str] # The call stack frames
count: int = 1 # How many times this call stack has been observed
[docs]
class CallerInfo(NamedTuple):
"""Information about a Redis command caller."""
module_name: str
class_name: Optional[str]
function_name: str
line_number: int
@property
def formatted_caller(self) -> str:
"""Format the caller information as a string."""
if self.class_name:
return f"{self.module_name}.{self.class_name}.{self.function_name}:{self.line_number}"
return f"{self.module_name}.{self.function_name}:{self.line_number}"
# =============================================================================
# Call stack collection utilities
# =============================================================================
[docs]
def get_frame_info(frame: inspect.FrameInfo) -> CallerInfo:
"""
Extract caller information from a frame.
:param frame: Stack frame to analyze
:return: Structured caller information
"""
module_name = frame.f_globals.get("__name__", "")
lineno = frame.f_lineno
function_name = frame.f_code.co_name
# Check if frame has self (class method)
class_name = None
if "self" in frame.f_locals:
class_name = frame.f_locals["self"].__class__.__name__
return CallerInfo(
module_name=module_name,
class_name=class_name,
function_name=function_name,
line_number=lineno,
)
[docs]
def collect_call_stack() -> tuple[str, list[str]]:
"""
Collect call stack information from the current execution context.
:return: A tuple containing (primary_caller, call_stack)
"""
frame = inspect.currentframe()
caller = "unknown"
call_stack = []
try:
if frame:
# Go back in the call stack to find the non-Redis caller
current_frame = frame.f_back
frame_list = []
# Collect all frames in the stack
while current_frame:
module_name = current_frame.f_globals.get("__name__", "")
# Skip Redis and debug client frames
if not module_name.startswith("redis") and not module_name.endswith(
"redis_debug_client"
):
caller_info = get_frame_info(current_frame)
frame_info = caller_info.formatted_caller
# Add to our call stack
frame_list.append(frame_info)
# First non-Redis caller becomes our primary caller
if caller == "unknown":
caller = frame_info
current_frame = current_frame.f_back
# Create a more specific caller ID for better resolution
if len(frame_list) >= 2:
# Use main caller + immediate parent
# This helps distinguish different call paths to the same function
caller = f"{frame_list[0]}←{frame_list[1].split('.')[-1]}"
else:
caller = frame_list[0] if frame_list else "unknown"
# Reverse to get chronological order (deepest calls first)
call_stack = list(reversed(frame_list))
finally:
if frame:
del frame # Avoid reference cycles
return caller, call_stack
# =============================================================================
# Formatting utilities
# =============================================================================
# =============================================================================
# Core statistics tracking class
# =============================================================================
[docs]
class RedisStats:
"""Tracks Redis command statistics with call stack information."""
def __init__(self) -> None:
"""Initialize Redis stats tracking."""
# Main stats structure: caller -> command -> stats
self.stats: dict[str, dict[str, CommandStats]] = defaultdict(
lambda: defaultdict(lambda: CommandStats(0, 0.0, 0.0, 0.0))
)
# Store multiple call stacks for each caller
self.call_stacks: dict[str, list[CallStack]] = defaultdict(list)
self.lock = threading.Lock()
self.enabled = False
[docs]
def start_tracking(self) -> None:
"""Start tracking Redis command statistics."""
with self.lock:
self.stats.clear()
self.call_stacks.clear()
self.enabled = True
[docs]
def stop_tracking(self) -> None:
"""Stop tracking Redis command statistics."""
with self.lock:
self.enabled = False
[docs]
def add_stat(
self, caller: str, call_stack: list[str], command: str, execution_time: float
) -> None:
"""
Add a command execution stat with call stack information.
:param caller: The code location that called the Redis command
:param call_stack: The call frames leading to this command
:param command: The Redis command name
:param execution_time: How long the command took to execute
"""
if not self.enabled:
return
with self.lock:
# Update command statistics
self._update_command_stats(caller, command, execution_time)
# Track call stack information
if call_stack:
self._update_call_stack(caller, call_stack)
[docs]
def _update_command_stats(
self, caller: str, command: str, execution_time: float
) -> None:
"""Update statistics for a specific command."""
current = self.stats[caller][command]
new_count = current.count + 1
new_total = current.total_time + execution_time
new_avg = new_total / new_count
new_max = max(current.max_time, execution_time)
self.stats[caller][command] = CommandStats(
count=new_count,
total_time=new_total,
avg_time=new_avg,
max_time=new_max,
)
[docs]
def _update_call_stack(self, caller: str, call_stack: list[str]) -> None:
"""Update call stack information for a caller."""
# Convert to tuple for hashability
stack_tuple = tuple(call_stack)
# Check if we've seen this exact stack before
found = False
for i, cs in enumerate(self.call_stacks[caller]):
if tuple(cs.frames) == stack_tuple:
# Update frequency count
self.call_stacks[caller][i] = CallStack(
frames=cs.frames, count=cs.count + 1
)
found = True
break
# If this is a new stack, add it
if not found:
# Limit to max call stacks per caller to avoid memory bloat
if len(self.call_stacks[caller]) < MAX_CALL_STACKS_PER_CALLER:
self.call_stacks[caller].append(CallStack(frames=call_stack))
# -------------------------------------------------------------------------
# Report generation methods
# -------------------------------------------------------------------------
[docs]
def get_sorted_commands(self) -> list[tuple[str, str, CommandStats]]:
"""Get commands sorted by total time (for programmatic use)."""
with self.lock:
return self._get_sorted_commands_list()
[docs]
def get_report(
self,
top_n: int = 20,
stack_depth: int = DEFAULT_STACK_DEPTH,
table_width: int = 140,
) -> str:
"""
Generate a performance report for Redis commands.
:param top_n: Number of top commands to include
:param stack_depth: Number of frames to show in call stacks
:param table_width: Maximum width for the table
:return: Formatted report as a string
"""
with self.lock:
if not self.stats:
return "No Redis statistics collected."
# Collect the report parts
report_parts = []
# Add header
report_parts.extend(
["Redis Commands Performance Report", "=" * min(80, table_width)]
)
# Get the sorted command list
all_commands = self._get_sorted_commands_list()
# Add the commands table
report_parts.extend(
self._format_commands_table(all_commands, top_n, table_width)
)
# Add summary statistics
report_parts.extend(self._get_summary_stats(all_commands))
# Add top callers section
report_parts.extend(
self._get_top_callers_section(all_commands, stack_depth)
)
# Join all parts into a single report
return "\n".join(report_parts)
[docs]
def _get_sorted_commands_list(self) -> list[tuple[str, str, CommandStats]]:
"""Get commands sorted by total execution time."""
all_commands = []
for caller, commands in self.stats.items():
for cmd, stats in commands.items():
all_commands.append((caller, cmd, stats))
# Sort by total time (most expensive first)
all_commands.sort(key=lambda x: x[2].total_time, reverse=True)
return all_commands
[docs]
def _get_summary_stats(
self, all_commands: list[tuple[str, str, CommandStats]]
) -> list[str]:
"""Generate the summary statistics section."""
if not all_commands:
return []
total_commands = sum(stats.count for _, _, stats in all_commands)
total_time = sum(stats.total_time for _, _, stats in all_commands)
avg_time = (total_time / total_commands) * 1000 if total_commands > 0 else 0
return [
f"Total Commands: {total_commands}",
f"Total Time: {total_time:.3f}s",
f"Average Time per Command: {avg_time:.2f}ms"
if total_commands > 0
else "No commands executed.",
]
# Update _get_top_callers_section method to better display call stacks
[docs]
def _get_top_callers_section(
self, all_commands: list[tuple[str, str, CommandStats]], stack_depth: int
) -> list[str]:
"""Generate the top callers section with call stacks and improved formatting."""
if not all_commands:
return []
report = ["\nTop Callers by Total Time:"]
# Aggregate stats by caller
caller_stats = self._aggregate_stats_by_caller(all_commands)
# Sort and display top callers
sorted_callers = sorted(
caller_stats.items(), key=lambda x: x[1].total_time, reverse=True
)[:MAX_TOP_CALLERS]
for caller, stats in sorted_callers:
# Basic stats line
report.append(
f"{caller}: {stats.count} commands, {stats.total_time:.3f}s total, {stats.avg_time*1000:.2f}ms avg"
)
# Add call stacks if available
if caller in self.call_stacks and self.call_stacks[caller]:
# Collect all unique frames across paths for comparison
all_frames = set()
for call_stack in self.call_stacks[caller]:
all_frames.update(call_stack.frames)
# Sort stacks by frequency
sorted_stacks = sorted(
self.call_stacks[caller], key=lambda cs: cs.count, reverse=True
)
# Track already shown frames to detect duplicates
shown_paths = set()
# Show up to 3 most common call stacks
for idx, call_stack in enumerate(sorted_stacks[:3]):
# Convert frame list to tuple for hashability
frames_tuple = tuple(call_stack.frames)
# Skip this stack if it's essentially the same as one we've shown
# (This handles small variations in line numbers but same call path)
if self._is_duplicate_path(frames_tuple, shown_paths):
continue
# Different label for first vs alternate paths
if idx == 0:
report.append(
f" Most common call path (seen {call_stack.count} times):"
)
else:
report.append(
f" Alternate call path (seen {call_stack.count} times):"
)
# Show the most recent frames in the call stack (up to stack_depth)
frames_to_show = min(stack_depth, len(call_stack.frames))
for frame in call_stack.frames[-frames_to_show:]:
# Format frame for readability
frame_display = self._format_frame_for_display(frame)
report.append(f" - {frame_display}")
# Add this path to shown paths
shown_paths.add(frames_tuple)
return report
[docs]
def _is_duplicate_path(
self, new_path: tuple[str, ...], existing_paths: set[tuple[str, ...]]
) -> bool:
"""
Check if a call path is essentially a duplicate of an existing path.
This compares the function names and classes, ignoring minor line number differences.
:param new_path: The new call path to check
:param existing_paths: Previously shown call paths
:return: True if this path is essentially a duplicate
"""
if not existing_paths:
return False
# For each existing path, check if it matches the new path
for existing_path in existing_paths:
# Must be same length to be considered a potential match
if len(new_path) != len(existing_path):
continue
# Check if they're essentially the same path
matches = 0
for i, (new_frame, old_frame) in enumerate(zip(new_path, existing_path)):
# Strip line numbers for comparison
new_parts = (
new_frame.split(":", 1)[0] if ":" in new_frame else new_frame
)
old_parts = (
old_frame.split(":", 1)[0] if ":" in old_frame else old_frame
)
# If same module + function, count as match
if new_parts == old_parts:
matches += 1
# Small variations in line numbers are ok for most frames
# except the immediate caller (last frame)
elif (
i < len(new_path) - 1
and new_parts.split(".")[-1] == old_parts.split(".")[-1]
):
matches += 0.8 # Partial match
# If > 90% same, consider it a duplicate
if matches / len(new_path) > 0.9:
return True
return False
# Update the _format_commands_table method for better alignment
[docs]
def _aggregate_stats_by_caller(
self, all_commands: list[tuple[str, str, CommandStats]]
) -> dict[str, CommandStats]:
"""Aggregate command statistics by caller."""
caller_stats = {}
# First pass: sum up stats for each caller
for caller, _, stats in all_commands:
if caller not in caller_stats:
caller_stats[caller] = stats
else:
current = caller_stats[caller]
caller_stats[caller] = CommandStats(
current.count + stats.count,
current.total_time + stats.total_time,
0, # Will calculate average later
max(current.max_time, stats.max_time),
)
# Second pass: calculate averages
for caller, stats in caller_stats.items():
if stats.count > 0:
caller_stats[caller] = CommandStats(
stats.count,
stats.total_time,
stats.total_time / stats.count,
stats.max_time,
)
return caller_stats
# =============================================================================
# Command logging and Redis client wrapper
# =============================================================================
[docs]
def log_redis_command(func: Callable[..., T]) -> Callable[..., T]:
"""
Decorator to log Redis commands with timing and call stack information.
:param func: Redis command function to wrap
:return: Wrapped function with logging and stats collection
"""
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> T:
# Get function name for logging
func_name = func.__name__
# Format arguments for logging
formatted_args = [format_arg(arg) for arg in args[1:]] # Skip self
formatted_kwargs = {k: format_arg(v) for k, v in kwargs.items()}
# Log command start with DEBUG level
log_prefix = "REDIS_CMD"
logger.debug(
f"{log_prefix} START: {func_name}({', '.join(formatted_args)}, {formatted_kwargs})"
)
# Get the caller information and stack trace
caller, call_stack = collect_call_stack()
# Execute command and time it
start_time = time.time()
try:
# Execute the Redis command
result = func(*args, **kwargs)
elapsed = time.time() - start_time
# Add to statistics
redis_stats.add_stat(caller, call_stack, func_name, elapsed)
# Format result for logging (with truncation for long results)
result_str = format_arg(result)
# Log command completion with appropriate level based on performance
if elapsed > SLOW_COMMAND_THRESHOLD:
# Use INFO level for slow commands
logger.info(
f"{log_prefix} SLOW: {func_name} took {elapsed:.6f}s - "
f"Result: {result_str} - Called by: {caller}"
)
else:
# Use DEBUG level for normal commands
logger.debug(
f"{log_prefix} END: {func_name} took {elapsed:.6f}s - Result: {result_str}"
)
return result
except Exception as e:
# Handle and log exceptions
elapsed = time.time() - start_time
# Still record failed commands in stats
redis_stats.add_stat(caller, call_stack, f"{func_name}_ERROR", elapsed)
# Log the error with ERROR level
logger.error(
f"{log_prefix} ERROR: {func_name} failed after {elapsed:.6f}s - "
f"{type(e).__name__}: {e} - Called by: {caller}"
)
# Re-raise the original exception
raise
return wrapper
[docs]
class DebugRedisClient(redis.Redis):
"""Redis client that logs all commands with timing information."""
[docs]
def __getattribute__(self, name: str) -> Any:
"""
Intercept attribute access to wrap Redis commands with logging.
:param name: Attribute name
:return: Wrapped command or original attribute
"""
attr = super().__getattribute__(name)
# Only wrap callable Redis commands, not internal methods or attributes
if (
callable(attr)
and not name.startswith("_")
and name
not in [
"execute_command",
"parse_response",
"connection_pool",
"from_url",
]
):
return log_redis_command(attr)
return attr
[docs]
def _execute_command(self, *args: Any, **options: Any) -> Any:
"""
Override the core Redis client execute_command method to track all operations.
This is the central method that all Redis commands ultimately call.
"""
# Get command name
command_name = args[0] if args else "unknown"
# Get the caller information and stack trace
caller, call_stack = collect_call_stack()
# Format arguments for logging (with some limitations)
formatted_args = [format_arg(arg) for arg in args[1:]]
formatted_kwargs = {
k: format_arg(v) for k, v in options.items() if k not in ["keys"]
}
log_prefix = "REDIS_CMD"
logger.debug(
f"{log_prefix} START: {command_name}({', '.join(formatted_args)}, {formatted_kwargs})"
)
# Execute command and time it
start_time = time.time()
try:
# Call the original Redis client method
result = super()._execute_command(*args, **options)
elapsed = time.time() - start_time
# Add to statistics
redis_stats.add_stat(caller, call_stack, command_name, elapsed)
# Format result for logging
result_str = format_arg(result)
# Log command completion
if elapsed > SLOW_COMMAND_THRESHOLD:
logger.info(
f"{log_prefix} SLOW: {command_name} took {elapsed:.6f}s - "
f"Result: {result_str} - Called by: {caller}"
)
else:
logger.debug(
f"{log_prefix} END: {command_name} took {elapsed:.6f}s - Result: {result_str}"
)
return result
except Exception as e:
# Handle and log exceptions
elapsed = time.time() - start_time
# Still record failed commands in stats
redis_stats.add_stat(caller, call_stack, f"{command_name}_ERROR", elapsed)
# Log the error
logger.error(
f"{log_prefix} ERROR: {command_name} failed after {elapsed:.6f}s - "
f"{type(e).__name__}: {e} - Called by: {caller}"
)
# Re-raise the original exception
raise
# =============================================================================
# Patcher for redis_client function
# =============================================================================
[docs]
def patch_redis_client() -> None:
"""
Patch the get_redis_client function to return a DebugRedisClient instance.
This ensures all Redis operations throughout the application are monitored.
"""
import pynenc.util.redis_client
@wraps(original_get_redis_client)
def patched_get_redis_client(conf: ConfigRedis) -> redis.Redis:
"""
Patched version of get_redis_client that returns a DebugRedisClient.
Uses the same connection pooling logic as the original function.
"""
global _REDIS_POOLS
if conf.redis_url:
pool_key = conf.redis_url
else:
pool_key = f"{conf.redis_host}:{conf.redis_port}:{conf.redis_db}:{conf.redis_username or ''}:{conf.redis_password or ''}"
with _POOLS_LOCK:
if pool_key not in _REDIS_POOLS:
max_connections = conf.redis_pool_max_connections
logger.info(
f"REDIS_DEBUG: Creating new pool with max_connections={max_connections} "
f"for {pool_key}"
)
if conf.redis_url:
_REDIS_POOLS[pool_key] = redis.ConnectionPool.from_url(
conf.redis_url,
max_connections=max_connections,
socket_timeout=conf.socket_timeout,
socket_connect_timeout=conf.socket_connect_timeout,
socket_keepalive=True,
health_check_interval=conf.redis_pool_health_check_interval,
retry_on_timeout=True,
)
else:
_REDIS_POOLS[pool_key] = redis.ConnectionPool(
host=conf.redis_host,
port=conf.redis_port,
db=conf.redis_db,
username=None
if not conf.redis_username
else conf.redis_username,
password=None
if not conf.redis_password
else conf.redis_password,
socket_timeout=conf.socket_timeout,
socket_connect_timeout=conf.socket_connect_timeout,
socket_keepalive=True,
health_check_interval=conf.redis_pool_health_check_interval,
max_connections=max_connections,
retry_on_timeout=True,
)
# Create a debug client with the same connection pool
client = DebugRedisClient(connection_pool=_REDIS_POOLS[pool_key])
# Log a message about the new debug client
connection_info = (
f"{conf.redis_host}:{conf.redis_port}/{conf.redis_db}"
if not conf.redis_url
else conf.redis_url
)
logger.info(
f"REDIS_DEBUG: Created debug client with connection pooling for {connection_info}"
)
# Test the connection with a ping
try:
ping_start = time.time()
res = client.ping()
ping_elapsed = time.time() - ping_start
logger.info(
f"REDIS_DEBUG: Redis ping response: {res} in {ping_elapsed:.6f}s"
)
except Exception as e:
logger.error(f"REDIS_DEBUG: Redis ping failed: {e}")
return client
# Replace the original function with our patched version
pynenc.util.redis_client.get_redis_client = patched_get_redis_client
logger.info("REDIS_DEBUG: get_redis_client patched to use DebugRedisClient")
# =============================================================================
# Pool Monitoring Functions
# =============================================================================
[docs]
def log_pool_stats() -> None:
"""Log statistics about all Redis connection pools."""
global _REDIS_POOLS
for pool_key, pool in _REDIS_POOLS.items():
if hasattr(pool, "_available_connections") and hasattr(
pool, "_in_use_connections"
):
available = len(pool._available_connections)
in_use = len(pool._in_use_connections)
max_conn = pool.max_connections
usage_pct = (in_use / max_conn) * 100 if max_conn else 0
logger.debug(
f"REDIS_DEBUG: Pool {pool_key}: "
f"{in_use} in use, {available} available, "
f"{in_use + available}/{max_conn} total ({usage_pct:.1f}%)"
)
# Alert on high usage
if usage_pct > 80:
logger.warning(
f"REDIS_DEBUG: Pool {pool_key} at {usage_pct:.1f}% capacity! "
f"Consider increasing max_connections (currently {max_conn})"
)
[docs]
def monitor_redis_pools(interval_seconds: int = 60) -> None:
"""
Start a background thread to monitor Redis connection pools.
:param interval_seconds: How often to log pool statistics
"""
def _monitor_loop() -> None:
while True:
log_pool_stats()
time.sleep(interval_seconds)
monitor_thread = threading.Thread(
target=_monitor_loop, name="RedisPoolMonitor", daemon=True
)
monitor_thread.start()
logger.info("REDIS_DEBUG: Redis pool monitoring started")
# =============================================================================
# Initialization and Cleanup
# =============================================================================
[docs]
def start_redis_debugging(
monitor_pools: bool = True, monitor_interval: int = 60
) -> None:
"""
Start Redis debugging with optional pool monitoring.
:param monitor_pools: Whether to monitor connection pools
:param monitor_interval: How often to log pool statistics (seconds)
"""
# Start stats tracking
start_tracking_redis_stats()
# Patch the client function
patch_redis_client()
# Start pool monitoring if requested
if monitor_pools:
monitor_redis_pools(monitor_interval)
logger.info("REDIS_DEBUG: Redis debugging fully enabled")
# =============================================================================
# Global instance and module-level functions
# =============================================================================
# Create a global instance
redis_stats = RedisStats()
# Expose stats functionality to the module level
[docs]
def start_tracking_redis_stats() -> None:
"""Start tracking Redis command statistics."""
redis_stats.start_tracking()
logger.info("Redis stats tracking enabled")
[docs]
def stop_tracking_redis_stats() -> None:
"""Stop tracking Redis command statistics."""
redis_stats.stop_tracking()
logger.info("Redis stats tracking disabled")
[docs]
def get_redis_stats_report(
top_n: int = 20, stack_depth: int = DEFAULT_STACK_DEPTH, table_width: int = 140
) -> str:
"""
Get a formatted report of Redis command statistics.
:param top_n: Number of top commands to include in the report
:param stack_depth: Number of call stack frames to show for each caller
:param table_width: Maximum width for the table display
:return: Formatted report
"""
return redis_stats.get_report(top_n, stack_depth, table_width)
[docs]
def get_sorted_redis_commands() -> list[tuple[str, str, CommandStats]]:
"""Get Redis commands sorted by total execution time."""
return redis_stats.get_sorted_commands()
# Add this deeper patching function at the bottom of the file
[docs]
def deep_patch_redis() -> None:
"""
Patch Redis at a deeper level to guarantee all Redis operations are tracked.
This patches the Redis module's core methods that all clients use.
"""
import redis.client
# Save the original method
original_execute = redis.client.Redis._execute_command
# Create our replacement method
def patched_execute(self, *args, **options):
"""Patched execute_command for all Redis clients"""
# Get command name
command_name = args[0] if args else "unknown"
# Get caller information using our existing utilities
caller, call_stack = collect_call_stack()
# Execute command and time it
start_time = time.time()
try:
# Call the original Redis client method
result = original_execute(self, *args, **options)
elapsed = time.time() - start_time
# Add to statistics - we're tracking even for non-DebugRedisClient instances
redis_stats.add_stat(caller, call_stack, command_name, elapsed)
# Only log slow commands to reduce noise
if elapsed > SLOW_COMMAND_THRESHOLD:
# Format arguments and results only for slow commands
formatted_args = [format_arg(arg) for arg in args[1:]]
formatted_kwargs = {
k: format_arg(v) for k, v in options.items() if k not in ["keys"]
}
_ = format_arg(result)
logger.debug(
f"REDIS_SLOW: {command_name} took {elapsed:.6f}s - "
f"Args: ({', '.join(formatted_args)}, {formatted_kwargs}) - "
f"Called by: {caller}"
)
return result
except Exception as e:
# Handle and log exceptions
elapsed = time.time() - start_time
# Still record failed commands in stats
redis_stats.add_stat(caller, call_stack, f"{command_name}_ERROR", elapsed)
# Log the error with controlled output
logger.error(
f"REDIS_ERROR: {command_name} failed after {elapsed:.6f}s - "
f"{type(e).__name__}: {e} - Called by: {caller}"
)
# Re-raise the original exception
raise
# Apply the patching directly to the Redis class
redis.client.Redis._execute_command = patched_execute
logger.info("REDIS_DEBUG: Applied deep patching to all Redis clients")
return True
# Automatically start patching redis clients and tracking stats
# when this module is imported
deep_patch_redis()
start_tracking_redis_stats()