You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
372 lines
12 KiB
372 lines
12 KiB
"""
OpenTelemetry provider management for redis-py.

This module manages the OpenTelemetry components used by redis-py: currently the
MeterProvider, with TracerProvider and LoggerProvider support planned for the future.

It uses a singleton pattern: initialize once globally, and all Redis clients use it
automatically.

redis-py uses the global MeterProvider set by your application, so set that up
before initializing observability:

    from opentelemetry import metrics
    from opentelemetry.sdk.metrics import MeterProvider

    provider = MeterProvider(...)
    metrics.set_meter_provider(provider)

    # Then initialize redis-py observability
    from redis.observability import get_observability_instance, OTelConfig

    otel = get_observability_instance()
    otel.init(OTelConfig(enable_metrics=True))
"""
|
|
|
|
import logging
|
|
from typing import Optional
|
|
|
|
from redis.observability.config import OTelConfig
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Optional imports - OTel SDK may not be installed
|
|
try:
|
|
from opentelemetry.sdk.metrics import MeterProvider
|
|
|
|
OTEL_AVAILABLE = True
|
|
except ImportError:
|
|
OTEL_AVAILABLE = False
|
|
MeterProvider = None
|
|
|
|
# Global singleton instance
# NOTE(review): nothing in this module reads or writes this name; the singleton
# actually used here is `_observability_instance` (see get_observability_instance).
# Confirm no other module depends on it before removing.
_global_provider_manager: Optional["OTelProviderManager"] = None
|
|
|
|
|
|
class OTelProviderManager:
    """
    Manages OpenTelemetry SDK providers and their lifecycle.

    This class handles:
    - Looking up (and validating) the global MeterProvider set by the application
    - Flushing pending metrics on shutdown

    The MeterProvider is owned by the application, so this manager never shuts
    it down; it only flushes it.

    Args:
        config: OTel configuration object
    """

    def __init__(self, config: OTelConfig):
        self.config = config
        # Populated lazily by get_meter_provider(), and only with a provider
        # that passed validation.
        self._meter_provider: Optional[MeterProvider] = None

    @staticmethod
    def _is_proxy_meter_provider(provider: object) -> bool:
        """Return True if ``provider`` is the OTel API's "nothing configured yet" proxy."""
        # When the application has not called metrics.set_meter_provider(),
        # opentelemetry-api's get_meter_provider() returns an internal
        # _ProxyMeterProvider rather than a NoOpMeterProvider. That class is
        # private API, so a failed import means "cannot detect", not an error.
        try:
            from opentelemetry.metrics._internal import _ProxyMeterProvider
        except ImportError:
            return False
        return isinstance(provider, _ProxyMeterProvider)

    def get_meter_provider(self) -> Optional[MeterProvider]:
        """
        Get the global MeterProvider set by the application.

        The provider is resolved on first use and cached only after it has been
        validated, so a lookup that fails because no provider is configured yet
        is retried on the next call instead of failing permanently.

        Returns:
            MeterProvider instance or None if metrics are disabled

        Raises:
            ImportError: If OpenTelemetry is not installed
            RuntimeError: If metrics are enabled but no global MeterProvider is set
        """
        if not self.config.is_enabled():
            return None

        # Only validated providers are ever cached, so no re-check is needed.
        if self._meter_provider is not None:
            return self._meter_provider

        # Lazy import - only import OTel when metrics are enabled
        try:
            from opentelemetry import metrics
            from opentelemetry.metrics import NoOpMeterProvider
        except ImportError as e:
            raise ImportError(
                "OpenTelemetry is not installed. Install it with:\n"
                " pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp-proto-http"
            ) from e

        provider = metrics.get_meter_provider()

        # Reject placeholders (explicit NoOp, or the API proxy returned when no
        # provider has been set) *before* caching, so a later call can succeed
        # once the application installs a real provider.
        if isinstance(provider, NoOpMeterProvider) or self._is_proxy_meter_provider(
            provider
        ):
            raise RuntimeError(
                "Metrics are enabled but no global MeterProvider is configured.\n"
                "\n"
                "Set up OpenTelemetry before initializing redis-py observability:\n"
                "\n"
                " from opentelemetry import metrics\n"
                " from opentelemetry.sdk.metrics import MeterProvider\n"
                " from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader\n"
                " from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter\n"
                "\n"
                " # Create exporter\n"
                " exporter = OTLPMetricExporter(\n"
                " endpoint='http://localhost:4318/v1/metrics'\n"
                " )\n"
                "\n"
                " # Create reader\n"
                " reader = PeriodicExportingMetricReader(\n"
                " exporter=exporter,\n"
                " export_interval_millis=10000\n"
                " )\n"
                "\n"
                " # Create and set global provider\n"
                " provider = MeterProvider(metric_readers=[reader])\n"
                " metrics.set_meter_provider(provider)\n"
                "\n"
                " # Now initialize redis-py observability\n"
                " from redis.observability import get_observability_instance, OTelConfig\n"
                " otel = get_observability_instance()\n"
                " otel.init(OTelConfig(enable_metrics=True))\n"
            )

        self._meter_provider = provider
        logger.info("Using global MeterProvider from application")
        return self._meter_provider

    def shutdown(self, timeout_millis: int = 30000) -> bool:
        """
        Shutdown observability and flush any pending metrics.

        Note: We don't shutdown the global MeterProvider since it's owned by the application.
        We only force flush pending metrics.

        Args:
            timeout_millis: Maximum time to wait for flush

        Returns:
            True if flush was successful, False otherwise
        """
        logger.debug(
            "Flushing metrics before shutdown (not shutting down global MeterProvider)"
        )
        return self.force_flush(timeout_millis=timeout_millis)

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        """
        Force flush any pending metrics from the global MeterProvider.

        Args:
            timeout_millis: Maximum time to wait for flush

        Returns:
            True if flush was successful (or there was nothing to flush),
            False if the provider raised or explicitly reported failure
        """
        if self._meter_provider is None:
            return True

        # Providers without force_flush (e.g. NoOp implementations) have nothing to flush.
        if not hasattr(self._meter_provider, "force_flush"):
            logger.debug("MeterProvider does not support force_flush, skipping")
            return True

        try:
            logger.debug("Force flushing metrics from global MeterProvider")
            flushed = self._meter_provider.force_flush(timeout_millis=timeout_millis)
        except Exception as e:
            logger.error("Error flushing metrics: %s", e)
            return False

        # force_flush may report failure through its return value rather than
        # raising. Only an explicit False counts as failure, so providers that
        # return None are still treated as successful.
        if flushed is False:
            logger.warning(
                "MeterProvider.force_flush reported failure (timeout_millis=%s)",
                timeout_millis,
            )
            return False
        return True

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, _exc_type, _exc_val, _exc_tb):
        """Context manager exit - flush metrics (the provider itself is not shut down)."""
        self.shutdown()

    def __repr__(self) -> str:
        return f"OTelProviderManager(config={self.config})"
