You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
425 lines
13 KiB
425 lines
13 KiB
"""
|
|
OpenTelemetry semantic convention attributes for Redis.
|
|
|
|
This module provides constants and helper functions for building OTel attributes
|
|
according to the semantic conventions for database clients.
|
|
|
|
Reference: https://opentelemetry.io/docs/specs/semconv/database/redis/
|
|
"""
|
|
|
|
from enum import Enum
|
|
from typing import TYPE_CHECKING, Any, Dict, Optional, Union
|
|
|
|
import redis
|
|
|
|
if TYPE_CHECKING:
|
|
from redis.asyncio.multidb.database import AsyncDatabase
|
|
from redis.connection import ConnectionPoolInterface
|
|
from redis.multidb.database import SyncDatabase
|
|
|
|
# Database semantic convention attributes
|
|
DB_SYSTEM = "db.system"
|
|
DB_NAMESPACE = "db.namespace"
|
|
DB_OPERATION_NAME = "db.operation.name"
|
|
DB_RESPONSE_STATUS_CODE = "db.response.status_code"
|
|
DB_STORED_PROCEDURE_NAME = "db.stored_procedure.name"
|
|
|
|
# Error attributes
|
|
ERROR_TYPE = "error.type"
|
|
|
|
# Network attributes
|
|
NETWORK_PEER_ADDRESS = "network.peer.address"
|
|
NETWORK_PEER_PORT = "network.peer.port"
|
|
|
|
# Server attributes
|
|
SERVER_ADDRESS = "server.address"
|
|
SERVER_PORT = "server.port"
|
|
|
|
# Connection pool attributes
|
|
DB_CLIENT_CONNECTION_POOL_NAME = "db.client.connection.pool.name"
|
|
DB_CLIENT_CONNECTION_STATE = "db.client.connection.state"
|
|
DB_CLIENT_CONNECTION_NAME = "db.client.connection.name"
|
|
|
|
# Geofailover attributes
|
|
DB_CLIENT_GEOFAILOVER_FAIL_FROM = "db.client.geofailover.fail_from"
|
|
DB_CLIENT_GEOFAILOVER_FAIL_TO = "db.client.geofailover.fail_to"
|
|
DB_CLIENT_GEOFAILOVER_REASON = "db.client.geofailover.reason"
|
|
|
|
# Redis-specific attributes
|
|
REDIS_CLIENT_LIBRARY = "redis.client.library"
|
|
REDIS_CLIENT_CONNECTION_PUBSUB = "redis.client.connection.pubsub"
|
|
REDIS_CLIENT_CONNECTION_CLOSE_REASON = "redis.client.connection.close.reason"
|
|
REDIS_CLIENT_CONNECTION_NOTIFICATION = "redis.client.connection.notification"
|
|
REDIS_CLIENT_OPERATION_RETRY_ATTEMPTS = "redis.client.operation.retry_attempts"
|
|
REDIS_CLIENT_OPERATION_BLOCKING = "redis.client.operation.blocking"
|
|
REDIS_CLIENT_PUBSUB_MESSAGE_DIRECTION = "redis.client.pubsub.message.direction"
|
|
REDIS_CLIENT_PUBSUB_CHANNEL = "redis.client.pubsub.channel"
|
|
REDIS_CLIENT_PUBSUB_SHARDED = "redis.client.pubsub.sharded"
|
|
REDIS_CLIENT_ERROR_INTERNAL = "redis.client.errors.internal"
|
|
REDIS_CLIENT_ERROR_CATEGORY = "redis.client.errors.category"
|
|
REDIS_CLIENT_STREAM_NAME = "redis.client.stream.name"
|
|
REDIS_CLIENT_CONSUMER_GROUP = "redis.client.consumer_group"
|
|
REDIS_CLIENT_CSC_RESULT = "redis.client.csc.result"
|
|
REDIS_CLIENT_CSC_REASON = "redis.client.csc.reason"
|
|
|
|
|
|
class ConnectionState(Enum):
|
|
IDLE = "idle"
|
|
USED = "used"
|
|
|
|
|
|
class PubSubDirection(Enum):
|
|
PUBLISH = "publish"
|
|
RECEIVE = "receive"
|
|
|
|
|
|
class CSCResult(Enum):
|
|
HIT = "hit"
|
|
MISS = "miss"
|
|
|
|
|
|
class CSCReason(Enum):
|
|
FULL = "full"
|
|
INVALIDATION = "invalidation"
|
|
|
|
|
|
class GeoFailoverReason(Enum):
|
|
AUTOMATIC = "automatic"
|
|
MANUAL = "manual"
|
|
|
|
|
|
class AttributeBuilder:
|
|
"""
|
|
Helper class to build OTel semantic convention attributes for Redis operations.
|
|
"""
|
|
|
|
@staticmethod
|
|
def build_base_attributes(
|
|
server_address: Optional[str] = None,
|
|
server_port: Optional[int] = None,
|
|
db_namespace: Optional[int] = None,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Build base attributes common to all Redis operations.
|
|
|
|
Args:
|
|
server_address: Redis server address (FQDN or IP)
|
|
server_port: Redis server port
|
|
db_namespace: Redis database index
|
|
|
|
Returns:
|
|
Dictionary of base attributes
|
|
"""
|
|
attrs: Dict[str, Any] = {
|
|
DB_SYSTEM: "redis",
|
|
REDIS_CLIENT_LIBRARY: f"redis-py:v{redis.__version__}",
|
|
}
|
|
|
|
if server_address is not None:
|
|
attrs[SERVER_ADDRESS] = server_address
|
|
|
|
if server_port is not None:
|
|
attrs[SERVER_PORT] = server_port
|
|
|
|
if db_namespace is not None:
|
|
attrs[DB_NAMESPACE] = str(db_namespace)
|
|
|
|
return attrs
|
|
|
|
@staticmethod
|
|
def build_operation_attributes(
|
|
command_name: Optional[str] = None,
|
|
batch_size: Optional[int] = None, # noqa
|
|
network_peer_address: Optional[str] = None,
|
|
network_peer_port: Optional[int] = None,
|
|
stored_procedure_name: Optional[str] = None,
|
|
retry_attempts: Optional[int] = None,
|
|
is_blocking: Optional[bool] = None,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Build attributes for a Redis operation (command execution).
|
|
|
|
Args:
|
|
command_name: Redis command name (e.g., 'GET', 'SET', 'MULTI')
|
|
batch_size: Number of commands in batch (for pipelines/transactions)
|
|
network_peer_address: Resolved peer address
|
|
network_peer_port: Peer port number
|
|
stored_procedure_name: Lua script name or SHA1 digest
|
|
retry_attempts: Number of retry attempts made
|
|
is_blocking: Whether the operation is a blocking command
|
|
|
|
Returns:
|
|
Dictionary of operation attributes
|
|
"""
|
|
attrs: Dict[str, Any] = {}
|
|
|
|
if command_name is not None:
|
|
attrs[DB_OPERATION_NAME] = command_name.upper()
|
|
|
|
if network_peer_address is not None:
|
|
attrs[NETWORK_PEER_ADDRESS] = network_peer_address
|
|
|
|
if network_peer_port is not None:
|
|
attrs[NETWORK_PEER_PORT] = network_peer_port
|
|
|
|
if stored_procedure_name is not None:
|
|
attrs[DB_STORED_PROCEDURE_NAME] = stored_procedure_name
|
|
|
|
if retry_attempts is not None and retry_attempts > 0:
|
|
attrs[REDIS_CLIENT_OPERATION_RETRY_ATTEMPTS] = retry_attempts
|
|
|
|
if is_blocking is not None:
|
|
attrs[REDIS_CLIENT_OPERATION_BLOCKING] = is_blocking
|
|
|
|
return attrs
|
|
|
|
@staticmethod
|
|
def build_connection_attributes(
|
|
pool_name: Optional[str] = None,
|
|
connection_state: Optional[ConnectionState] = None,
|
|
connection_name: Optional[str] = None,
|
|
is_pubsub: Optional[bool] = None,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Build attributes for connection pool metrics.
|
|
|
|
Args:
|
|
pool_name: Unique connection pool name
|
|
connection_state: Connection state ('idle' or 'used')
|
|
is_pubsub: Whether this is a PubSub connection
|
|
connection_name: Unique connection name
|
|
|
|
Returns:
|
|
Dictionary of connection pool attributes
|
|
"""
|
|
attrs: Dict[str, Any] = AttributeBuilder.build_base_attributes()
|
|
|
|
if pool_name is not None:
|
|
attrs[DB_CLIENT_CONNECTION_POOL_NAME] = pool_name
|
|
|
|
if connection_state is not None:
|
|
attrs[DB_CLIENT_CONNECTION_STATE] = connection_state.value
|
|
|
|
if is_pubsub is not None:
|
|
attrs[REDIS_CLIENT_CONNECTION_PUBSUB] = is_pubsub
|
|
|
|
if connection_name is not None:
|
|
attrs[DB_CLIENT_CONNECTION_NAME] = connection_name
|
|
|
|
return attrs
|
|
|
|
@staticmethod
|
|
def build_error_attributes(
|
|
error_type: Optional[Exception] = None,
|
|
is_internal: Optional[bool] = None,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Build error attributes.
|
|
|
|
Args:
|
|
is_internal: Whether the error is internal (e.g., timeout, network error)
|
|
error_type: The exception that occurred
|
|
|
|
Returns:
|
|
Dictionary of error attributes
|
|
"""
|
|
attrs: Dict[str, Any] = {}
|
|
|
|
if error_type is not None:
|
|
attrs[ERROR_TYPE] = error_type.__class__.__name__
|
|
|
|
if (
|
|
hasattr(error_type, "status_code")
|
|
and error_type.status_code is not None
|
|
):
|
|
attrs[DB_RESPONSE_STATUS_CODE] = error_type.status_code
|
|
else:
|
|
attrs[DB_RESPONSE_STATUS_CODE] = "error"
|
|
|
|
if hasattr(error_type, "error_type") and error_type.error_type is not None:
|
|
attrs[REDIS_CLIENT_ERROR_CATEGORY] = error_type.error_type.value
|
|
else:
|
|
attrs[REDIS_CLIENT_ERROR_CATEGORY] = "other"
|
|
|
|
if is_internal is not None:
|
|
attrs[REDIS_CLIENT_ERROR_INTERNAL] = is_internal
|
|
|
|
return attrs
|
|
|
|
@staticmethod
|
|
def build_pubsub_message_attributes(
|
|
direction: PubSubDirection,
|
|
channel: Optional[str] = None,
|
|
sharded: Optional[bool] = None,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Build attributes for a PubSub message.
|
|
|
|
Args:
|
|
direction: Message direction ('publish' or 'receive')
|
|
channel: Pub/Sub channel name
|
|
sharded: True if sharded Pub/Sub channel
|
|
|
|
Returns:
|
|
Dictionary of PubSub message attributes
|
|
"""
|
|
attrs: Dict[str, Any] = AttributeBuilder.build_base_attributes()
|
|
attrs[REDIS_CLIENT_PUBSUB_MESSAGE_DIRECTION] = direction.value
|
|
|
|
if channel is not None:
|
|
attrs[REDIS_CLIENT_PUBSUB_CHANNEL] = channel
|
|
|
|
if sharded is not None:
|
|
attrs[REDIS_CLIENT_PUBSUB_SHARDED] = sharded
|
|
|
|
return attrs
|
|
|
|
@staticmethod
|
|
def build_streaming_attributes(
|
|
stream_name: Optional[str] = None,
|
|
consumer_group: Optional[str] = None,
|
|
consumer_name: Optional[str] = None, # noqa
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Build attributes for a streaming operation.
|
|
|
|
Args:
|
|
stream_name: Name of the stream
|
|
consumer_group: Name of the consumer group
|
|
consumer_name: Name of the consumer
|
|
|
|
Returns:
|
|
Dictionary of streaming attributes
|
|
"""
|
|
attrs: Dict[str, Any] = AttributeBuilder.build_base_attributes()
|
|
|
|
if stream_name is not None:
|
|
attrs[REDIS_CLIENT_STREAM_NAME] = stream_name
|
|
|
|
if consumer_group is not None:
|
|
attrs[REDIS_CLIENT_CONSUMER_GROUP] = consumer_group
|
|
|
|
return attrs
|
|
|
|
@staticmethod
|
|
def build_csc_attributes(
|
|
pool_name: Optional[str] = None,
|
|
result: Optional[CSCResult] = None,
|
|
reason: Optional[CSCReason] = None,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Build attributes for a Client Side Caching (CSC) operation.
|
|
|
|
Args:
|
|
pool_name: Connection pool name (used only for csc_items metric)
|
|
result: CSC result ('hit' or 'miss')
|
|
reason: Reason for CSC eviction ('full' or 'invalidation')
|
|
|
|
Returns:
|
|
Dictionary of CSC attributes
|
|
"""
|
|
attrs: Dict[str, Any] = AttributeBuilder.build_base_attributes()
|
|
|
|
if pool_name is not None:
|
|
attrs[DB_CLIENT_CONNECTION_POOL_NAME] = pool_name
|
|
|
|
if result is not None:
|
|
attrs[REDIS_CLIENT_CSC_RESULT] = result.value
|
|
|
|
if reason is not None:
|
|
attrs[REDIS_CLIENT_CSC_REASON] = reason.value
|
|
|
|
return attrs
|
|
|
|
@staticmethod
|
|
def build_geo_failover_attributes(
|
|
fail_from: Union["SyncDatabase", "AsyncDatabase"],
|
|
fail_to: Union["SyncDatabase", "AsyncDatabase"],
|
|
reason: GeoFailoverReason,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Build attributes for a geo failover.
|
|
|
|
Args:
|
|
fail_from: Database failed from
|
|
fail_to: Database failed to
|
|
reason: Reason for the failover
|
|
|
|
Returns:
|
|
Dictionary of geo failover attributes
|
|
"""
|
|
attrs: Dict[str, Any] = AttributeBuilder.build_base_attributes()
|
|
|
|
attrs[DB_CLIENT_GEOFAILOVER_FAIL_FROM] = get_db_name(fail_from)
|
|
attrs[DB_CLIENT_GEOFAILOVER_FAIL_TO] = get_db_name(fail_to)
|
|
attrs[DB_CLIENT_GEOFAILOVER_REASON] = reason.value
|
|
|
|
return attrs
|
|
|
|
@staticmethod
|
|
def build_pool_name(
|
|
server_address: str,
|
|
server_port: int,
|
|
db_namespace: int = 0,
|
|
) -> str:
|
|
"""
|
|
Build a unique connection pool name.
|
|
|
|
Args:
|
|
server_address: Redis server address
|
|
server_port: Redis server port
|
|
db_namespace: Redis database index
|
|
|
|
Returns:
|
|
Unique pool name in format "address:port/db"
|
|
"""
|
|
return f"{server_address}:{server_port}/{db_namespace}"
|
|
|
|
|
|
def get_pool_name(pool: "ConnectionPoolInterface") -> str:
|
|
"""
|
|
Get a short string representation of a connection pool for observability.
|
|
|
|
This provides a concise pool identifier suitable for use as a metric attribute,
|
|
in the format: host:port_uniqueID (matching go-redis format)
|
|
|
|
Args:
|
|
pool: Connection pool instance
|
|
|
|
Returns:
|
|
Short pool name in format "host:port_uniqueID"
|
|
|
|
Example:
|
|
>>> pool = ConnectionPool(host='localhost', port=6379, db=0)
|
|
>>> get_pool_name(pool)
|
|
'localhost:6379_a1b2c3d4'
|
|
"""
|
|
host = pool.connection_kwargs.get("host", "unknown")
|
|
port = pool.connection_kwargs.get("port", 6379)
|
|
|
|
# Get unique pool ID if available (added for observability)
|
|
pool_id = getattr(pool, "_pool_id", "")
|
|
|
|
if pool_id:
|
|
return f"{host}:{port}_{pool_id}"
|
|
else:
|
|
return f"{host}:{port}"
|
|
|
|
|
|
def get_db_name(database: Union["SyncDatabase", "AsyncDatabase"]):
|
|
"""
|
|
Get a short string representation of a database for observability.
|
|
|
|
Args:
|
|
database: Database instance
|
|
|
|
Returns:
|
|
Short database name in format "{host}:{port}/{weight}"
|
|
"""
|
|
|
|
host = database.client.get_connection_kwargs()["host"]
|
|
port = database.client.get_connection_kwargs()["port"]
|
|
weight = database.weight
|
|
|
|
return f"{host}:{port}/{weight}"
|