From 7a345ab467f16390693b572d2fc038ab9b29049e Mon Sep 17 00:00:00 2001 From: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com> Date: Wed, 3 Dec 2025 10:24:05 +0200 Subject: [PATCH] Revert PR #3634 (Observability metrics phase1) (#3635) --- example/otel_metrics.go | 88 -------- extra/redisotel-native/config.go | 69 ------ extra/redisotel-native/go.mod | 27 --- extra/redisotel-native/go.sum | 41 ---- extra/redisotel-native/metrics.go | 326 ---------------------------- extra/redisotel-native/redisotel.go | 196 ----------------- internal/otel/metrics.go | 64 ------ internal/pool/pool.go | 59 ----- otel.go | 69 ------ redis.go | 30 +-- 10 files changed, 5 insertions(+), 964 deletions(-) delete mode 100644 example/otel_metrics.go delete mode 100644 extra/redisotel-native/config.go delete mode 100644 extra/redisotel-native/go.mod delete mode 100644 extra/redisotel-native/go.sum delete mode 100644 extra/redisotel-native/metrics.go delete mode 100644 extra/redisotel-native/redisotel.go delete mode 100644 internal/otel/metrics.go delete mode 100644 otel.go diff --git a/example/otel_metrics.go b/example/otel_metrics.go deleted file mode 100644 index adf5de51..00000000 --- a/example/otel_metrics.go +++ /dev/null @@ -1,88 +0,0 @@ -// EXAMPLE: otel_metrics -// HIDE_START -package main - -import ( - "context" - "log" - "strconv" - "time" - - redisotel "github.com/redis/go-redis/extra/redisotel-native/v9" - "github.com/redis/go-redis/v9" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" - "go.opentelemetry.io/otel/sdk/metric" -) - -// ExampleClient_otel_metrics demonstrates how to enable OpenTelemetry metrics -// for Redis operations and export them to an OTLP collector. -func main() { - ctx := context.Background() - - // HIDE_END - - // STEP_START otel_exporter_setup - // Create OTLP exporter that sends metrics to the collector - // Default endpoint is localhost:4317 (gRPC) - exporter, err := otlpmetricgrpc.New(ctx, - otlpmetricgrpc.WithInsecure(), // Use insecure for local development - // For production, configure TLS and authentication: - // otlpmetricgrpc.WithEndpoint("your-collector:4317"), - // otlpmetricgrpc.WithTLSCredentials(...), - ) - if err != nil { - log.Fatalf("Failed to create OTLP exporter: %v", err) - } - // STEP_END - - // STEP_START otel_meter_provider - // Create meter provider with periodic reader - // Metrics are exported every 10 seconds - meterProvider := metric.NewMeterProvider( - metric.WithReader( - metric.NewPeriodicReader(exporter, - metric.WithInterval(10*time.Second), - ), - ), - ) - defer func() { - if err := meterProvider.Shutdown(ctx); err != nil { - log.Printf("Error shutting down meter provider: %v", err) - } - }() - - // Set the global meter provider - otel.SetMeterProvider(meterProvider) - // STEP_END - - // STEP_START redis_client_setup - // Create Redis client - rdb := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - }) - defer rdb.Close() - - // Initialize OTel instrumentation (uses global meter provider) - if err := redisotel.Init(rdb); err != nil { - log.Fatalf("Failed to initialize OTel: %v", err) - } - defer redisotel.Shutdown() - // STEP_END - - // STEP_START redis_operations - // Execute Redis operations - metrics are automatically collected - log.Println("Executing Redis operations...") - for i := range 100 { - if err := rdb.Set(ctx, "key"+strconv.Itoa(i), "value", 0).Err(); err != nil { - log.Printf("Error setting key: %v", err) - } - time.Sleep(time.Millisecond * 100) - } - log.Println("Operations complete. Waiting for metrics to be exported...") - - // Wait for metrics to be exported - time.Sleep(15 * time.Second) - // STEP_END -} - diff --git a/extra/redisotel-native/config.go b/extra/redisotel-native/config.go deleted file mode 100644 index 63397d8a..00000000 --- a/extra/redisotel-native/config.go +++ /dev/null @@ -1,69 +0,0 @@ -package redisotel - -import ( - "go.opentelemetry.io/otel/metric" -) - -// config holds the configuration for the instrumentation -type config struct { - meterProvider metric.MeterProvider - histogramBuckets []float64 -} - -// defaultConfig returns the default configuration -func defaultConfig() config { - return config{ - meterProvider: nil, // Will use global otel.GetMeterProvider() if nil - histogramBuckets: defaultHistogramBuckets(), - } -} - -// defaultHistogramBuckets returns the default histogram buckets for operation duration -// These buckets are designed to capture typical Redis operation latencies: -// - Sub-millisecond: 0.0001s (0.1ms), 0.0005s (0.5ms) -// - Milliseconds: 0.001s (1ms), 0.005s (5ms), 0.01s (10ms), 0.05s (50ms), 0.1s (100ms) -// - Sub-second: 0.5s (500ms) -// - Seconds: 1s, 5s, 10s -func defaultHistogramBuckets() []float64 { - return []float64{ - 0.0001, // 0.1ms - 0.0005, // 0.5ms - 0.001, // 1ms - 0.005, // 5ms - 0.01, // 10ms - 0.05, // 50ms - 0.1, // 100ms - 0.5, // 500ms - 1.0, // 1s - 5.0, // 5s - 10.0, // 10s - } -} - -// Option is a functional option for configuring the instrumentation -type Option interface { - apply(*config) -} - -// optionFunc wraps a function to implement the Option interface -type optionFunc func(*config) - -func (f optionFunc) apply(c *config) { - f(c) -} - -// WithMeterProvider sets the meter provider to use for creating metrics. -// If not provided, the global meter provider from otel.GetMeterProvider() will be used. -func WithMeterProvider(provider metric.MeterProvider) Option { - return optionFunc(func(c *config) { - c.meterProvider = provider - }) -} - -// WithHistogramBuckets sets custom histogram buckets for operation duration -// Buckets should be in seconds and in ascending order -func WithHistogramBuckets(buckets []float64) Option { - return optionFunc(func(c *config) { - c.histogramBuckets = buckets - }) -} diff --git a/extra/redisotel-native/go.mod b/extra/redisotel-native/go.mod deleted file mode 100644 index a814a31f..00000000 --- a/extra/redisotel-native/go.mod +++ /dev/null @@ -1,27 +0,0 @@ -module github.com/redis/go-redis/extra/redisotel-native/v9 - -go 1.23.0 - -toolchain go1.24.2 - -replace github.com/redis/go-redis/v9 => ../.. - -require ( - github.com/redis/go-redis/v9 v9.7.0 - go.opentelemetry.io/otel v1.38.0 - go.opentelemetry.io/otel/metric v1.38.0 - go.opentelemetry.io/otel/sdk/metric v1.38.0 -) - -require ( - github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/go-logr/logr v1.4.3 // indirect - github.com/go-logr/stdr v1.2.2 // indirect - github.com/google/uuid v1.6.0 // indirect - go.opentelemetry.io/auto/sdk v1.1.0 // indirect - go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.38.0 // indirect - go.opentelemetry.io/otel/sdk v1.38.0 // indirect - go.opentelemetry.io/otel/trace v1.38.0 // indirect - golang.org/x/sys v0.35.0 // indirect -) diff --git a/extra/redisotel-native/go.sum b/extra/redisotel-native/go.sum deleted file mode 100644 index afd7fd46..00000000 --- a/extra/redisotel-native/go.sum +++ /dev/null @@ -1,41 +0,0 @@ -github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= -github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= -github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= -github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= -github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= -github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= -github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= -github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= -github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= -github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= -github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= -github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= -github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= -github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= -go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= -go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= -go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= -go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= -go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.38.0 h1:wm/Q0GAAykXv83wzcKzGGqAnnfLFyFe7RslekZuv+VI= -go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.38.0/go.mod h1:ra3Pa40+oKjvYh+ZD3EdxFZZB0xdMfuileHAm4nNN7w= -go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA= -go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI= -go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E= -go.opentelemetry.io/otel/sdk v1.38.0/go.mod h1:ghmNdGlVemJI3+ZB5iDEuk4bWA3GkTpW+DOoZMYBVVg= -go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM= -go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA= -go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= -go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs= -golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= -golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/extra/redisotel-native/metrics.go b/extra/redisotel-native/metrics.go deleted file mode 100644 index edf12273..00000000 --- a/extra/redisotel-native/metrics.go +++ /dev/null @@ -1,326 +0,0 @@ -package redisotel - -import ( - "context" - "fmt" - "net" - "strconv" - "strings" - "time" - - "github.com/redis/go-redis/v9" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" -) - -const ( - // Library name for redis.client.library attribute - libraryName = "go-redis" -) - -// metricsRecorder implements the otel.Recorder interface -type metricsRecorder struct { - operationDuration metric.Float64Histogram - connectionCount metric.Int64UpDownCounter - - // Client configuration for attributes (used for operation metrics only) - serverAddr string - serverPort string - dbIndex string -} - -// RecordOperationDuration records db.client.operation.duration metric -func (r *metricsRecorder) RecordOperationDuration( - ctx context.Context, - duration time.Duration, - cmd redis.Cmder, - attempts int, - cn redis.ConnInfo, -) { - if r.operationDuration == nil { - return - } - - // Convert duration to seconds (OTel convention for duration metrics) - durationSeconds := duration.Seconds() - - // Build attributes - attrs := []attribute.KeyValue{ - // Required attributes - attribute.String("db.operation.name", cmd.FullName()), - attribute.String("redis.client.library", fmt.Sprintf("%s:%s", libraryName, redis.Version())), - attribute.Int("redis.client.operation.retry_attempts", attempts-1), // attempts-1 = retry count - attribute.Bool("redis.client.operation.blocking", isBlockingCommand(cmd)), - - // Recommended attributes - attribute.String("db.system", "redis"), - attribute.String("server.address", r.serverAddr), - } - - // Add server.port if not default - if r.serverPort != "" && r.serverPort != "6379" { - attrs = append(attrs, attribute.String("server.port", r.serverPort)) - } - - // Add db.namespace (database index) if available - if r.dbIndex != "" { - attrs = append(attrs, attribute.String("db.namespace", r.dbIndex)) - } - - // Add network.peer.address and network.peer.port from connection - if cn != nil { - remoteAddr := cn.RemoteAddr() - if remoteAddr != nil { - peerAddr, peerPort := splitHostPort(remoteAddr.String()) - if peerAddr != "" { - attrs = append(attrs, attribute.String("network.peer.address", peerAddr)) - } - if peerPort != "" { - attrs = append(attrs, attribute.String("network.peer.port", peerPort)) - } - } - } - - // Add error.type if command failed - if err := cmd.Err(); err != nil { - attrs = append(attrs, attribute.String("error.type", classifyError(err))) - } - - // Add db.response.status_code if error is a Redis error - if err := cmd.Err(); err != nil { - if statusCode := extractRedisErrorPrefix(err); statusCode != "" { - attrs = append(attrs, attribute.String("db.response.status_code", statusCode)) - } - } - - // Record the histogram - r.operationDuration.Record(ctx, durationSeconds, metric.WithAttributes(attrs...)) -} - -// isBlockingCommand checks if a command is a blocking operation -// Blocking commands have a timeout parameter and include: BLPOP, BRPOP, BRPOPLPUSH, BLMOVE, -// BZPOPMIN, BZPOPMAX, BZMPOP, BLMPOP, XREAD with BLOCK, XREADGROUP with BLOCK -func isBlockingCommand(cmd redis.Cmder) bool { - name := strings.ToLower(cmd.Name()) - - // Commands that start with 'b' and are blocking - if strings.HasPrefix(name, "b") { - switch name { - case "blpop", "brpop", "brpoplpush", "blmove", "bzpopmin", "bzpopmax", "bzmpop", "blmpop": - return true - } - } - - // XREAD and XREADGROUP with BLOCK option - if name == "xread" || name == "xreadgroup" { - args := cmd.Args() - for i, arg := range args { - if argStr, ok := arg.(string); ok { - if strings.ToLower(argStr) == "block" && i+1 < len(args) { - return true - } - } - } - } - - return false -} - -// classifyError returns the error.type attribute value -// Format: :: -func classifyError(err error) string { - if err == nil { - return "" - } - - errStr := err.Error() - - // Network errors - if isNetworkError(err) { - return fmt.Sprintf("network:%s", errStr) - } - - // Timeout errors - if isTimeoutError(err) { - return "timeout" - } - - // Redis errors (start with error prefix like ERR, WRONGTYPE, etc.) - if prefix := extractRedisErrorPrefix(err); prefix != "" { - return fmt.Sprintf("redis:%s", prefix) - } - - // Generic error - return errStr -} - -// extractRedisErrorPrefix extracts the Redis error prefix (e.g., "ERR", "WRONGTYPE") -// Redis errors typically start with an uppercase prefix followed by a space -func extractRedisErrorPrefix(err error) string { - if err == nil { - return "" - } - - errStr := err.Error() - - // Redis errors typically start with an uppercase prefix - // Examples: "ERR ...", "WRONGTYPE ...", "CLUSTERDOWN ..." - parts := strings.SplitN(errStr, " ", 2) - if len(parts) > 0 { - prefix := parts[0] - // Check if it's all uppercase (Redis error convention) - if prefix == strings.ToUpper(prefix) && len(prefix) > 0 { - return prefix - } - } - - return "" -} - -// isNetworkError checks if an error is a network-related error -func isNetworkError(err error) bool { - if err == nil { - return false - } - - // Check for net.Error interface - if _, ok := err.(net.Error); ok { - return true - } - - // Check error message for common network error patterns - errStr := strings.ToLower(err.Error()) - networkPatterns := []string{ - "connection refused", - "connection reset", - "broken pipe", - "no route to host", - "network is unreachable", - "i/o timeout", - "eof", - } - - for _, pattern := range networkPatterns { - if strings.Contains(errStr, pattern) { - return true - } - } - - return false -} - -// isTimeoutError checks if an error is a timeout error -func isTimeoutError(err error) bool { - if err == nil { - return false - } - - // Check for net.Error with Timeout() method - if netErr, ok := err.(net.Error); ok && netErr.Timeout() { - return true - } - - // Check error message - errStr := strings.ToLower(err.Error()) - return strings.Contains(errStr, "timeout") || strings.Contains(errStr, "deadline exceeded") -} - -// splitHostPort splits a host:port string into host and port -func splitHostPort(addr string) (host, port string) { - // Handle Unix sockets - if strings.HasPrefix(addr, "/") || strings.HasPrefix(addr, "@") { - return addr, "" - } - - host, port, err := net.SplitHostPort(addr) - if err != nil { - // If split fails, return the whole address as host - return addr, "" - } - - return host, port -} - -// parseAddr parses a Redis address into host and port -func parseAddr(addr string) (host, port string) { - // Handle Unix sockets - if strings.HasPrefix(addr, "/") || strings.HasPrefix(addr, "unix://") { - return addr, "" - } - - // Remove protocol prefix if present - addr = strings.TrimPrefix(addr, "redis://") - addr = strings.TrimPrefix(addr, "rediss://") - - host, port, err := net.SplitHostPort(addr) - if err != nil { - // No port specified, use default - return addr, "6379" - } - - return host, port -} - -// formatDBIndex formats the database index as a string -func formatDBIndex(db int) string { - if db < 0 { - return "" - } - return strconv.Itoa(db) -} - -// RecordConnectionStateChange records a change in connection state -// This is called from the pool when connections transition between states -func (r *metricsRecorder) RecordConnectionStateChange( - ctx context.Context, - cn redis.ConnInfo, - fromState, toState string, -) { - if r.connectionCount == nil { - return - } - - // Extract server address from connection - serverAddr, serverPort := extractServerInfo(cn) - - // Build base attributes - attrs := []attribute.KeyValue{ - attribute.String("db.system", "redis"), - attribute.String("server.address", serverAddr), - } - - // Add server.port if not default - if serverPort != "" && serverPort != "6379" { - attrs = append(attrs, attribute.String("server.port", serverPort)) - } - - // Decrement old state (if not empty) - if fromState != "" { - fromAttrs := append([]attribute.KeyValue{}, attrs...) - fromAttrs = append(fromAttrs, attribute.String("state", fromState)) - r.connectionCount.Add(ctx, -1, metric.WithAttributes(fromAttrs...)) - } - - // Increment new state - if toState != "" { - toAttrs := append([]attribute.KeyValue{}, attrs...) - toAttrs = append(toAttrs, attribute.String("state", toState)) - r.connectionCount.Add(ctx, 1, metric.WithAttributes(toAttrs...)) - } -} - -// extractServerInfo extracts server address and port from connection info -func extractServerInfo(cn redis.ConnInfo) (addr, port string) { - if cn == nil { - return "", "" - } - - remoteAddr := cn.RemoteAddr() - if remoteAddr == nil { - return "", "" - } - - addrStr := remoteAddr.String() - host, portStr := parseAddr(addrStr) - return host, portStr -} diff --git a/extra/redisotel-native/redisotel.go b/extra/redisotel-native/redisotel.go deleted file mode 100644 index fa45c069..00000000 --- a/extra/redisotel-native/redisotel.go +++ /dev/null @@ -1,196 +0,0 @@ -// Package redisotel provides native OpenTelemetry instrumentation for go-redis. -// -// This package implements the OpenTelemetry Semantic Conventions for database clients, -// providing metrics, traces, and logs for Redis operations. -// -// Basic Usage (with global MeterProvider): -// -// import ( -// "github.com/redis/go-redis/v9" -// redisotel "github.com/redis/go-redis/extra/redisotel-native/v9" -// "go.opentelemetry.io/otel" -// ) -// -// func main() { -// // Initialize OpenTelemetry globally (meter provider, etc.) -// otel.SetMeterProvider(myMeterProvider) -// -// // Create Redis client -// rdb := redis.NewClient(&redis.Options{ -// Addr: "localhost:6379", -// DB: 0, -// }) -// -// // Initialize native OTel instrumentation (uses global MeterProvider) -// if err := redisotel.Init(rdb); err != nil { -// panic(err) -// } -// -// // Use the client normally - metrics are automatically recorded -// rdb.Set(ctx, "key", "value", 0) -// } -// -// Advanced Usage (with custom MeterProvider): -// -// // Pass a custom MeterProvider -// if err := redisotel.Init(rdb, redisotel.WithMeterProvider(customProvider)); err != nil { -// panic(err) -// } -package redisotel - -import ( - "fmt" - "sync" - - "github.com/redis/go-redis/v9" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/metric" -) - -var ( - // Global singleton instance - globalInstance *metricsRecorder - globalInstanceOnce sync.Once - initErr error -) - -// Init initializes native OpenTelemetry instrumentation for the given Redis client. -// This function should be called once per application, typically during startup. -// Subsequent calls are no-ops and return nil. -// -// The function extracts configuration from the client (server address, port, database index) -// and registers a global metrics recorder. -// -// If no MeterProvider is provided via WithMeterProvider option, the global MeterProvider -// from otel.GetMeterProvider() will be used. Make sure to call otel.SetMeterProvider() -// before calling Init() if you want to use a custom provider. -// -// Example (using global MeterProvider): -// -// otel.SetMeterProvider(myMeterProvider) -// rdb := redis.NewClient(&redis.Options{ -// Addr: "localhost:6379", -// DB: 0, -// }) -// if err := redisotel.Init(rdb); err != nil { -// log.Fatal(err) -// } -// -// Example (using custom MeterProvider): -// -// if err := redisotel.Init(rdb, redisotel.WithMeterProvider(customProvider)); err != nil { -// log.Fatal(err) -// } -func Init(client redis.UniversalClient, opts ...Option) error { - globalInstanceOnce.Do(func() { - initErr = initOnce(client, opts...) - }) - return initErr -} - -// initOnce performs the actual initialization (called once by sync.Once) -func initOnce(client redis.UniversalClient, opts ...Option) error { - // Apply options - cfg := defaultConfig() - for _, opt := range opts { - opt.apply(&cfg) - } - - // Extract client configuration - serverAddr, serverPort, dbIndex, err := extractClientConfig(client) - if err != nil { - return fmt.Errorf("failed to extract client config: %w", err) - } - - // Get meter provider (use global if not provided) - meterProvider := cfg.meterProvider - if meterProvider == nil { - meterProvider = otel.GetMeterProvider() - } - - // Create meter - meter := meterProvider.Meter( - "github.com/redis/go-redis", - metric.WithInstrumentationVersion(redis.Version()), - ) - - // Create histogram for operation duration - operationDuration, err := meter.Float64Histogram( - "db.client.operation.duration", - metric.WithDescription("Duration of database client operations"), - metric.WithUnit("s"), - metric.WithExplicitBucketBoundaries(cfg.histogramBuckets...), - ) - if err != nil { - return fmt.Errorf("failed to create operation duration histogram: %w", err) - } - - // Create synchronous UpDownCounter for connection count - connectionCount, err := meter.Int64UpDownCounter( - "db.client.connection.count", - metric.WithDescription("The number of connections that are currently in state described by the state attribute"), - metric.WithUnit("{connection}"), - ) - if err != nil { - return fmt.Errorf("failed to create connection count metric: %w", err) - } - - // Create recorder - recorder := &metricsRecorder{ - operationDuration: operationDuration, - connectionCount: connectionCount, - serverAddr: serverAddr, - serverPort: serverPort, - dbIndex: dbIndex, - } - - // Register global recorder - redis.SetOTelRecorder(recorder) - globalInstance = recorder - - return nil -} - -// extractClientConfig extracts server address, port, and database index from a Redis client -func extractClientConfig(client redis.UniversalClient) (serverAddr, serverPort, dbIndex string, err error) { - switch c := client.(type) { - case *redis.Client: - opts := c.Options() - host, port := parseAddr(opts.Addr) - return host, port, formatDBIndex(opts.DB), nil - - case *redis.ClusterClient: - opts := c.Options() - if len(opts.Addrs) > 0 { - // Use first address for server.address attribute - host, port := parseAddr(opts.Addrs[0]) - return host, port, "", nil - } - return "", "", "", fmt.Errorf("cluster client has no addresses") - - case *redis.Ring: - opts := c.Options() - if len(opts.Addrs) > 0 { - // Use first address for server.address attribute - for _, addr := range opts.Addrs { - host, port := parseAddr(addr) - return host, port, formatDBIndex(opts.DB), nil - } - } - return "", "", "", fmt.Errorf("ring client has no addresses") - - default: - return "", "", "", fmt.Errorf("unsupported client type: %T", client) - } -} - -// Shutdown cleans up resources (for testing purposes) -func Shutdown() { - if globalInstance != nil { - redis.SetOTelRecorder(nil) - globalInstance = nil - } - // Reset the sync.Once so Init can be called again (useful for tests) - globalInstanceOnce = sync.Once{} - initErr = nil -} diff --git a/internal/otel/metrics.go b/internal/otel/metrics.go deleted file mode 100644 index 9755c572..00000000 --- a/internal/otel/metrics.go +++ /dev/null @@ -1,64 +0,0 @@ -package otel - -import ( - "context" - "time" - - "github.com/redis/go-redis/v9/internal/pool" -) - -// Cmder is a minimal interface for command information needed for metrics. -// This avoids circular dependencies with the main redis package. -type Cmder interface { - Name() string - FullName() string - Args() []interface{} - Err() error -} - -// Recorder is the interface for recording metrics. -// Implementations are provided by extra/redisotel-native package. -type Recorder interface { - // RecordOperationDuration records the total operation duration (including all retries) - RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, cn *pool.Conn) - - // RecordConnectionStateChange records when a connection changes state - RecordConnectionStateChange(ctx context.Context, cn *pool.Conn, fromState, toState string) -} - -// Global recorder instance (initialized by extra/redisotel-native) -var globalRecorder Recorder = noopRecorder{} - -// SetGlobalRecorder sets the global recorder (called by Init() in extra/redisotel-native) -func SetGlobalRecorder(r Recorder) { - if r == nil { - globalRecorder = noopRecorder{} - // Unregister pool callback - pool.SetConnectionStateChangeCallback(nil) - return - } - globalRecorder = r - - // Register pool callback to forward state changes to recorder - pool.SetConnectionStateChangeCallback(func(ctx context.Context, cn *pool.Conn, fromState, toState string) { - globalRecorder.RecordConnectionStateChange(ctx, cn, fromState, toState) - }) -} - -// RecordOperationDuration records the total operation duration. -// This is called from redis.go after command execution completes. -func RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, cn *pool.Conn) { - globalRecorder.RecordOperationDuration(ctx, duration, cmd, attempts, cn) -} - -// RecordConnectionStateChange records when a connection changes state. -// This is called from pool.go when connections transition between states. -func RecordConnectionStateChange(ctx context.Context, cn *pool.Conn, fromState, toState string) { - globalRecorder.RecordConnectionStateChange(ctx, cn, fromState, toState) -} - -// noopRecorder is a no-op implementation (zero overhead when metrics disabled) -type noopRecorder struct{} - -func (noopRecorder) RecordOperationDuration(context.Context, time.Duration, Cmder, int, *pool.Conn) {} -func (noopRecorder) RecordConnectionStateChange(context.Context, *pool.Conn, string, string) {} diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 2369f363..d757d1f4 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -24,7 +24,6 @@ var ( // ErrPoolTimeout timed out waiting to get a connection from the connection pool. ErrPoolTimeout = errors.New("redis: connection pool timeout") -<<<<<<< HEAD // ErrConnUnusableTimeout is returned when a connection is not usable and we timed out trying to mark it as unusable. ErrConnUnusableTimeout = errors.New("redis: timed out trying to mark connection as unusable") @@ -33,10 +32,6 @@ var ( // errConnNotPooled is returned when trying to return a non-pooled connection to the pool. errConnNotPooled = errors.New("connection not pooled") -======= - // Global callback for connection state changes (set by otel package) - connectionStateChangeCallback func(ctx context.Context, cn *Conn, fromState, toState string) ->>>>>>> c17657c6 (Adds connection state metrics) // popAttempts is the maximum number of attempts to find a usable connection // when popping from the idle connection pool. This handles cases where connections @@ -56,23 +51,6 @@ var ( noExpiration = maxTime ) -<<<<<<< HEAD -======= -// SetConnectionStateChangeCallback sets the global callback for connection state changes. -// This is called by the otel package to register metrics recording. -func SetConnectionStateChangeCallback(fn func(ctx context.Context, cn *Conn, fromState, toState string)) { - connectionStateChangeCallback = fn -} - -var timers = sync.Pool{ - New: func() interface{} { - t := time.NewTimer(time.Hour) - t.Stop() - return t - }, -} - ->>>>>>> c17657c6 (Adds connection state metrics) // Stats contains pool state information and accumulated stats. type Stats struct { Hits uint32 // number of times free connection was found in the pool @@ -546,12 +524,6 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) { } atomic.AddUint32(&p.stats.Hits, 1) - - // Notify metrics: connection moved from idle to used - if connectionStateChangeCallback != nil { - connectionStateChangeCallback(ctx, cn, "idle", "used") - } - return cn, nil } @@ -574,12 +546,6 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) { return nil, err } } - - // Notify metrics: new connection is created and used - if connectionStateChangeCallback != nil { - connectionStateChangeCallback(ctx, newcn, "", "used") - } - return newcn, nil } @@ -874,26 +840,9 @@ func (p *ConnPool) putConn(ctx context.Context, cn *Conn, freeTurn bool) { p.connsMu.Unlock() p.idleConnsLen.Add(1) } -<<<<<<< HEAD -======= - p.idleConnsLen.Add(1) - - // Notify metrics: connection moved from used to idle - if connectionStateChangeCallback != nil { - connectionStateChangeCallback(ctx, cn, "used", "idle") - } ->>>>>>> c17657c6 (Adds connection state metrics) } else { shouldCloseConn = true -<<<<<<< HEAD p.removeConnWithLock(cn) -======= - - // Notify metrics: connection removed (used -> nothing) - if connectionStateChangeCallback != nil { - connectionStateChangeCallback(ctx, cn, "used", "") - } ->>>>>>> c17657c6 (Adds connection state metrics) } if freeTurn { @@ -908,7 +857,6 @@ func (p *ConnPool) putConn(ctx context.Context, cn *Conn, freeTurn bool) { } func (p *ConnPool) Remove(ctx context.Context, cn *Conn, reason error) { -<<<<<<< HEAD p.removeConnInternal(ctx, cn, reason, true) } @@ -929,19 +877,12 @@ func (p *ConnPool) removeConnInternal(ctx context.Context, cn *Conn, reason erro hookManager.ProcessOnRemove(ctx, cn, reason) } -======= ->>>>>>> c17657c6 (Adds connection state metrics) p.removeConnWithLock(cn) if freeTurn { p.freeTurn() } - // Notify metrics: connection removed (assume from used state) - if connectionStateChangeCallback != nil { - connectionStateChangeCallback(ctx, cn, "used", "") - } - _ = p.closeConn(cn) // Check if we need to create new idle connections to maintain MinIdleConns diff --git a/otel.go b/otel.go deleted file mode 100644 index aa2f4028..00000000 --- a/otel.go +++ /dev/null @@ -1,69 +0,0 @@ -package redis - -import ( - "context" - "net" - "time" - - "github.com/redis/go-redis/v9/internal/otel" - "github.com/redis/go-redis/v9/internal/pool" -) - -// ConnInfo provides information about a Redis connection for metrics. -// This is a public interface to avoid exposing internal types. -type ConnInfo interface { - // RemoteAddr returns the remote network address - RemoteAddr() net.Addr -} - -// OTelRecorder is the interface for recording OpenTelemetry metrics. -// Implementations are provided by extra/redisotel-native package. -// -// This interface is exported to allow external packages to implement -// custom recorders without depending on internal packages. -type OTelRecorder interface { - // RecordOperationDuration records the total operation duration (including all retries) - RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, cn ConnInfo) - - // RecordConnectionStateChange records when a connection changes state (e.g., idle -> used) - RecordConnectionStateChange(ctx context.Context, cn ConnInfo, fromState, toState string) -} - -// SetOTelRecorder sets the global OpenTelemetry recorder. -// This is typically called by Init() in extra/redisotel-native package. -// -// Setting a nil recorder disables metrics collection. -func SetOTelRecorder(r OTelRecorder) { - if r == nil { - otel.SetGlobalRecorder(nil) - return - } - otel.SetGlobalRecorder(&otelRecorderAdapter{r}) -} - -// otelRecorderAdapter adapts the public OTelRecorder interface to the internal otel.Recorder interface -type otelRecorderAdapter struct { - recorder OTelRecorder -} - -func (a *otelRecorderAdapter) RecordOperationDuration(ctx context.Context, duration time.Duration, cmd otel.Cmder, attempts int, cn *pool.Conn) { - // Convert internal Cmder to public Cmder - if publicCmd, ok := cmd.(Cmder); ok { - // Convert internal pool.Conn to public ConnInfo - var connInfo ConnInfo - if cn != nil { - connInfo = cn - } - a.recorder.RecordOperationDuration(ctx, duration, publicCmd, attempts, connInfo) - } -} - -func (a *otelRecorderAdapter) RecordConnectionStateChange(ctx context.Context, cn *pool.Conn, fromState, toState string) { - // Convert internal pool.Conn to public ConnInfo - var connInfo ConnInfo - if cn != nil { - connInfo = cn - } - a.recorder.RecordConnectionStateChange(ctx, connInfo, fromState, toState) -} - diff --git a/redis.go b/redis.go index 4d6ed563..a6a71067 100644 --- a/redis.go +++ b/redis.go @@ -13,7 +13,6 @@ import ( "github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal/auth/streaming" "github.com/redis/go-redis/v9/internal/hscan" - "github.com/redis/go-redis/v9/internal/otel" "github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/proto" "github.com/redis/go-redis/v9/maintnotifications" @@ -663,34 +662,17 @@ func (c *baseClient) dial(ctx context.Context, network, addr string) (net.Conn, } func (c *baseClient) process(ctx context.Context, cmd Cmder) error { - // Start measuring total operation duration (includes all retries) - operationStart := time.Now() - var lastConn *pool.Conn - var lastErr error - totalAttempts := 0 for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { - totalAttempts++ attempt := attempt - retry, err, cn := c._process(ctx, cmd, attempt) - if cn != nil { - lastConn = cn - } + retry, err := c._process(ctx, cmd, attempt) if err == nil || !retry { - // Record total operation duration - operationDuration := time.Since(operationStart) - otel.RecordOperationDuration(ctx, operationDuration, cmd, totalAttempts, lastConn) return err } lastErr = err } - - // Record failed operation after all retries - operationDuration := time.Since(operationStart) - otel.RecordOperationDuration(ctx, operationDuration, cmd, totalAttempts, lastConn) - return lastErr } @@ -707,17 +689,15 @@ func (c *baseClient) assertUnstableCommand(cmd Cmder) (bool, error) { } } -func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error, *pool.Conn) { +func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error) { if attempt > 0 { if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { - return false, err, nil + return false, err } } - var usedConn *pool.Conn retryTimeout := uint32(0) if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { - usedConn = cn // Process any pending push notifications before executing the command if err := c.processPushNotifications(ctx, cn); err != nil { internal.Logger.Printf(ctx, "push: error processing pending notifications before command: %v", err) @@ -758,10 +738,10 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool return nil }); err != nil { retry := shouldRetry(err, atomic.LoadUint32(&retryTimeout) == 1) - return retry, err, usedConn + return retry, err } - return false, nil, usedConn + return false, nil } func (c *baseClient) retryBackoff(attempt int) time.Duration {