|
|
|
|
|
|
# Singleton instance class
|
|
|
|
|
|
class ObservabilityInstance:
    """
    Process-wide holder for redis-py's OpenTelemetry observability state.

    Modeled on Glide's GetOtelInstance(). Obtain the shared instance with
    get_observability_instance() (which creates it once and then returns the
    same object), then call init() to enable observability.

    Example:
        >>> from redis.observability.config import OTelConfig
        >>>
        >>> # Get singleton instance
        >>> otel = get_observability_instance()
        >>>
        >>> # Initialize once at app startup
        >>> otel.init(OTelConfig())
        >>>
        >>> # All Redis clients now automatically collect metrics
        >>> import redis
        >>> r = redis.Redis(host='localhost', port=6379)
        >>> r.set('key', 'value')  # Metrics collected automatically
    """

    def __init__(self):
        # None until init() is called; reset to None again by shutdown().
        self._provider_manager: Optional[OTelProviderManager] = None

    def init(self, config: OTelConfig) -> "ObservabilityInstance":
        """
        Initialize OpenTelemetry observability globally for all Redis clients.

        This should be called once at application startup. After initialization,
        all Redis clients will automatically collect and export metrics without
        needing any additional configuration.

        Safe to call multiple times: the previous provider manager is shut down
        (its pending metrics are flushed) before a new one is created.

        Args:
            config: OTel configuration object

        Returns:
            Self for method chaining

        Example:
            >>> otel = get_observability_instance()
            >>> otel.init(OTelConfig())
        """
        if self._provider_manager is not None:
            logger.warning(
                "Observability already initialized. Shutting down previous instance."
            )
            self._provider_manager.shutdown()

        self._provider_manager = OTelProviderManager(config)

        logger.info("Observability initialized")

        return self

    def is_enabled(self) -> bool:
        """
        Check if observability is enabled.

        Returns:
            True if observability is initialized and its config reports enabled

        Example:
            >>> otel = get_observability_instance()
            >>> if otel.is_enabled():
            ...     print("Metrics are being collected")
        """
        return (
            self._provider_manager is not None
            and self._provider_manager.config.is_enabled()
        )

    def get_provider_manager(self) -> Optional[OTelProviderManager]:
        """
        Get the provider manager instance.

        Returns:
            The provider manager, or None if not initialized

        Example:
            >>> otel = get_observability_instance()
            >>> manager = otel.get_provider_manager()
            >>> if manager is not None:
            ...     print(f"Observability enabled: {manager.config.is_enabled()}")
        """
        return self._provider_manager

    def shutdown(self, timeout_millis: int = 30000) -> bool:
        """
        Shutdown observability and flush any pending metrics.

        This should be called at application shutdown to ensure all metrics
        are exported before the application exits. The provider manager is
        discarded even if the flush fails.

        Args:
            timeout_millis: Maximum time to wait for shutdown

        Returns:
            True if shutdown was successful (or nothing was initialized)

        Example:
            >>> otel = get_observability_instance()
            >>> # At application shutdown
            >>> otel.shutdown()
        """
        if self._provider_manager is None:
            logger.debug("Observability not initialized, nothing to shutdown")
            return True

        success = self._provider_manager.shutdown(timeout_millis)
        self._provider_manager = None
        logger.info("Observability shutdown")

        return success

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        """
        Force flush all pending metrics immediately.

        Useful for testing or when you want to ensure metrics are exported
        before a specific point in your application.

        Args:
            timeout_millis: Maximum time to wait for flush

        Returns:
            True if flush was successful (or nothing was initialized)

        Example:
            >>> otel = get_observability_instance()
            >>> # Execute some Redis commands
            >>> r = redis.Redis()
            >>> r.set('key', 'value')
            >>> # Force flush metrics immediately
            >>> otel.force_flush()
        """
        if self._provider_manager is None:
            logger.debug("Observability not initialized, nothing to flush")
            return True

        return self._provider_manager.force_flush(timeout_millis)
