"""
Simple, clean API for recording observability metrics.
This module provides a straightforward interface for Redis core code to record
metrics without needing to know about OpenTelemetry internals.
Usage in Redis core code:
from redis.observability.recorder import record_operation_duration
start_time = time.monotonic()
# ... execute Redis command ...
record_operation_duration(
command_name='SET',
duration_seconds=time.monotonic() - start_time,
server_address='localhost',
server_port=6379,
db_namespace='0',
error=None
)
"""

from datetime import datetime
from typing import TYPE_CHECKING, Callable, List, Optional

from redis.observability.attributes import (
    AttributeBuilder,
    CSCReason,
    CSCResult,
    GeoFailoverReason,
    PubSubDirection,
)
from redis.observability.metrics import CloseReason, RedisMetricsCollector
from redis.observability.providers import get_observability_instance
from redis.observability.registry import get_observables_registry_instance
from redis.utils import deprecated_args, str_if_bytes

if TYPE_CHECKING:
    from redis.connection import ConnectionPoolInterface
    from redis.multidb.database import SyncDatabase
    from redis.observability.config import OTelConfig

# Global metrics collector instance (lazy-initialized)
_metrics_collector: Optional[RedisMetricsCollector] = None

CONNECTION_COUNT_REGISTRY_KEY = "connection_count"
CSC_ITEMS_REGISTRY_KEY = "csc_items"


@deprecated_args(
    args_to_warn=["batch_size"],
    reason="The batch_size argument is no longer used and will be removed in the next major version.",
    version="7.2.1",
)
def record_operation_duration(
    command_name: str,
    duration_seconds: float,
    server_address: Optional[str] = None,
    server_port: Optional[int] = None,
    db_namespace: Optional[str] = None,
    error: Optional[Exception] = None,
    is_blocking: Optional[bool] = None,
    batch_size: Optional[int] = None,  # noqa
    retry_attempts: Optional[int] = None,
) -> None:
    """
    Record a Redis command execution duration.

    This is a simple, clean API that Redis core code can call directly.
    If observability is not enabled, this returns immediately with zero overhead.

    Args:
        command_name: Redis command name (e.g., 'GET', 'SET')
        duration_seconds: Command execution time in seconds
        server_address: Redis server address
        server_port: Redis server port
        db_namespace: Redis database index
        error: Exception if command failed, None if successful
        is_blocking: Whether the operation is a blocking command
        batch_size: Number of commands in batch (for pipelines/transactions)
        retry_attempts: Number of retry attempts made

    Example:
        >>> start = time.monotonic()
        >>> # ... execute command ...
        >>> record_operation_duration('SET', time.monotonic() - start, 'localhost', 6379, '0')
    """
    global _metrics_collector

    # Fast path: if collector not initialized, observability is disabled
    if _metrics_collector is None:
        # Try to initialize (only once)
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return  # Observability not enabled

    # Record the metric
    try:
        _metrics_collector.record_operation_duration(
            command_name=command_name,
            duration_seconds=duration_seconds,
            server_address=server_address,
            server_port=server_port,
            db_namespace=db_namespace,
            error_type=error,
            network_peer_address=server_address,
            network_peer_port=server_port,
            is_blocking=is_blocking,
            retry_attempts=retry_attempts,
        )
    except Exception:
        # Don't let metric recording errors break Redis operations
        pass


def record_connection_create_time(
    connection_pool: "ConnectionPoolInterface",
    duration_seconds: float,
) -> None:
    """
    Record connection creation time.

    Args:
        connection_pool: Connection pool implementation
        duration_seconds: Time taken to create connection in seconds

    Example:
        >>> start = time.monotonic()
        >>> # ... create a connection from ``pool`` (a ConnectionPoolInterface) ...
        >>> record_connection_create_time(pool, time.monotonic() - start)
    """
    global _metrics_collector

    # Fast path: if collector not initialized, observability is disabled
    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return

    try:
        _metrics_collector.record_connection_create_time(
            connection_pool=connection_pool,
            duration_seconds=duration_seconds,
        )
    except Exception:
        pass


def init_connection_count() -> None:
    """
    Initialize observable gauge for connection count metric.
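
    The gauge pulls its values from callbacks registered in the observables
    registry. Illustrative flow (``pool`` is a placeholder for any
    ConnectionPoolInterface implementation):

    Example:
        >>> init_connection_count()  # create the observable gauge once
        >>> register_pools_connection_count([pool])  # then register pools to observe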
"""
global _metrics_collector
if _metrics_collector is None:
_metrics_collector = _get_or_create_collector()
if _metrics_collector is None:
return
def observable_callback(__):
observables_registry = get_observables_registry_instance()
callbacks = observables_registry.get(CONNECTION_COUNT_REGISTRY_KEY)
observations = []
for callback in callbacks:
observations.extend(callback())
return observations
try:
_metrics_collector.init_connection_count(
callback=observable_callback,
)
except Exception:
pass


def register_pools_connection_count(
    connection_pools: List["ConnectionPoolInterface"],
) -> None:
    """
    Add connection pools to connection count observable registry.
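
    Illustrative usage (``pool`` is a placeholder for a ConnectionPoolInterface
    instance already created by the client):

    Example:
        >>> register_pools_connection_count([pool])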
"""
global _metrics_collector
if _metrics_collector is None:
_metrics_collector = _get_or_create_collector()
if _metrics_collector is None:
return
try:
# Lazy import
from opentelemetry.metrics import Observation
def connection_count_callback():
observations = []
for connection_pool in connection_pools:
for count, attributes in connection_pool.get_connection_count():
observations.append(Observation(count, attributes=attributes))
return observations
observables_registry = get_observables_registry_instance()
observables_registry.register(
CONNECTION_COUNT_REGISTRY_KEY, connection_count_callback
)
except Exception:
pass


def record_connection_timeout(
    pool_name: str,
) -> None:
    """
    Record a connection timeout event.

    Args:
        pool_name: Connection pool identifier

    Example:
        >>> record_connection_timeout('ConnectionPool<localhost:6379>')
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return

    try:
        _metrics_collector.record_connection_timeout(
            pool_name=pool_name,
        )
    except Exception:
        pass


def record_connection_wait_time(
    pool_name: str,
    duration_seconds: float,
) -> None:
    """
    Record time taken to obtain a connection from the pool.

    Args:
        pool_name: Connection pool identifier
        duration_seconds: Wait time in seconds

    Example:
        >>> start = time.monotonic()
        >>> # ... wait for connection from pool ...
        >>> record_connection_wait_time('ConnectionPool<localhost:6379>', time.monotonic() - start)
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return

    try:
        _metrics_collector.record_connection_wait_time(
            pool_name=pool_name,
            duration_seconds=duration_seconds,
        )
    except Exception:
        pass


def record_connection_closed(
    close_reason: Optional[CloseReason] = None,
    error_type: Optional[Exception] = None,
) -> None:
    """
    Record a connection closed event.

    Args:
        close_reason: Reason for closing (e.g. 'error', 'application_close')
        error_type: Error type if closed due to error

    Example:
        >>> # ``reason`` is a CloseReason member describing why the connection closed
        >>> record_connection_closed(close_reason=reason, error_type=None)
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return

    try:
        _metrics_collector.record_connection_closed(
            close_reason=close_reason,
            error_type=error_type,
        )
    except Exception:
        pass


def record_connection_relaxed_timeout(
    connection_name: str,
    maint_notification: str,
    relaxed: bool,
) -> None:
    """
    Record a connection timeout relaxation event.

    Args:
        connection_name: Connection identifier
        maint_notification: Maintenance notification type
        relaxed: True to count up (relaxed), False to count down (unrelaxed)

    Example:
        >>> record_connection_relaxed_timeout('Connection<localhost:6379>', 'MOVING', True)
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return

    try:
        _metrics_collector.record_connection_relaxed_timeout(
            connection_name=connection_name,
            maint_notification=maint_notification,
            relaxed=relaxed,
        )
    except Exception:
        pass


def record_connection_handoff(
    pool_name: str,
) -> None:
    """
    Record a connection handoff event (e.g., after MOVING notification).

    Args:
        pool_name: Connection pool identifier

    Example:
        >>> record_connection_handoff('ConnectionPool<localhost:6379>')
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return

    try:
        _metrics_collector.record_connection_handoff(
            pool_name=pool_name,
        )
    except Exception:
        pass


def record_error_count(
    server_address: Optional[str] = None,
    server_port: Optional[int] = None,
    network_peer_address: Optional[str] = None,
    network_peer_port: Optional[int] = None,
    error_type: Optional[Exception] = None,
    retry_attempts: Optional[int] = None,
    is_internal: bool = True,
) -> None:
    """
    Record error count.

    Args:
        server_address: Server address
        server_port: Server port
        network_peer_address: Network peer address
        network_peer_port: Network peer port
        error_type: Error type (Exception)
        retry_attempts: Retry attempts
        is_internal: Whether the error is internal (e.g., timeout, network error)

    Example:
        >>> record_error_count('localhost', 6379, 'localhost', 6379, ConnectionError(), 3)
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return

    try:
        _metrics_collector.record_error_count(
            server_address=server_address,
            server_port=server_port,
            network_peer_address=network_peer_address,
            network_peer_port=network_peer_port,
            error_type=error_type,
            retry_attempts=retry_attempts,
            is_internal=is_internal,
        )
    except Exception:
        pass


def record_pubsub_message(
    direction: PubSubDirection,
    channel: Optional[str] = None,
    sharded: Optional[bool] = None,
) -> None:
    """
    Record a PubSub message (published or received).

    Args:
        direction: Message direction ('publish' or 'receive')
        channel: Pub/Sub channel name
        sharded: True if sharded Pub/Sub channel

    Example:
        >>> record_pubsub_message(PubSubDirection.PUBLISH, 'channel', False)
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return

    # Check if channel names should be hidden
    effective_channel = channel
    if channel is not None:
        config = _get_config()
        if config is not None and config.hide_pubsub_channel_names:
            effective_channel = None

    try:
        _metrics_collector.record_pubsub_message(
            direction=direction,
            channel=effective_channel,
            sharded=sharded,
        )
    except Exception:
        pass


@deprecated_args(
    args_to_warn=["consumer_name"],
    reason="The consumer_name argument is no longer used and will be removed in the next major version.",
    version="7.2.1",
)
def record_streaming_lag(
    lag_seconds: float,
    stream_name: Optional[str] = None,
    consumer_group: Optional[str] = None,
    consumer_name: Optional[str] = None,  # noqa
) -> None:
    """
    Record the lag of a streaming message.

    Args:
        lag_seconds: Lag in seconds
        stream_name: Stream name
        consumer_group: Consumer group name
        consumer_name: Consumer name
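
    Example (illustrative; the stream and group names are placeholders):
        >>> # lag is typically derived from the message ID timestamp
        >>> record_streaming_lag(0.25, stream_name='events', consumer_group='workers')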
"""
global _metrics_collector
if _metrics_collector is None:
_metrics_collector = _get_or_create_collector()
if _metrics_collector is None:
return
# Check if stream names should be hidden
effective_stream_name = stream_name
if stream_name is not None:
config = _get_config()
if config is not None and config.hide_stream_names:
effective_stream_name = None
try:
_metrics_collector.record_streaming_lag(
lag_seconds=lag_seconds,
stream_name=effective_stream_name,
consumer_group=consumer_group,
)
except Exception:
pass


@deprecated_args(
    args_to_warn=["consumer_name"],
    reason="The consumer_name argument is no longer used and will be removed in the next major version.",
    version="7.2.1",
)
def record_streaming_lag_from_response(
    response,
    consumer_group: Optional[str] = None,
    consumer_name: Optional[str] = None,  # noqa
) -> None:
    """
    Record streaming lag from XREAD/XREADGROUP response.

    Parses the response and calculates lag for each message based on the message ID timestamp.

    Args:
        response: Response from XREAD/XREADGROUP command
        consumer_group: Consumer group name (for XREADGROUP)
        consumer_name: Consumer name (for XREADGROUP)
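
    Example (illustrative; the group, consumer, and stream names are placeholders):
        >>> response = client.xreadgroup('workers', 'worker-1', {'events': '>'})
        >>> record_streaming_lag_from_response(response, consumer_group='workers')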
"""
global _metrics_collector
if _metrics_collector is None:
_metrics_collector = _get_or_create_collector()
if _metrics_collector is None:
return
if not response:
return
try:
now = datetime.now().timestamp()
# Check if stream names should be hidden
config = _get_config()
hide_stream_names = config is not None and config.hide_stream_names
# RESP3 format: dict
if isinstance(response, dict):
for stream_name, stream_messages in response.items():
effective_stream_name = (
None if hide_stream_names else str_if_bytes(stream_name)
)
for messages in stream_messages:
for message in messages:
message_id, _ = message
message_id = str_if_bytes(message_id)
timestamp, _ = message_id.split("-")
# Ensure lag is non-negative (clock skew can cause negative values)
lag_seconds = max(0.0, now - int(timestamp) / 1000)
_metrics_collector.record_streaming_lag(
lag_seconds=lag_seconds,
stream_name=effective_stream_name,
consumer_group=consumer_group,
)
else:
# RESP2 format: list
for stream_entry in response:
stream_name = str_if_bytes(stream_entry[0])
effective_stream_name = None if hide_stream_names else stream_name
for message in stream_entry[1]:
message_id, _ = message
message_id = str_if_bytes(message_id)
timestamp, _ = message_id.split("-")
# Ensure lag is non-negative (clock skew can cause negative values)
lag_seconds = max(0.0, now - int(timestamp) / 1000)
_metrics_collector.record_streaming_lag(
lag_seconds=lag_seconds,
stream_name=effective_stream_name,
consumer_group=consumer_group,
)
except Exception:
pass


def record_maint_notification_count(
    server_address: str,
    server_port: int,
    network_peer_address: str,
    network_peer_port: int,
    maint_notification: str,
) -> None:
    """
    Record a maintenance notification count.

    Args:
        server_address: Server address
        server_port: Server port
        network_peer_address: Network peer address
        network_peer_port: Network peer port
        maint_notification: Maintenance notification type (e.g., 'MOVING', 'MIGRATING')

    Example:
        >>> record_maint_notification_count('localhost', 6379, 'localhost', 6379, 'MOVING')
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return

    try:
        _metrics_collector.record_maint_notification_count(
            server_address=server_address,
            server_port=server_port,
            network_peer_address=network_peer_address,
            network_peer_port=network_peer_port,
            maint_notification=maint_notification,
        )
    except Exception:
        pass


def record_csc_request(
    result: Optional[CSCResult] = None,
) -> None:
    """
    Record a Client Side Caching (CSC) request.

    Args:
        result: CSC result ('hit' or 'miss')
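
    Example (illustrative; ``result`` holds the CSCResult for a cache lookup):
        >>> record_csc_request(result=result)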
"""
global _metrics_collector
if _metrics_collector is None:
_metrics_collector = _get_or_create_collector()
if _metrics_collector is None:
return
try:
_metrics_collector.record_csc_request(
result=result,
)
except Exception:
pass


def init_csc_items() -> None:
    """
    Initialize observable gauge for CSC items metric.
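
    Like the connection count gauge, values are pulled from callbacks in the
    observables registry. Illustrative flow (``cache`` is a placeholder for the
    client-side cache object):

    Example:
        >>> init_csc_items()  # create the observable gauge once
        >>> register_csc_items_callback(lambda: len(cache), pool_name='ConnectionPool<localhost:6379>')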
"""
global _metrics_collector
if _metrics_collector is None:
_metrics_collector = _get_or_create_collector()
if _metrics_collector is None:
return
def observable_callback(__):
observables_registry = get_observables_registry_instance()
callbacks = observables_registry.get(CSC_ITEMS_REGISTRY_KEY)
observations = []
for callback in callbacks:
observations.extend(callback())
return observations
try:
_metrics_collector.init_csc_items(
callback=observable_callback,
)
except Exception:
pass


def register_csc_items_callback(
    callback: Callable,
    pool_name: Optional[str] = None,
) -> None:
    """
    Add the given callback to the CSC items observable registry.

    Args:
        callback: Callback function that returns the cache size
        pool_name: Connection pool name for observability
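
    Example (illustrative; ``cache`` is a placeholder for the client-side cache):
        >>> register_csc_items_callback(lambda: len(cache), pool_name='ConnectionPool<localhost:6379>')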
"""
global _metrics_collector
if _metrics_collector is None:
_metrics_collector = _get_or_create_collector()
if _metrics_collector is None:
return
# Lazy import
from opentelemetry.metrics import Observation
def csc_items_callback():
return [
Observation(
callback(),
attributes=AttributeBuilder.build_csc_attributes(pool_name=pool_name),
)
]
try:
observables_registry = get_observables_registry_instance()
observables_registry.register(CSC_ITEMS_REGISTRY_KEY, csc_items_callback)
except Exception:
pass


def record_csc_eviction(
    count: int,
    reason: Optional[CSCReason] = None,
) -> None:
    """
    Record a Client Side Caching (CSC) eviction.

    Args:
        count: Number of evictions
        reason: Reason for eviction
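
    Example (illustrative; ``reason`` is a CSCReason member describing the eviction):
        >>> record_csc_eviction(count=5, reason=reason)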
"""
global _metrics_collector
if _metrics_collector is None:
_metrics_collector = _get_or_create_collector()
if _metrics_collector is None:
return
try:
_metrics_collector.record_csc_eviction(
count=count,
reason=reason,
)
except Exception:
pass


def record_csc_network_saved(
    bytes_saved: int,
) -> None:
    """
    Record the number of bytes saved by using Client Side Caching (CSC).

    Args:
        bytes_saved: Number of bytes saved
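
    Example (illustrative; ``cached_reply`` is a placeholder for the locally served payload):
        >>> record_csc_network_saved(bytes_saved=len(cached_reply))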
"""
global _metrics_collector
if _metrics_collector is None:
_metrics_collector = _get_or_create_collector()
if _metrics_collector is None:
return
try:
_metrics_collector.record_csc_network_saved(
bytes_saved=bytes_saved,
)
except Exception:
pass


def record_geo_failover(
    fail_from: "SyncDatabase",
    fail_to: "SyncDatabase",
    reason: GeoFailoverReason,
) -> None:
    """
    Record a geo failover.

    Args:
        fail_from: Database that was failed over from
        fail_to: Database that was failed over to
        reason: Reason for the failover
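
    Example (illustrative; ``old_db`` and ``new_db`` are SyncDatabase instances and
    ``reason`` is a GeoFailoverReason member):
        >>> record_geo_failover(fail_from=old_db, fail_to=new_db, reason=reason)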
"""
global _metrics_collector
if _metrics_collector is None:
_metrics_collector = _get_or_create_collector()
if _metrics_collector is None:
return
try:
_metrics_collector.record_geo_failover(
fail_from=fail_from,
fail_to=fail_to,
reason=reason,
)
except Exception:
pass


def _get_or_create_collector() -> Optional[RedisMetricsCollector]:
    """
    Get or create the global metrics collector.

    Returns:
        RedisMetricsCollector instance if observability is enabled, None otherwise
    """
    try:
        manager = get_observability_instance().get_provider_manager()
        if manager is None or not manager.config.enabled_telemetry:
            return None

        # Get meter from the global MeterProvider
        meter = manager.get_meter_provider().get_meter(
            RedisMetricsCollector.METER_NAME, RedisMetricsCollector.METER_VERSION
        )
        return RedisMetricsCollector(meter, manager.config)
    except ImportError:
        # Observability module not available
        return None
    except Exception:
        # Any other error - don't break Redis operations
        return None


def _get_config() -> Optional["OTelConfig"]:
    """
    Get the OTel configuration from the observability manager.

    Returns:
        OTelConfig instance if observability is enabled, None otherwise
    """
    try:
        manager = get_observability_instance().get_provider_manager()
        if manager is None:
            return None
        return manager.config
    except Exception:
        return None


def reset_collector() -> None:
    """
    Reset the global collector (used for testing or re-initialization).
    """
    global _metrics_collector
    _metrics_collector = None


def is_enabled() -> bool:
    """
    Check if observability is enabled.

    Returns:
        True if metrics are being collected, False otherwise
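
    Example:
        >>> if is_enabled():
        ...     record_operation_duration('GET', 0.001)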
"""
global _metrics_collector
if _metrics_collector is None:
_metrics_collector = _get_or_create_collector()
return _metrics_collector is not None