mirror of
https://github.com/redis/go-redis.git
synced 2025-12-05 06:22:07 +03:00
@@ -1,88 +0,0 @@
|
||||
// EXAMPLE: otel_metrics
|
||||
// HIDE_START
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
redisotel "github.com/redis/go-redis/extra/redisotel-native/v9"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
|
||||
"go.opentelemetry.io/otel/sdk/metric"
|
||||
)
|
||||
|
||||
// ExampleClient_otel_metrics demonstrates how to enable OpenTelemetry metrics
|
||||
// for Redis operations and export them to an OTLP collector.
|
||||
func main() {
|
||||
ctx := context.Background()
|
||||
|
||||
// HIDE_END
|
||||
|
||||
// STEP_START otel_exporter_setup
|
||||
// Create OTLP exporter that sends metrics to the collector
|
||||
// Default endpoint is localhost:4317 (gRPC)
|
||||
exporter, err := otlpmetricgrpc.New(ctx,
|
||||
otlpmetricgrpc.WithInsecure(), // Use insecure for local development
|
||||
// For production, configure TLS and authentication:
|
||||
// otlpmetricgrpc.WithEndpoint("your-collector:4317"),
|
||||
// otlpmetricgrpc.WithTLSCredentials(...),
|
||||
)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to create OTLP exporter: %v", err)
|
||||
}
|
||||
// STEP_END
|
||||
|
||||
// STEP_START otel_meter_provider
|
||||
// Create meter provider with periodic reader
|
||||
// Metrics are exported every 10 seconds
|
||||
meterProvider := metric.NewMeterProvider(
|
||||
metric.WithReader(
|
||||
metric.NewPeriodicReader(exporter,
|
||||
metric.WithInterval(10*time.Second),
|
||||
),
|
||||
),
|
||||
)
|
||||
defer func() {
|
||||
if err := meterProvider.Shutdown(ctx); err != nil {
|
||||
log.Printf("Error shutting down meter provider: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Set the global meter provider
|
||||
otel.SetMeterProvider(meterProvider)
|
||||
// STEP_END
|
||||
|
||||
// STEP_START redis_client_setup
|
||||
// Create Redis client
|
||||
rdb := redis.NewClient(&redis.Options{
|
||||
Addr: "localhost:6379",
|
||||
})
|
||||
defer rdb.Close()
|
||||
|
||||
// Initialize OTel instrumentation (uses global meter provider)
|
||||
if err := redisotel.Init(rdb); err != nil {
|
||||
log.Fatalf("Failed to initialize OTel: %v", err)
|
||||
}
|
||||
defer redisotel.Shutdown()
|
||||
// STEP_END
|
||||
|
||||
// STEP_START redis_operations
|
||||
// Execute Redis operations - metrics are automatically collected
|
||||
log.Println("Executing Redis operations...")
|
||||
for i := range 100 {
|
||||
if err := rdb.Set(ctx, "key"+strconv.Itoa(i), "value", 0).Err(); err != nil {
|
||||
log.Printf("Error setting key: %v", err)
|
||||
}
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
}
|
||||
log.Println("Operations complete. Waiting for metrics to be exported...")
|
||||
|
||||
// Wait for metrics to be exported
|
||||
time.Sleep(15 * time.Second)
|
||||
// STEP_END
|
||||
}
|
||||
|
||||
@@ -1,69 +0,0 @@
|
||||
package redisotel
|
||||
|
||||
import (
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
)
|
||||
|
||||
// config holds the configuration for the instrumentation
|
||||
type config struct {
|
||||
meterProvider metric.MeterProvider
|
||||
histogramBuckets []float64
|
||||
}
|
||||
|
||||
// defaultConfig returns the default configuration
|
||||
func defaultConfig() config {
|
||||
return config{
|
||||
meterProvider: nil, // Will use global otel.GetMeterProvider() if nil
|
||||
histogramBuckets: defaultHistogramBuckets(),
|
||||
}
|
||||
}
|
||||
|
||||
// defaultHistogramBuckets returns the default histogram buckets for operation duration
|
||||
// These buckets are designed to capture typical Redis operation latencies:
|
||||
// - Sub-millisecond: 0.0001s (0.1ms), 0.0005s (0.5ms)
|
||||
// - Milliseconds: 0.001s (1ms), 0.005s (5ms), 0.01s (10ms), 0.05s (50ms), 0.1s (100ms)
|
||||
// - Sub-second: 0.5s (500ms)
|
||||
// - Seconds: 1s, 5s, 10s
|
||||
func defaultHistogramBuckets() []float64 {
|
||||
return []float64{
|
||||
0.0001, // 0.1ms
|
||||
0.0005, // 0.5ms
|
||||
0.001, // 1ms
|
||||
0.005, // 5ms
|
||||
0.01, // 10ms
|
||||
0.05, // 50ms
|
||||
0.1, // 100ms
|
||||
0.5, // 500ms
|
||||
1.0, // 1s
|
||||
5.0, // 5s
|
||||
10.0, // 10s
|
||||
}
|
||||
}
|
||||
|
||||
// Option is a functional option for configuring the instrumentation
|
||||
type Option interface {
|
||||
apply(*config)
|
||||
}
|
||||
|
||||
// optionFunc wraps a function to implement the Option interface
|
||||
type optionFunc func(*config)
|
||||
|
||||
func (f optionFunc) apply(c *config) {
|
||||
f(c)
|
||||
}
|
||||
|
||||
// WithMeterProvider sets the meter provider to use for creating metrics.
|
||||
// If not provided, the global meter provider from otel.GetMeterProvider() will be used.
|
||||
func WithMeterProvider(provider metric.MeterProvider) Option {
|
||||
return optionFunc(func(c *config) {
|
||||
c.meterProvider = provider
|
||||
})
|
||||
}
|
||||
|
||||
// WithHistogramBuckets sets custom histogram buckets for operation duration
|
||||
// Buckets should be in seconds and in ascending order
|
||||
func WithHistogramBuckets(buckets []float64) Option {
|
||||
return optionFunc(func(c *config) {
|
||||
c.histogramBuckets = buckets
|
||||
})
|
||||
}
|
||||
@@ -1,27 +0,0 @@
|
||||
module github.com/redis/go-redis/extra/redisotel-native/v9
|
||||
|
||||
go 1.23.0
|
||||
|
||||
toolchain go1.24.2
|
||||
|
||||
replace github.com/redis/go-redis/v9 => ../..
|
||||
|
||||
require (
|
||||
github.com/redis/go-redis/v9 v9.7.0
|
||||
go.opentelemetry.io/otel v1.38.0
|
||||
go.opentelemetry.io/otel/metric v1.38.0
|
||||
go.opentelemetry.io/otel/sdk/metric v1.38.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
github.com/go-logr/logr v1.4.3 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/google/uuid v1.6.0 // indirect
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.38.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk v1.38.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.38.0 // indirect
|
||||
golang.org/x/sys v0.35.0 // indirect
|
||||
)
|
||||
@@ -1,41 +0,0 @@
|
||||
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
|
||||
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
|
||||
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
|
||||
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
|
||||
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
|
||||
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
||||
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
|
||||
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
||||
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
|
||||
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
|
||||
go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8=
|
||||
go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM=
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.38.0 h1:wm/Q0GAAykXv83wzcKzGGqAnnfLFyFe7RslekZuv+VI=
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.38.0/go.mod h1:ra3Pa40+oKjvYh+ZD3EdxFZZB0xdMfuileHAm4nNN7w=
|
||||
go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA=
|
||||
go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI=
|
||||
go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E=
|
||||
go.opentelemetry.io/otel/sdk v1.38.0/go.mod h1:ghmNdGlVemJI3+ZB5iDEuk4bWA3GkTpW+DOoZMYBVVg=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA=
|
||||
go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE=
|
||||
go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs=
|
||||
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
|
||||
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
@@ -1,326 +0,0 @@
|
||||
package redisotel
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
)
|
||||
|
||||
const (
|
||||
// Library name for redis.client.library attribute
|
||||
libraryName = "go-redis"
|
||||
)
|
||||
|
||||
// metricsRecorder implements the otel.Recorder interface
|
||||
type metricsRecorder struct {
|
||||
operationDuration metric.Float64Histogram
|
||||
connectionCount metric.Int64UpDownCounter
|
||||
|
||||
// Client configuration for attributes (used for operation metrics only)
|
||||
serverAddr string
|
||||
serverPort string
|
||||
dbIndex string
|
||||
}
|
||||
|
||||
// RecordOperationDuration records db.client.operation.duration metric
|
||||
func (r *metricsRecorder) RecordOperationDuration(
|
||||
ctx context.Context,
|
||||
duration time.Duration,
|
||||
cmd redis.Cmder,
|
||||
attempts int,
|
||||
cn redis.ConnInfo,
|
||||
) {
|
||||
if r.operationDuration == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Convert duration to seconds (OTel convention for duration metrics)
|
||||
durationSeconds := duration.Seconds()
|
||||
|
||||
// Build attributes
|
||||
attrs := []attribute.KeyValue{
|
||||
// Required attributes
|
||||
attribute.String("db.operation.name", cmd.FullName()),
|
||||
attribute.String("redis.client.library", fmt.Sprintf("%s:%s", libraryName, redis.Version())),
|
||||
attribute.Int("redis.client.operation.retry_attempts", attempts-1), // attempts-1 = retry count
|
||||
attribute.Bool("redis.client.operation.blocking", isBlockingCommand(cmd)),
|
||||
|
||||
// Recommended attributes
|
||||
attribute.String("db.system", "redis"),
|
||||
attribute.String("server.address", r.serverAddr),
|
||||
}
|
||||
|
||||
// Add server.port if not default
|
||||
if r.serverPort != "" && r.serverPort != "6379" {
|
||||
attrs = append(attrs, attribute.String("server.port", r.serverPort))
|
||||
}
|
||||
|
||||
// Add db.namespace (database index) if available
|
||||
if r.dbIndex != "" {
|
||||
attrs = append(attrs, attribute.String("db.namespace", r.dbIndex))
|
||||
}
|
||||
|
||||
// Add network.peer.address and network.peer.port from connection
|
||||
if cn != nil {
|
||||
remoteAddr := cn.RemoteAddr()
|
||||
if remoteAddr != nil {
|
||||
peerAddr, peerPort := splitHostPort(remoteAddr.String())
|
||||
if peerAddr != "" {
|
||||
attrs = append(attrs, attribute.String("network.peer.address", peerAddr))
|
||||
}
|
||||
if peerPort != "" {
|
||||
attrs = append(attrs, attribute.String("network.peer.port", peerPort))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add error.type if command failed
|
||||
if err := cmd.Err(); err != nil {
|
||||
attrs = append(attrs, attribute.String("error.type", classifyError(err)))
|
||||
}
|
||||
|
||||
// Add db.response.status_code if error is a Redis error
|
||||
if err := cmd.Err(); err != nil {
|
||||
if statusCode := extractRedisErrorPrefix(err); statusCode != "" {
|
||||
attrs = append(attrs, attribute.String("db.response.status_code", statusCode))
|
||||
}
|
||||
}
|
||||
|
||||
// Record the histogram
|
||||
r.operationDuration.Record(ctx, durationSeconds, metric.WithAttributes(attrs...))
|
||||
}
|
||||
|
||||
// isBlockingCommand checks if a command is a blocking operation
|
||||
// Blocking commands have a timeout parameter and include: BLPOP, BRPOP, BRPOPLPUSH, BLMOVE,
|
||||
// BZPOPMIN, BZPOPMAX, BZMPOP, BLMPOP, XREAD with BLOCK, XREADGROUP with BLOCK
|
||||
func isBlockingCommand(cmd redis.Cmder) bool {
|
||||
name := strings.ToLower(cmd.Name())
|
||||
|
||||
// Commands that start with 'b' and are blocking
|
||||
if strings.HasPrefix(name, "b") {
|
||||
switch name {
|
||||
case "blpop", "brpop", "brpoplpush", "blmove", "bzpopmin", "bzpopmax", "bzmpop", "blmpop":
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// XREAD and XREADGROUP with BLOCK option
|
||||
if name == "xread" || name == "xreadgroup" {
|
||||
args := cmd.Args()
|
||||
for i, arg := range args {
|
||||
if argStr, ok := arg.(string); ok {
|
||||
if strings.ToLower(argStr) == "block" && i+1 < len(args) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// classifyError returns the error.type attribute value
|
||||
// Format: <category>:<subcategory>:<error_name>
|
||||
func classifyError(err error) string {
|
||||
if err == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
errStr := err.Error()
|
||||
|
||||
// Network errors
|
||||
if isNetworkError(err) {
|
||||
return fmt.Sprintf("network:%s", errStr)
|
||||
}
|
||||
|
||||
// Timeout errors
|
||||
if isTimeoutError(err) {
|
||||
return "timeout"
|
||||
}
|
||||
|
||||
// Redis errors (start with error prefix like ERR, WRONGTYPE, etc.)
|
||||
if prefix := extractRedisErrorPrefix(err); prefix != "" {
|
||||
return fmt.Sprintf("redis:%s", prefix)
|
||||
}
|
||||
|
||||
// Generic error
|
||||
return errStr
|
||||
}
|
||||
|
||||
// extractRedisErrorPrefix extracts the Redis error prefix (e.g., "ERR", "WRONGTYPE")
|
||||
// Redis errors typically start with an uppercase prefix followed by a space
|
||||
func extractRedisErrorPrefix(err error) string {
|
||||
if err == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
errStr := err.Error()
|
||||
|
||||
// Redis errors typically start with an uppercase prefix
|
||||
// Examples: "ERR ...", "WRONGTYPE ...", "CLUSTERDOWN ..."
|
||||
parts := strings.SplitN(errStr, " ", 2)
|
||||
if len(parts) > 0 {
|
||||
prefix := parts[0]
|
||||
// Check if it's all uppercase (Redis error convention)
|
||||
if prefix == strings.ToUpper(prefix) && len(prefix) > 0 {
|
||||
return prefix
|
||||
}
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
// isNetworkError checks if an error is a network-related error
|
||||
func isNetworkError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check for net.Error interface
|
||||
if _, ok := err.(net.Error); ok {
|
||||
return true
|
||||
}
|
||||
|
||||
// Check error message for common network error patterns
|
||||
errStr := strings.ToLower(err.Error())
|
||||
networkPatterns := []string{
|
||||
"connection refused",
|
||||
"connection reset",
|
||||
"broken pipe",
|
||||
"no route to host",
|
||||
"network is unreachable",
|
||||
"i/o timeout",
|
||||
"eof",
|
||||
}
|
||||
|
||||
for _, pattern := range networkPatterns {
|
||||
if strings.Contains(errStr, pattern) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// isTimeoutError checks if an error is a timeout error
|
||||
func isTimeoutError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check for net.Error with Timeout() method
|
||||
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
|
||||
return true
|
||||
}
|
||||
|
||||
// Check error message
|
||||
errStr := strings.ToLower(err.Error())
|
||||
return strings.Contains(errStr, "timeout") || strings.Contains(errStr, "deadline exceeded")
|
||||
}
|
||||
|
||||
// splitHostPort splits a host:port string into host and port
|
||||
func splitHostPort(addr string) (host, port string) {
|
||||
// Handle Unix sockets
|
||||
if strings.HasPrefix(addr, "/") || strings.HasPrefix(addr, "@") {
|
||||
return addr, ""
|
||||
}
|
||||
|
||||
host, port, err := net.SplitHostPort(addr)
|
||||
if err != nil {
|
||||
// If split fails, return the whole address as host
|
||||
return addr, ""
|
||||
}
|
||||
|
||||
return host, port
|
||||
}
|
||||
|
||||
// parseAddr parses a Redis address into host and port
|
||||
func parseAddr(addr string) (host, port string) {
|
||||
// Handle Unix sockets
|
||||
if strings.HasPrefix(addr, "/") || strings.HasPrefix(addr, "unix://") {
|
||||
return addr, ""
|
||||
}
|
||||
|
||||
// Remove protocol prefix if present
|
||||
addr = strings.TrimPrefix(addr, "redis://")
|
||||
addr = strings.TrimPrefix(addr, "rediss://")
|
||||
|
||||
host, port, err := net.SplitHostPort(addr)
|
||||
if err != nil {
|
||||
// No port specified, use default
|
||||
return addr, "6379"
|
||||
}
|
||||
|
||||
return host, port
|
||||
}
|
||||
|
||||
// formatDBIndex formats the database index as a string
|
||||
func formatDBIndex(db int) string {
|
||||
if db < 0 {
|
||||
return ""
|
||||
}
|
||||
return strconv.Itoa(db)
|
||||
}
|
||||
|
||||
// RecordConnectionStateChange records a change in connection state
|
||||
// This is called from the pool when connections transition between states
|
||||
func (r *metricsRecorder) RecordConnectionStateChange(
|
||||
ctx context.Context,
|
||||
cn redis.ConnInfo,
|
||||
fromState, toState string,
|
||||
) {
|
||||
if r.connectionCount == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Extract server address from connection
|
||||
serverAddr, serverPort := extractServerInfo(cn)
|
||||
|
||||
// Build base attributes
|
||||
attrs := []attribute.KeyValue{
|
||||
attribute.String("db.system", "redis"),
|
||||
attribute.String("server.address", serverAddr),
|
||||
}
|
||||
|
||||
// Add server.port if not default
|
||||
if serverPort != "" && serverPort != "6379" {
|
||||
attrs = append(attrs, attribute.String("server.port", serverPort))
|
||||
}
|
||||
|
||||
// Decrement old state (if not empty)
|
||||
if fromState != "" {
|
||||
fromAttrs := append([]attribute.KeyValue{}, attrs...)
|
||||
fromAttrs = append(fromAttrs, attribute.String("state", fromState))
|
||||
r.connectionCount.Add(ctx, -1, metric.WithAttributes(fromAttrs...))
|
||||
}
|
||||
|
||||
// Increment new state
|
||||
if toState != "" {
|
||||
toAttrs := append([]attribute.KeyValue{}, attrs...)
|
||||
toAttrs = append(toAttrs, attribute.String("state", toState))
|
||||
r.connectionCount.Add(ctx, 1, metric.WithAttributes(toAttrs...))
|
||||
}
|
||||
}
|
||||
|
||||
// extractServerInfo extracts server address and port from connection info
|
||||
func extractServerInfo(cn redis.ConnInfo) (addr, port string) {
|
||||
if cn == nil {
|
||||
return "", ""
|
||||
}
|
||||
|
||||
remoteAddr := cn.RemoteAddr()
|
||||
if remoteAddr == nil {
|
||||
return "", ""
|
||||
}
|
||||
|
||||
addrStr := remoteAddr.String()
|
||||
host, portStr := parseAddr(addrStr)
|
||||
return host, portStr
|
||||
}
|
||||
@@ -1,196 +0,0 @@
|
||||
// Package redisotel provides native OpenTelemetry instrumentation for go-redis.
|
||||
//
|
||||
// This package implements the OpenTelemetry Semantic Conventions for database clients,
|
||||
// providing metrics, traces, and logs for Redis operations.
|
||||
//
|
||||
// Basic Usage (with global MeterProvider):
|
||||
//
|
||||
// import (
|
||||
// "github.com/redis/go-redis/v9"
|
||||
// redisotel "github.com/redis/go-redis/extra/redisotel-native/v9"
|
||||
// "go.opentelemetry.io/otel"
|
||||
// )
|
||||
//
|
||||
// func main() {
|
||||
// // Initialize OpenTelemetry globally (meter provider, etc.)
|
||||
// otel.SetMeterProvider(myMeterProvider)
|
||||
//
|
||||
// // Create Redis client
|
||||
// rdb := redis.NewClient(&redis.Options{
|
||||
// Addr: "localhost:6379",
|
||||
// DB: 0,
|
||||
// })
|
||||
//
|
||||
// // Initialize native OTel instrumentation (uses global MeterProvider)
|
||||
// if err := redisotel.Init(rdb); err != nil {
|
||||
// panic(err)
|
||||
// }
|
||||
//
|
||||
// // Use the client normally - metrics are automatically recorded
|
||||
// rdb.Set(ctx, "key", "value", 0)
|
||||
// }
|
||||
//
|
||||
// Advanced Usage (with custom MeterProvider):
|
||||
//
|
||||
// // Pass a custom MeterProvider
|
||||
// if err := redisotel.Init(rdb, redisotel.WithMeterProvider(customProvider)); err != nil {
|
||||
// panic(err)
|
||||
// }
|
||||
package redisotel
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
)
|
||||
|
||||
var (
|
||||
// Global singleton instance
|
||||
globalInstance *metricsRecorder
|
||||
globalInstanceOnce sync.Once
|
||||
initErr error
|
||||
)
|
||||
|
||||
// Init initializes native OpenTelemetry instrumentation for the given Redis client.
|
||||
// This function should be called once per application, typically during startup.
|
||||
// Subsequent calls are no-ops and return nil.
|
||||
//
|
||||
// The function extracts configuration from the client (server address, port, database index)
|
||||
// and registers a global metrics recorder.
|
||||
//
|
||||
// If no MeterProvider is provided via WithMeterProvider option, the global MeterProvider
|
||||
// from otel.GetMeterProvider() will be used. Make sure to call otel.SetMeterProvider()
|
||||
// before calling Init() if you want to use a custom provider.
|
||||
//
|
||||
// Example (using global MeterProvider):
|
||||
//
|
||||
// otel.SetMeterProvider(myMeterProvider)
|
||||
// rdb := redis.NewClient(&redis.Options{
|
||||
// Addr: "localhost:6379",
|
||||
// DB: 0,
|
||||
// })
|
||||
// if err := redisotel.Init(rdb); err != nil {
|
||||
// log.Fatal(err)
|
||||
// }
|
||||
//
|
||||
// Example (using custom MeterProvider):
|
||||
//
|
||||
// if err := redisotel.Init(rdb, redisotel.WithMeterProvider(customProvider)); err != nil {
|
||||
// log.Fatal(err)
|
||||
// }
|
||||
func Init(client redis.UniversalClient, opts ...Option) error {
|
||||
globalInstanceOnce.Do(func() {
|
||||
initErr = initOnce(client, opts...)
|
||||
})
|
||||
return initErr
|
||||
}
|
||||
|
||||
// initOnce performs the actual initialization (called once by sync.Once)
|
||||
func initOnce(client redis.UniversalClient, opts ...Option) error {
|
||||
// Apply options
|
||||
cfg := defaultConfig()
|
||||
for _, opt := range opts {
|
||||
opt.apply(&cfg)
|
||||
}
|
||||
|
||||
// Extract client configuration
|
||||
serverAddr, serverPort, dbIndex, err := extractClientConfig(client)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to extract client config: %w", err)
|
||||
}
|
||||
|
||||
// Get meter provider (use global if not provided)
|
||||
meterProvider := cfg.meterProvider
|
||||
if meterProvider == nil {
|
||||
meterProvider = otel.GetMeterProvider()
|
||||
}
|
||||
|
||||
// Create meter
|
||||
meter := meterProvider.Meter(
|
||||
"github.com/redis/go-redis",
|
||||
metric.WithInstrumentationVersion(redis.Version()),
|
||||
)
|
||||
|
||||
// Create histogram for operation duration
|
||||
operationDuration, err := meter.Float64Histogram(
|
||||
"db.client.operation.duration",
|
||||
metric.WithDescription("Duration of database client operations"),
|
||||
metric.WithUnit("s"),
|
||||
metric.WithExplicitBucketBoundaries(cfg.histogramBuckets...),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create operation duration histogram: %w", err)
|
||||
}
|
||||
|
||||
// Create synchronous UpDownCounter for connection count
|
||||
connectionCount, err := meter.Int64UpDownCounter(
|
||||
"db.client.connection.count",
|
||||
metric.WithDescription("The number of connections that are currently in state described by the state attribute"),
|
||||
metric.WithUnit("{connection}"),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create connection count metric: %w", err)
|
||||
}
|
||||
|
||||
// Create recorder
|
||||
recorder := &metricsRecorder{
|
||||
operationDuration: operationDuration,
|
||||
connectionCount: connectionCount,
|
||||
serverAddr: serverAddr,
|
||||
serverPort: serverPort,
|
||||
dbIndex: dbIndex,
|
||||
}
|
||||
|
||||
// Register global recorder
|
||||
redis.SetOTelRecorder(recorder)
|
||||
globalInstance = recorder
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// extractClientConfig extracts server address, port, and database index from a Redis client
|
||||
func extractClientConfig(client redis.UniversalClient) (serverAddr, serverPort, dbIndex string, err error) {
|
||||
switch c := client.(type) {
|
||||
case *redis.Client:
|
||||
opts := c.Options()
|
||||
host, port := parseAddr(opts.Addr)
|
||||
return host, port, formatDBIndex(opts.DB), nil
|
||||
|
||||
case *redis.ClusterClient:
|
||||
opts := c.Options()
|
||||
if len(opts.Addrs) > 0 {
|
||||
// Use first address for server.address attribute
|
||||
host, port := parseAddr(opts.Addrs[0])
|
||||
return host, port, "", nil
|
||||
}
|
||||
return "", "", "", fmt.Errorf("cluster client has no addresses")
|
||||
|
||||
case *redis.Ring:
|
||||
opts := c.Options()
|
||||
if len(opts.Addrs) > 0 {
|
||||
// Use first address for server.address attribute
|
||||
for _, addr := range opts.Addrs {
|
||||
host, port := parseAddr(addr)
|
||||
return host, port, formatDBIndex(opts.DB), nil
|
||||
}
|
||||
}
|
||||
return "", "", "", fmt.Errorf("ring client has no addresses")
|
||||
|
||||
default:
|
||||
return "", "", "", fmt.Errorf("unsupported client type: %T", client)
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown cleans up resources (for testing purposes)
|
||||
func Shutdown() {
|
||||
if globalInstance != nil {
|
||||
redis.SetOTelRecorder(nil)
|
||||
globalInstance = nil
|
||||
}
|
||||
// Reset the sync.Once so Init can be called again (useful for tests)
|
||||
globalInstanceOnce = sync.Once{}
|
||||
initErr = nil
|
||||
}
|
||||
@@ -1,64 +0,0 @@
|
||||
package otel
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9/internal/pool"
|
||||
)
|
||||
|
||||
// Cmder is a minimal interface for command information needed for metrics.
|
||||
// This avoids circular dependencies with the main redis package.
|
||||
type Cmder interface {
|
||||
Name() string
|
||||
FullName() string
|
||||
Args() []interface{}
|
||||
Err() error
|
||||
}
|
||||
|
||||
// Recorder is the interface for recording metrics.
|
||||
// Implementations are provided by extra/redisotel-native package.
|
||||
type Recorder interface {
|
||||
// RecordOperationDuration records the total operation duration (including all retries)
|
||||
RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, cn *pool.Conn)
|
||||
|
||||
// RecordConnectionStateChange records when a connection changes state
|
||||
RecordConnectionStateChange(ctx context.Context, cn *pool.Conn, fromState, toState string)
|
||||
}
|
||||
|
||||
// Global recorder instance (initialized by extra/redisotel-native)
|
||||
var globalRecorder Recorder = noopRecorder{}
|
||||
|
||||
// SetGlobalRecorder sets the global recorder (called by Init() in extra/redisotel-native)
|
||||
func SetGlobalRecorder(r Recorder) {
|
||||
if r == nil {
|
||||
globalRecorder = noopRecorder{}
|
||||
// Unregister pool callback
|
||||
pool.SetConnectionStateChangeCallback(nil)
|
||||
return
|
||||
}
|
||||
globalRecorder = r
|
||||
|
||||
// Register pool callback to forward state changes to recorder
|
||||
pool.SetConnectionStateChangeCallback(func(ctx context.Context, cn *pool.Conn, fromState, toState string) {
|
||||
globalRecorder.RecordConnectionStateChange(ctx, cn, fromState, toState)
|
||||
})
|
||||
}
|
||||
|
||||
// RecordOperationDuration records the total operation duration.
|
||||
// This is called from redis.go after command execution completes.
|
||||
func RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, cn *pool.Conn) {
|
||||
globalRecorder.RecordOperationDuration(ctx, duration, cmd, attempts, cn)
|
||||
}
|
||||
|
||||
// RecordConnectionStateChange records when a connection changes state.
|
||||
// This is called from pool.go when connections transition between states.
|
||||
func RecordConnectionStateChange(ctx context.Context, cn *pool.Conn, fromState, toState string) {
|
||||
globalRecorder.RecordConnectionStateChange(ctx, cn, fromState, toState)
|
||||
}
|
||||
|
||||
// noopRecorder is a no-op implementation (zero overhead when metrics disabled)
|
||||
type noopRecorder struct{}
|
||||
|
||||
func (noopRecorder) RecordOperationDuration(context.Context, time.Duration, Cmder, int, *pool.Conn) {}
|
||||
func (noopRecorder) RecordConnectionStateChange(context.Context, *pool.Conn, string, string) {}
|
||||
@@ -24,7 +24,6 @@ var (
|
||||
// ErrPoolTimeout timed out waiting to get a connection from the connection pool.
|
||||
ErrPoolTimeout = errors.New("redis: connection pool timeout")
|
||||
|
||||
<<<<<<< HEAD
|
||||
// ErrConnUnusableTimeout is returned when a connection is not usable and we timed out trying to mark it as unusable.
|
||||
ErrConnUnusableTimeout = errors.New("redis: timed out trying to mark connection as unusable")
|
||||
|
||||
@@ -33,10 +32,6 @@ var (
|
||||
|
||||
// errConnNotPooled is returned when trying to return a non-pooled connection to the pool.
|
||||
errConnNotPooled = errors.New("connection not pooled")
|
||||
=======
|
||||
// Global callback for connection state changes (set by otel package)
|
||||
connectionStateChangeCallback func(ctx context.Context, cn *Conn, fromState, toState string)
|
||||
>>>>>>> c17657c6 (Adds connection state metrics)
|
||||
|
||||
// popAttempts is the maximum number of attempts to find a usable connection
|
||||
// when popping from the idle connection pool. This handles cases where connections
|
||||
@@ -56,23 +51,6 @@ var (
|
||||
noExpiration = maxTime
|
||||
)
|
||||
|
||||
<<<<<<< HEAD
|
||||
=======
|
||||
// SetConnectionStateChangeCallback sets the global callback for connection state changes.
|
||||
// This is called by the otel package to register metrics recording.
|
||||
func SetConnectionStateChangeCallback(fn func(ctx context.Context, cn *Conn, fromState, toState string)) {
|
||||
connectionStateChangeCallback = fn
|
||||
}
|
||||
|
||||
var timers = sync.Pool{
|
||||
New: func() interface{} {
|
||||
t := time.NewTimer(time.Hour)
|
||||
t.Stop()
|
||||
return t
|
||||
},
|
||||
}
|
||||
|
||||
>>>>>>> c17657c6 (Adds connection state metrics)
|
||||
// Stats contains pool state information and accumulated stats.
|
||||
type Stats struct {
|
||||
Hits uint32 // number of times free connection was found in the pool
|
||||
@@ -546,12 +524,6 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
|
||||
}
|
||||
|
||||
atomic.AddUint32(&p.stats.Hits, 1)
|
||||
|
||||
// Notify metrics: connection moved from idle to used
|
||||
if connectionStateChangeCallback != nil {
|
||||
connectionStateChangeCallback(ctx, cn, "idle", "used")
|
||||
}
|
||||
|
||||
return cn, nil
|
||||
}
|
||||
|
||||
@@ -574,12 +546,6 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Notify metrics: new connection is created and used
|
||||
if connectionStateChangeCallback != nil {
|
||||
connectionStateChangeCallback(ctx, newcn, "", "used")
|
||||
}
|
||||
|
||||
return newcn, nil
|
||||
}
|
||||
|
||||
@@ -874,26 +840,9 @@ func (p *ConnPool) putConn(ctx context.Context, cn *Conn, freeTurn bool) {
|
||||
p.connsMu.Unlock()
|
||||
p.idleConnsLen.Add(1)
|
||||
}
|
||||
<<<<<<< HEAD
|
||||
=======
|
||||
p.idleConnsLen.Add(1)
|
||||
|
||||
// Notify metrics: connection moved from used to idle
|
||||
if connectionStateChangeCallback != nil {
|
||||
connectionStateChangeCallback(ctx, cn, "used", "idle")
|
||||
}
|
||||
>>>>>>> c17657c6 (Adds connection state metrics)
|
||||
} else {
|
||||
shouldCloseConn = true
|
||||
<<<<<<< HEAD
|
||||
p.removeConnWithLock(cn)
|
||||
=======
|
||||
|
||||
// Notify metrics: connection removed (used -> nothing)
|
||||
if connectionStateChangeCallback != nil {
|
||||
connectionStateChangeCallback(ctx, cn, "used", "")
|
||||
}
|
||||
>>>>>>> c17657c6 (Adds connection state metrics)
|
||||
}
|
||||
|
||||
if freeTurn {
|
||||
@@ -908,7 +857,6 @@ func (p *ConnPool) putConn(ctx context.Context, cn *Conn, freeTurn bool) {
|
||||
}
|
||||
|
||||
func (p *ConnPool) Remove(ctx context.Context, cn *Conn, reason error) {
|
||||
<<<<<<< HEAD
|
||||
p.removeConnInternal(ctx, cn, reason, true)
|
||||
}
|
||||
|
||||
@@ -929,19 +877,12 @@ func (p *ConnPool) removeConnInternal(ctx context.Context, cn *Conn, reason erro
|
||||
hookManager.ProcessOnRemove(ctx, cn, reason)
|
||||
}
|
||||
|
||||
=======
|
||||
>>>>>>> c17657c6 (Adds connection state metrics)
|
||||
p.removeConnWithLock(cn)
|
||||
|
||||
if freeTurn {
|
||||
p.freeTurn()
|
||||
}
|
||||
|
||||
// Notify metrics: connection removed (assume from used state)
|
||||
if connectionStateChangeCallback != nil {
|
||||
connectionStateChangeCallback(ctx, cn, "used", "")
|
||||
}
|
||||
|
||||
_ = p.closeConn(cn)
|
||||
|
||||
// Check if we need to create new idle connections to maintain MinIdleConns
|
||||
|
||||
69
otel.go
69
otel.go
@@ -1,69 +0,0 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9/internal/otel"
|
||||
"github.com/redis/go-redis/v9/internal/pool"
|
||||
)
|
||||
|
||||
// ConnInfo provides information about a Redis connection for metrics.
|
||||
// This is a public interface to avoid exposing internal types.
|
||||
type ConnInfo interface {
|
||||
// RemoteAddr returns the remote network address
|
||||
RemoteAddr() net.Addr
|
||||
}
|
||||
|
||||
// OTelRecorder is the interface for recording OpenTelemetry metrics.
|
||||
// Implementations are provided by extra/redisotel-native package.
|
||||
//
|
||||
// This interface is exported to allow external packages to implement
|
||||
// custom recorders without depending on internal packages.
|
||||
type OTelRecorder interface {
|
||||
// RecordOperationDuration records the total operation duration (including all retries)
|
||||
RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, cn ConnInfo)
|
||||
|
||||
// RecordConnectionStateChange records when a connection changes state (e.g., idle -> used)
|
||||
RecordConnectionStateChange(ctx context.Context, cn ConnInfo, fromState, toState string)
|
||||
}
|
||||
|
||||
// SetOTelRecorder sets the global OpenTelemetry recorder.
|
||||
// This is typically called by Init() in extra/redisotel-native package.
|
||||
//
|
||||
// Setting a nil recorder disables metrics collection.
|
||||
func SetOTelRecorder(r OTelRecorder) {
|
||||
if r == nil {
|
||||
otel.SetGlobalRecorder(nil)
|
||||
return
|
||||
}
|
||||
otel.SetGlobalRecorder(&otelRecorderAdapter{r})
|
||||
}
|
||||
|
||||
// otelRecorderAdapter adapts the public OTelRecorder interface to the internal otel.Recorder interface
|
||||
type otelRecorderAdapter struct {
|
||||
recorder OTelRecorder
|
||||
}
|
||||
|
||||
func (a *otelRecorderAdapter) RecordOperationDuration(ctx context.Context, duration time.Duration, cmd otel.Cmder, attempts int, cn *pool.Conn) {
|
||||
// Convert internal Cmder to public Cmder
|
||||
if publicCmd, ok := cmd.(Cmder); ok {
|
||||
// Convert internal pool.Conn to public ConnInfo
|
||||
var connInfo ConnInfo
|
||||
if cn != nil {
|
||||
connInfo = cn
|
||||
}
|
||||
a.recorder.RecordOperationDuration(ctx, duration, publicCmd, attempts, connInfo)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *otelRecorderAdapter) RecordConnectionStateChange(ctx context.Context, cn *pool.Conn, fromState, toState string) {
|
||||
// Convert internal pool.Conn to public ConnInfo
|
||||
var connInfo ConnInfo
|
||||
if cn != nil {
|
||||
connInfo = cn
|
||||
}
|
||||
a.recorder.RecordConnectionStateChange(ctx, connInfo, fromState, toState)
|
||||
}
|
||||
|
||||
30
redis.go
30
redis.go
@@ -13,7 +13,6 @@ import (
|
||||
"github.com/redis/go-redis/v9/internal"
|
||||
"github.com/redis/go-redis/v9/internal/auth/streaming"
|
||||
"github.com/redis/go-redis/v9/internal/hscan"
|
||||
"github.com/redis/go-redis/v9/internal/otel"
|
||||
"github.com/redis/go-redis/v9/internal/pool"
|
||||
"github.com/redis/go-redis/v9/internal/proto"
|
||||
"github.com/redis/go-redis/v9/maintnotifications"
|
||||
@@ -663,34 +662,17 @@ func (c *baseClient) dial(ctx context.Context, network, addr string) (net.Conn,
|
||||
}
|
||||
|
||||
func (c *baseClient) process(ctx context.Context, cmd Cmder) error {
|
||||
// Start measuring total operation duration (includes all retries)
|
||||
operationStart := time.Now()
|
||||
var lastConn *pool.Conn
|
||||
|
||||
var lastErr error
|
||||
totalAttempts := 0
|
||||
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
|
||||
totalAttempts++
|
||||
attempt := attempt
|
||||
|
||||
retry, err, cn := c._process(ctx, cmd, attempt)
|
||||
if cn != nil {
|
||||
lastConn = cn
|
||||
}
|
||||
retry, err := c._process(ctx, cmd, attempt)
|
||||
if err == nil || !retry {
|
||||
// Record total operation duration
|
||||
operationDuration := time.Since(operationStart)
|
||||
otel.RecordOperationDuration(ctx, operationDuration, cmd, totalAttempts, lastConn)
|
||||
return err
|
||||
}
|
||||
|
||||
lastErr = err
|
||||
}
|
||||
|
||||
// Record failed operation after all retries
|
||||
operationDuration := time.Since(operationStart)
|
||||
otel.RecordOperationDuration(ctx, operationDuration, cmd, totalAttempts, lastConn)
|
||||
|
||||
return lastErr
|
||||
}
|
||||
|
||||
@@ -707,17 +689,15 @@ func (c *baseClient) assertUnstableCommand(cmd Cmder) (bool, error) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error, *pool.Conn) {
|
||||
func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error) {
|
||||
if attempt > 0 {
|
||||
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
|
||||
return false, err, nil
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
var usedConn *pool.Conn
|
||||
retryTimeout := uint32(0)
|
||||
if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
|
||||
usedConn = cn
|
||||
// Process any pending push notifications before executing the command
|
||||
if err := c.processPushNotifications(ctx, cn); err != nil {
|
||||
internal.Logger.Printf(ctx, "push: error processing pending notifications before command: %v", err)
|
||||
@@ -758,10 +738,10 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
|
||||
return nil
|
||||
}); err != nil {
|
||||
retry := shouldRetry(err, atomic.LoadUint32(&retryTimeout) == 1)
|
||||
return retry, err, usedConn
|
||||
return retry, err
|
||||
}
|
||||
|
||||
return false, nil, usedConn
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (c *baseClient) retryBackoff(attempt int) time.Duration {
|
||||
|
||||
Reference in New Issue
Block a user