|
|
|
|
|
|
# Global singleton instance: created lazily by get_observability_instance() and
# cleared (after shutdown) by reset_observability_instance().
_observability_instance: Optional[ObservabilityInstance] = None
|
|
|
|
|
|
def get_observability_instance() -> ObservabilityInstance:
    """
    Return the process-wide ObservabilityInstance, creating it on first call.

    Every call after the first returns the same object until
    reset_observability_instance() clears it.

    Returns:
        The global ObservabilityInstance singleton

    Example:
        >>> otel = get_observability_instance()
        >>> otel.init(OTelConfig())
    """
    global _observability_instance

    instance = _observability_instance
    if instance is None:
        # First access: build the shared instance and publish it globally.
        instance = ObservabilityInstance()
        _observability_instance = instance
    return instance
|
|
|
|
|
|
def reset_observability_instance() -> None:
    """
    Shut down and discard the global observability singleton.

    Intended mainly for tests and benchmarks that need a clean slate between
    runs. The next get_observability_instance() call creates a fresh instance.

    Warning:
        Any active provider manager is shut down (pending metrics are flushed)
        and the global state is cleared. Use with caution in production code.
    """
    global _observability_instance

    current = _observability_instance
    if current is None:
        # Nothing was ever created (or it was already reset).
        return

    # Flush before dropping the reference, matching the original ordering.
    current.shutdown()
    _observability_instance = None
|