From d588c3ca71a00fb9a65d183434ddea88aa073352 Mon Sep 17 00:00:00 2001 From: Elena Kolevska Date: Fri, 24 Oct 2025 11:18:48 +0100 Subject: [PATCH] First draft. One metric - command duration. Signed-off-by: Elena Kolevska --- example/otel_metrics.go | 88 +++++++++ extra/redisotel-native/config.go | 69 +++++++ extra/redisotel-native/go.mod | 27 +++ extra/redisotel-native/go.sum | 41 +++++ extra/redisotel-native/metrics.go | 269 ++++++++++++++++++++++++++++ extra/redisotel-native/redisotel.go | 185 +++++++++++++++++++ internal/otel/metrics.go | 47 +++++ otel.go | 57 ++++++ redis.go | 30 +++- 9 files changed, 808 insertions(+), 5 deletions(-) create mode 100644 example/otel_metrics.go create mode 100644 extra/redisotel-native/config.go create mode 100644 extra/redisotel-native/go.mod create mode 100644 extra/redisotel-native/go.sum create mode 100644 extra/redisotel-native/metrics.go create mode 100644 extra/redisotel-native/redisotel.go create mode 100644 internal/otel/metrics.go create mode 100644 otel.go diff --git a/example/otel_metrics.go b/example/otel_metrics.go new file mode 100644 index 00000000..adf5de51 --- /dev/null +++ b/example/otel_metrics.go @@ -0,0 +1,88 @@ +// EXAMPLE: otel_metrics +// HIDE_START +package main + +import ( + "context" + "log" + "strconv" + "time" + + redisotel "github.com/redis/go-redis/extra/redisotel-native/v9" + "github.com/redis/go-redis/v9" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/sdk/metric" +) + +// ExampleClient_otel_metrics demonstrates how to enable OpenTelemetry metrics +// for Redis operations and export them to an OTLP collector. 
+func main() { + ctx := context.Background() + + // HIDE_END + + // STEP_START otel_exporter_setup + // Create OTLP exporter that sends metrics to the collector + // Default endpoint is localhost:4317 (gRPC) + exporter, err := otlpmetricgrpc.New(ctx, + otlpmetricgrpc.WithInsecure(), // Use insecure for local development + // For production, configure TLS and authentication: + // otlpmetricgrpc.WithEndpoint("your-collector:4317"), + // otlpmetricgrpc.WithTLSCredentials(...), + ) + if err != nil { + log.Fatalf("Failed to create OTLP exporter: %v", err) + } + // STEP_END + + // STEP_START otel_meter_provider + // Create meter provider with periodic reader + // Metrics are exported every 10 seconds + meterProvider := metric.NewMeterProvider( + metric.WithReader( + metric.NewPeriodicReader(exporter, + metric.WithInterval(10*time.Second), + ), + ), + ) + defer func() { + if err := meterProvider.Shutdown(ctx); err != nil { + log.Printf("Error shutting down meter provider: %v", err) + } + }() + + // Set the global meter provider + otel.SetMeterProvider(meterProvider) + // STEP_END + + // STEP_START redis_client_setup + // Create Redis client + rdb := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer rdb.Close() + + // Initialize OTel instrumentation (uses global meter provider) + if err := redisotel.Init(rdb); err != nil { + log.Fatalf("Failed to initialize OTel: %v", err) + } + defer redisotel.Shutdown() + // STEP_END + + // STEP_START redis_operations + // Execute Redis operations - metrics are automatically collected + log.Println("Executing Redis operations...") + for i := range 100 { + if err := rdb.Set(ctx, "key"+strconv.Itoa(i), "value", 0).Err(); err != nil { + log.Printf("Error setting key: %v", err) + } + time.Sleep(time.Millisecond * 100) + } + log.Println("Operations complete. 
Waiting for metrics to be exported...") + + // Wait for metrics to be exported + time.Sleep(15 * time.Second) + // STEP_END +} + diff --git a/extra/redisotel-native/config.go b/extra/redisotel-native/config.go new file mode 100644 index 00000000..63397d8a --- /dev/null +++ b/extra/redisotel-native/config.go @@ -0,0 +1,69 @@ +package redisotel + +import ( + "go.opentelemetry.io/otel/metric" +) + +// config holds the configuration for the instrumentation +type config struct { + meterProvider metric.MeterProvider + histogramBuckets []float64 +} + +// defaultConfig returns the default configuration +func defaultConfig() config { + return config{ + meterProvider: nil, // Will use global otel.GetMeterProvider() if nil + histogramBuckets: defaultHistogramBuckets(), + } +} + +// defaultHistogramBuckets returns the default histogram buckets for operation duration +// These buckets are designed to capture typical Redis operation latencies: +// - Sub-millisecond: 0.0001s (0.1ms), 0.0005s (0.5ms) +// - Milliseconds: 0.001s (1ms), 0.005s (5ms), 0.01s (10ms), 0.05s (50ms), 0.1s (100ms) +// - Sub-second: 0.5s (500ms) +// - Seconds: 1s, 5s, 10s +func defaultHistogramBuckets() []float64 { + return []float64{ + 0.0001, // 0.1ms + 0.0005, // 0.5ms + 0.001, // 1ms + 0.005, // 5ms + 0.01, // 10ms + 0.05, // 50ms + 0.1, // 100ms + 0.5, // 500ms + 1.0, // 1s + 5.0, // 5s + 10.0, // 10s + } +} + +// Option is a functional option for configuring the instrumentation +type Option interface { + apply(*config) +} + +// optionFunc wraps a function to implement the Option interface +type optionFunc func(*config) + +func (f optionFunc) apply(c *config) { + f(c) +} + +// WithMeterProvider sets the meter provider to use for creating metrics. +// If not provided, the global meter provider from otel.GetMeterProvider() will be used. 
+func WithMeterProvider(provider metric.MeterProvider) Option { + return optionFunc(func(c *config) { + c.meterProvider = provider + }) +} + +// WithHistogramBuckets sets custom histogram buckets for operation duration +// Buckets should be in seconds and in ascending order +func WithHistogramBuckets(buckets []float64) Option { + return optionFunc(func(c *config) { + c.histogramBuckets = buckets + }) +} diff --git a/extra/redisotel-native/go.mod b/extra/redisotel-native/go.mod new file mode 100644 index 00000000..a814a31f --- /dev/null +++ b/extra/redisotel-native/go.mod @@ -0,0 +1,27 @@ +module github.com/redis/go-redis/extra/redisotel-native/v9 + +go 1.23.0 + +toolchain go1.24.2 + +replace github.com/redis/go-redis/v9 => ../.. + +require ( + github.com/redis/go-redis/v9 v9.7.0 + go.opentelemetry.io/otel v1.38.0 + go.opentelemetry.io/otel/metric v1.38.0 + go.opentelemetry.io/otel/sdk/metric v1.38.0 +) + +require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.38.0 // indirect + go.opentelemetry.io/otel/sdk v1.38.0 // indirect + go.opentelemetry.io/otel/trace v1.38.0 // indirect + golang.org/x/sys v0.35.0 // indirect +) diff --git a/extra/redisotel-native/go.sum b/extra/redisotel-native/go.sum new file mode 100644 index 00000000..afd7fd46 --- /dev/null +++ b/extra/redisotel-native/go.sum @@ -0,0 +1,41 @@ +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= 
+github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= +go.opentelemetry.io/otel 
v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.38.0 h1:wm/Q0GAAykXv83wzcKzGGqAnnfLFyFe7RslekZuv+VI= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.38.0/go.mod h1:ra3Pa40+oKjvYh+ZD3EdxFZZB0xdMfuileHAm4nNN7w= +go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA= +go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI= +go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E= +go.opentelemetry.io/otel/sdk v1.38.0/go.mod h1:ghmNdGlVemJI3+ZB5iDEuk4bWA3GkTpW+DOoZMYBVVg= +go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM= +go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA= +go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= +go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/extra/redisotel-native/metrics.go b/extra/redisotel-native/metrics.go new file mode 100644 index 00000000..6629172e --- /dev/null +++ b/extra/redisotel-native/metrics.go @@ -0,0 +1,269 @@ +package redisotel + +import ( + "context" + "fmt" + "net" + "strconv" + "strings" + "time" + + "github.com/redis/go-redis/v9" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +const ( + // Library name for redis.client.library attribute + libraryName = "go-redis" +) + +// metricsRecorder implements the otel.Recorder interface +type metricsRecorder struct { + operationDuration metric.Float64Histogram + + // Client 
configuration for attributes + serverAddr string + serverPort string + dbIndex string +} + +// RecordOperationDuration records db.client.operation.duration metric +func (r *metricsRecorder) RecordOperationDuration( + ctx context.Context, + duration time.Duration, + cmd redis.Cmder, + attempts int, + cn redis.ConnInfo, +) { + if r.operationDuration == nil { + return + } + + // Convert duration to seconds (OTel convention for duration metrics) + durationSeconds := duration.Seconds() + + // Build attributes + attrs := []attribute.KeyValue{ + // Required attributes + attribute.String("db.operation.name", cmd.FullName()), + attribute.String("redis.client.library", fmt.Sprintf("%s:%s", libraryName, redis.Version())), + attribute.Int("redis.client.operation.retry_attempts", attempts-1), // attempts-1 = retry count + attribute.Bool("redis.client.operation.blocking", isBlockingCommand(cmd)), + + // Recommended attributes + attribute.String("db.system", "redis"), + attribute.String("server.address", r.serverAddr), + } + + // Add server.port if not default + if r.serverPort != "" && r.serverPort != "6379" { + attrs = append(attrs, attribute.String("server.port", r.serverPort)) + } + + // Add db.namespace (database index) if available + if r.dbIndex != "" { + attrs = append(attrs, attribute.String("db.namespace", r.dbIndex)) + } + + // Add network.peer.address and network.peer.port from connection + if cn != nil { + remoteAddr := cn.RemoteAddr() + if remoteAddr != nil { + peerAddr, peerPort := splitHostPort(remoteAddr.String()) + if peerAddr != "" { + attrs = append(attrs, attribute.String("network.peer.address", peerAddr)) + } + if peerPort != "" { + attrs = append(attrs, attribute.String("network.peer.port", peerPort)) + } + } + } + + // Add error.type if command failed + if err := cmd.Err(); err != nil { + attrs = append(attrs, attribute.String("error.type", classifyError(err))) + } + + // Add db.response.status_code if error is a Redis error + if err := cmd.Err(); err != 
// classifyError maps err onto a small, fixed vocabulary for the error.type
// attribute. Metric attributes must stay low-cardinality, so the error
// message itself is never used as a value.
//
// Result, in priority order:
//   - ""               err is nil
//   - "redis:<PREFIX>" the server replied with an error (e.g. "redis:WRONGTYPE")
//   - "timeout"        the error reports a timeout or an exceeded deadline
//   - "network"        any other connection-level failure
//   - the Go type name of err (e.g. "*errors.errorString") for everything else
//
// Timeouts are tested before generic network failures because a timed-out
// socket operation satisfies both checks.
func classifyError(err error) string {
	if err == nil {
		return ""
	}
	if prefix := extractRedisErrorPrefix(err); prefix != "" {
		return "redis:" + prefix
	}
	switch {
	case isTimeoutError(err):
		return "timeout"
	case isNetworkError(err):
		return "network"
	}
	return fmt.Sprintf("%T", err)
}

// extractRedisErrorPrefix returns the leading error code of a Redis server
// error ("ERR", "WRONGTYPE", "CLUSTERDOWN", ...), or "" when err is not a
// server error.
//
// Only errors that carry the RedisError() marker method are considered; this
// keeps client-side errors whose text happens to start with an upper-case
// word (for example "EOF") from being reported as a Redis status code.
// The code itself must start with A-Z and contain only A-Z, 0-9 or '_'.
func extractRedisErrorPrefix(err error) string {
	if err == nil {
		return ""
	}
	if _, marked := err.(interface{ RedisError() }); !marked {
		return ""
	}

	code, _, _ := strings.Cut(err.Error(), " ")
	if code == "" || code[0] < 'A' || code[0] > 'Z' {
		return ""
	}
	for i := 1; i < len(code); i++ {
		c := code[i]
		isUpper := c >= 'A' && c <= 'Z'
		isDigit := c >= '0' && c <= '9'
		if !isUpper && !isDigit && c != '_' {
			return ""
		}
	}
	return code
}

// isNetworkError reports whether err looks like a connection-level failure:
// either it implements net.Error, or its message matches one of the usual
// socket error texts.
func isNetworkError(err error) bool {
	if err == nil {
		return false
	}
	if _, ok := err.(net.Error); ok {
		return true
	}

	msg := strings.ToLower(err.Error())

	// Match EOF only as a whole trailing word ("EOF", "unexpected EOF",
	// "read tcp ...: EOF") so unrelated words such as "typeof" do not count.
	if msg == "eof" || strings.HasSuffix(msg, " eof") {
		return true
	}

	for _, pattern := range []string{
		"connection refused",
		"connection reset",
		"broken pipe",
		"no route to host",
		"network is unreachable",
		"i/o timeout",
	} {
		if strings.Contains(msg, pattern) {
			return true
		}
	}
	return false
}

// isTimeoutError reports whether err represents a timeout: a net.Error whose
// Timeout method returns true, or a message mentioning "timeout" or
// "deadline exceeded".
func isTimeoutError(err error) bool {
	if err == nil {
		return false
	}
	if ne, ok := err.(net.Error); ok && ne.Timeout() {
		return true
	}
	msg := strings.ToLower(err.Error())
	return strings.Contains(msg, "timeout") || strings.Contains(msg, "deadline exceeded")
}

// splitHostPort splits a connection's remote address into host and port.
// Unix socket paths (leading "/" or "@") and anything net.SplitHostPort
// rejects are returned unchanged as the host with an empty port.
func splitHostPort(addr string) (host, port string) {
	if strings.HasPrefix(addr, "/") || strings.HasPrefix(addr, "@") {
		return addr, ""
	}
	h, p, err := net.SplitHostPort(addr)
	if err != nil {
		return addr, ""
	}
	return h, p
}

// parseAddr splits a configured Redis address into host and port.
//
//   - Unix sockets ("/path" or "unix://...") are returned unchanged with an
//     empty port.
//   - A leading "redis://" or "rediss://" scheme is dropped.
//   - When no port is present the default "6379" is used; a bracketed IPv6
//     literal without a port ("[::1]") is returned without its brackets so it
//     matches what "[::1]:6379" would yield.
func parseAddr(addr string) (host, port string) {
	if strings.HasPrefix(addr, "/") || strings.HasPrefix(addr, "unix://") {
		return addr, ""
	}

	addr = strings.TrimPrefix(addr, "redis://")
	addr = strings.TrimPrefix(addr, "rediss://")

	if h, p, err := net.SplitHostPort(addr); err == nil {
		return h, p
	}

	// No usable port: fall back to the Redis default.
	if len(addr) >= 2 && addr[0] == '[' && addr[len(addr)-1] == ']' {
		addr = addr[1 : len(addr)-1]
	}
	return addr, "6379"
}

// formatDBIndex renders the database index as a decimal string, or "" for a
// negative index.
func formatDBIndex(db int) string {
	if db < 0 {
		return ""
	}
	return strconv.Itoa(db)
}
+// otel.SetMeterProvider(myMeterProvider) +// +// // Create Redis client +// rdb := redis.NewClient(&redis.Options{ +// Addr: "localhost:6379", +// DB: 0, +// }) +// +// // Initialize native OTel instrumentation (uses global MeterProvider) +// if err := redisotel.Init(rdb); err != nil { +// panic(err) +// } +// +// // Use the client normally - metrics are automatically recorded +// rdb.Set(ctx, "key", "value", 0) +// } +// +// Advanced Usage (with custom MeterProvider): +// +// // Pass a custom MeterProvider +// if err := redisotel.Init(rdb, redisotel.WithMeterProvider(customProvider)); err != nil { +// panic(err) +// } +package redisotel + +import ( + "fmt" + "sync" + + "github.com/redis/go-redis/v9" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" +) + +var ( + // Global singleton instance + globalInstance *metricsRecorder + globalInstanceOnce sync.Once + initErr error +) + +// Init initializes native OpenTelemetry instrumentation for the given Redis client. +// This function should be called once per application, typically during startup. +// Subsequent calls are no-ops and return nil. +// +// The function extracts configuration from the client (server address, port, database index) +// and registers a global metrics recorder. +// +// If no MeterProvider is provided via WithMeterProvider option, the global MeterProvider +// from otel.GetMeterProvider() will be used. Make sure to call otel.SetMeterProvider() +// before calling Init() if you want to use a custom provider. 
+// +// Example (using global MeterProvider): +// +// otel.SetMeterProvider(myMeterProvider) +// rdb := redis.NewClient(&redis.Options{ +// Addr: "localhost:6379", +// DB: 0, +// }) +// if err := redisotel.Init(rdb); err != nil { +// log.Fatal(err) +// } +// +// Example (using custom MeterProvider): +// +// if err := redisotel.Init(rdb, redisotel.WithMeterProvider(customProvider)); err != nil { +// log.Fatal(err) +// } +func Init(client redis.UniversalClient, opts ...Option) error { + globalInstanceOnce.Do(func() { + initErr = initOnce(client, opts...) + }) + return initErr +} + +// initOnce performs the actual initialization (called once by sync.Once) +func initOnce(client redis.UniversalClient, opts ...Option) error { + // Apply options + cfg := defaultConfig() + for _, opt := range opts { + opt.apply(&cfg) + } + + // Extract client configuration + serverAddr, serverPort, dbIndex, err := extractClientConfig(client) + if err != nil { + return fmt.Errorf("failed to extract client config: %w", err) + } + + // Get meter provider (use global if not provided) + meterProvider := cfg.meterProvider + if meterProvider == nil { + meterProvider = otel.GetMeterProvider() + } + + // Create meter + meter := meterProvider.Meter( + "github.com/redis/go-redis", + metric.WithInstrumentationVersion(redis.Version()), + ) + + // Create histogram for operation duration + operationDuration, err := meter.Float64Histogram( + "db.client.operation.duration", + metric.WithDescription("Duration of database client operations"), + metric.WithUnit("s"), + metric.WithExplicitBucketBoundaries(cfg.histogramBuckets...), + ) + if err != nil { + return fmt.Errorf("failed to create operation duration histogram: %w", err) + } + + // Create recorder + recorder := &metricsRecorder{ + operationDuration: operationDuration, + serverAddr: serverAddr, + serverPort: serverPort, + dbIndex: dbIndex, + } + + // Register global recorder + redis.SetOTelRecorder(recorder) + globalInstance = recorder + + return nil +} 
+ +// extractClientConfig extracts server address, port, and database index from a Redis client +func extractClientConfig(client redis.UniversalClient) (serverAddr, serverPort, dbIndex string, err error) { + switch c := client.(type) { + case *redis.Client: + opts := c.Options() + host, port := parseAddr(opts.Addr) + return host, port, formatDBIndex(opts.DB), nil + + case *redis.ClusterClient: + opts := c.Options() + if len(opts.Addrs) > 0 { + // Use first address for server.address attribute + host, port := parseAddr(opts.Addrs[0]) + return host, port, "", nil + } + return "", "", "", fmt.Errorf("cluster client has no addresses") + + case *redis.Ring: + opts := c.Options() + if len(opts.Addrs) > 0 { + // Use first address for server.address attribute + for _, addr := range opts.Addrs { + host, port := parseAddr(addr) + return host, port, formatDBIndex(opts.DB), nil + } + } + return "", "", "", fmt.Errorf("ring client has no addresses") + + default: + return "", "", "", fmt.Errorf("unsupported client type: %T", client) + } +} + +// Shutdown cleans up resources (for testing purposes) +func Shutdown() { + if globalInstance != nil { + redis.SetOTelRecorder(nil) + globalInstance = nil + } + // Reset the sync.Once so Init can be called again (useful for tests) + globalInstanceOnce = sync.Once{} + initErr = nil +} diff --git a/internal/otel/metrics.go b/internal/otel/metrics.go new file mode 100644 index 00000000..6662761c --- /dev/null +++ b/internal/otel/metrics.go @@ -0,0 +1,47 @@ +package otel + +import ( + "context" + "time" + + "github.com/redis/go-redis/v9/internal/pool" +) + +// Cmder is a minimal interface for command information needed for metrics. +// This avoids circular dependencies with the main redis package. +type Cmder interface { + Name() string + FullName() string + Args() []interface{} + Err() error +} + +// Recorder is the interface for recording metrics. +// Implementations are provided by extra/redisotel-native package. 
+type Recorder interface { + // RecordOperationDuration records the total operation duration (including all retries) + RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, cn *pool.Conn) +} + +// Global recorder instance (initialized by extra/redisotel-native) +var globalRecorder Recorder = noopRecorder{} + +// SetGlobalRecorder sets the global recorder (called by Init() in extra/redisotel-native) +func SetGlobalRecorder(r Recorder) { + if r == nil { + globalRecorder = noopRecorder{} + return + } + globalRecorder = r +} + +// RecordOperationDuration records the total operation duration. +// This is called from redis.go after command execution completes. +func RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, cn *pool.Conn) { + globalRecorder.RecordOperationDuration(ctx, duration, cmd, attempts, cn) +} + +// noopRecorder is a no-op implementation (zero overhead when metrics disabled) +type noopRecorder struct{} + +func (noopRecorder) RecordOperationDuration(context.Context, time.Duration, Cmder, int, *pool.Conn) {} diff --git a/otel.go b/otel.go new file mode 100644 index 00000000..9df89eeb --- /dev/null +++ b/otel.go @@ -0,0 +1,57 @@ +package redis + +import ( + "context" + "net" + "time" + + "github.com/redis/go-redis/v9/internal/otel" + "github.com/redis/go-redis/v9/internal/pool" +) + +// ConnInfo provides information about a Redis connection for metrics. +// This is a public interface to avoid exposing internal types. +type ConnInfo interface { + // RemoteAddr returns the remote network address + RemoteAddr() net.Addr +} + +// OTelRecorder is the interface for recording OpenTelemetry metrics. +// Implementations are provided by extra/redisotel-native package. +// +// This interface is exported to allow external packages to implement +// custom recorders without depending on internal packages. 
+type OTelRecorder interface { + // RecordOperationDuration records the total operation duration (including all retries) + RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, cn ConnInfo) +} + +// SetOTelRecorder sets the global OpenTelemetry recorder. +// This is typically called by Init() in extra/redisotel-native package. +// +// Setting a nil recorder disables metrics collection. +func SetOTelRecorder(r OTelRecorder) { + if r == nil { + otel.SetGlobalRecorder(nil) + return + } + otel.SetGlobalRecorder(&otelRecorderAdapter{r}) +} + +// otelRecorderAdapter adapts the public OTelRecorder interface to the internal otel.Recorder interface +type otelRecorderAdapter struct { + recorder OTelRecorder +} + +func (a *otelRecorderAdapter) RecordOperationDuration(ctx context.Context, duration time.Duration, cmd otel.Cmder, attempts int, cn *pool.Conn) { + // Convert internal Cmder to public Cmder + if publicCmd, ok := cmd.(Cmder); ok { + // Convert internal pool.Conn to public ConnInfo + var connInfo ConnInfo + if cn != nil { + connInfo = cn + } + a.recorder.RecordOperationDuration(ctx, duration, publicCmd, attempts, connInfo) + } +} + diff --git a/redis.go b/redis.go index a6a71067..4d6ed563 100644 --- a/redis.go +++ b/redis.go @@ -13,6 +13,7 @@ import ( "github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal/auth/streaming" "github.com/redis/go-redis/v9/internal/hscan" + "github.com/redis/go-redis/v9/internal/otel" "github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/proto" "github.com/redis/go-redis/v9/maintnotifications" @@ -662,17 +663,34 @@ func (c *baseClient) dial(ctx context.Context, network, addr string) (net.Conn, } func (c *baseClient) process(ctx context.Context, cmd Cmder) error { + // Start measuring total operation duration (includes all retries) + operationStart := time.Now() + var lastConn *pool.Conn + var lastErr error + totalAttempts := 0 for attempt := 0; 
attempt <= c.opt.MaxRetries; attempt++ { + totalAttempts++ attempt := attempt - retry, err := c._process(ctx, cmd, attempt) + retry, err, cn := c._process(ctx, cmd, attempt) + if cn != nil { + lastConn = cn + } if err == nil || !retry { + // Record total operation duration + operationDuration := time.Since(operationStart) + otel.RecordOperationDuration(ctx, operationDuration, cmd, totalAttempts, lastConn) return err } lastErr = err } + + // Record failed operation after all retries + operationDuration := time.Since(operationStart) + otel.RecordOperationDuration(ctx, operationDuration, cmd, totalAttempts, lastConn) + return lastErr } @@ -689,15 +707,17 @@ func (c *baseClient) assertUnstableCommand(cmd Cmder) (bool, error) { } } -func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error) { +func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error, *pool.Conn) { if attempt > 0 { if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { - return false, err + return false, err, nil } } + var usedConn *pool.Conn retryTimeout := uint32(0) if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { + usedConn = cn // Process any pending push notifications before executing the command if err := c.processPushNotifications(ctx, cn); err != nil { internal.Logger.Printf(ctx, "push: error processing pending notifications before command: %v", err) @@ -738,10 +758,10 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool return nil }); err != nil { retry := shouldRetry(err, atomic.LoadUint32(&retryTimeout) == 1) - return retry, err + return retry, err, usedConn } - return false, nil + return false, nil, usedConn } func (c *baseClient) retryBackoff(attempt int) time.Duration {