From 2a7725db635c7bf91f08d81e152e4ec86835bd1c Mon Sep 17 00:00:00 2001 From: Elena Kolevska Date: Mon, 27 Oct 2025 18:02:15 +0000 Subject: [PATCH] Adds connection state metrics Signed-off-by: Elena Kolevska --- extra/redisotel-native/metrics.go | 59 ++++++++++++++++++++++++++++- extra/redisotel-native/redisotel.go | 11 ++++++ internal/otel/metrics.go | 17 +++++++++ internal/pool/pool.go | 59 +++++++++++++++++++++++++++++ otel.go | 12 ++++++ 5 files changed, 157 insertions(+), 1 deletion(-) diff --git a/extra/redisotel-native/metrics.go b/extra/redisotel-native/metrics.go index 6629172e..edf12273 100644 --- a/extra/redisotel-native/metrics.go +++ b/extra/redisotel-native/metrics.go @@ -21,8 +21,9 @@ const ( // metricsRecorder implements the otel.Recorder interface type metricsRecorder struct { operationDuration metric.Float64Histogram + connectionCount metric.Int64UpDownCounter - // Client configuration for attributes + // Client configuration for attributes (used for operation metrics only) serverAddr string serverPort string dbIndex string @@ -267,3 +268,59 @@ func formatDBIndex(db int) string { } return strconv.Itoa(db) } + +// RecordConnectionStateChange records a change in connection state +// This is called from the pool when connections transition between states +func (r *metricsRecorder) RecordConnectionStateChange( + ctx context.Context, + cn redis.ConnInfo, + fromState, toState string, +) { + if r.connectionCount == nil { + return + } + + // Extract server address from connection + serverAddr, serverPort := extractServerInfo(cn) + + // Build base attributes + attrs := []attribute.KeyValue{ + attribute.String("db.system", "redis"), + attribute.String("server.address", serverAddr), + } + + // Add server.port if not default + if serverPort != "" && serverPort != "6379" { + attrs = append(attrs, attribute.String("server.port", serverPort)) + } + + // Decrement old state (if not empty) + if fromState != "" { + fromAttrs := append([]attribute.KeyValue{}, attrs...) + fromAttrs = append(fromAttrs, attribute.String("state", fromState)) + r.connectionCount.Add(ctx, -1, metric.WithAttributes(fromAttrs...)) + } + + // Increment new state + if toState != "" { + toAttrs := append([]attribute.KeyValue{}, attrs...) + toAttrs = append(toAttrs, attribute.String("state", toState)) + r.connectionCount.Add(ctx, 1, metric.WithAttributes(toAttrs...)) + } +} + +// extractServerInfo extracts server address and port from connection info +func extractServerInfo(cn redis.ConnInfo) (addr, port string) { + if cn == nil { + return "", "" + } + + remoteAddr := cn.RemoteAddr() + if remoteAddr == nil { + return "", "" + } + + addrStr := remoteAddr.String() + host, portStr := parseAddr(addrStr) + return host, portStr +} diff --git a/extra/redisotel-native/redisotel.go b/extra/redisotel-native/redisotel.go index 9e0caa62..fa45c069 100644 --- a/extra/redisotel-native/redisotel.go +++ b/extra/redisotel-native/redisotel.go @@ -125,9 +125,20 @@ func initOnce(client redis.UniversalClient, opts ...Option) error { return fmt.Errorf("failed to create operation duration histogram: %w", err) } + // Create synchronous UpDownCounter for connection count + connectionCount, err := meter.Int64UpDownCounter( + "db.client.connection.count", + metric.WithDescription("The number of connections that are currently in state described by the state attribute"), + metric.WithUnit("{connection}"), + ) + if err != nil { + return fmt.Errorf("failed to create connection count metric: %w", err) + } + // Create recorder recorder := &metricsRecorder{ operationDuration: operationDuration, + connectionCount: connectionCount, serverAddr: serverAddr, serverPort: serverPort, dbIndex: dbIndex, diff --git a/internal/otel/metrics.go b/internal/otel/metrics.go index 6662761c..9755c572 100644 --- a/internal/otel/metrics.go +++ b/internal/otel/metrics.go @@ -21,6 +21,9 @@ type Cmder interface { type Recorder interface { // RecordOperationDuration records the total operation duration (including all retries) RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, cn *pool.Conn) + + // RecordConnectionStateChange records when a connection changes state + RecordConnectionStateChange(ctx context.Context, cn *pool.Conn, fromState, toState string) } // Global recorder instance (initialized by extra/redisotel-native) @@ -30,9 +33,16 @@ var globalRecorder Recorder = noopRecorder{} func SetGlobalRecorder(r Recorder) { if r == nil { globalRecorder = noopRecorder{} + // Unregister pool callback + pool.SetConnectionStateChangeCallback(nil) return } globalRecorder = r + + // Register pool callback to forward state changes to recorder + pool.SetConnectionStateChangeCallback(func(ctx context.Context, cn *pool.Conn, fromState, toState string) { + globalRecorder.RecordConnectionStateChange(ctx, cn, fromState, toState) + }) } // RecordOperationDuration records the total operation duration. @@ -41,7 +51,14 @@ func RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cm globalRecorder.RecordOperationDuration(ctx, duration, cmd, attempts, cn) } +// RecordConnectionStateChange records when a connection changes state. +// This is called from pool.go when connections transition between states. +func RecordConnectionStateChange(ctx context.Context, cn *pool.Conn, fromState, toState string) { + globalRecorder.RecordConnectionStateChange(ctx, cn, fromState, toState) +} + // noopRecorder is a no-op implementation (zero overhead when metrics disabled) type noopRecorder struct{} func (noopRecorder) RecordOperationDuration(context.Context, time.Duration, Cmder, int, *pool.Conn) {} +func (noopRecorder) RecordConnectionStateChange(context.Context, *pool.Conn, string, string) {} diff --git a/internal/pool/pool.go b/internal/pool/pool.go index d757d1f4..2369f363 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -24,6 +24,7 @@ var ( // ErrPoolTimeout timed out waiting to get a connection from the connection pool. ErrPoolTimeout = errors.New("redis: connection pool timeout") +<<<<<<< HEAD // ErrConnUnusableTimeout is returned when a connection is not usable and we timed out trying to mark it as unusable. ErrConnUnusableTimeout = errors.New("redis: timed out trying to mark connection as unusable") @@ -32,6 +33,10 @@ var ( // errConnNotPooled is returned when trying to return a non-pooled connection to the pool. errConnNotPooled = errors.New("connection not pooled") +======= + // Global callback for connection state changes (set by otel package) + connectionStateChangeCallback func(ctx context.Context, cn *Conn, fromState, toState string) +>>>>>>> c17657c6 (Adds connection state metrics) // popAttempts is the maximum number of attempts to find a usable connection // when popping from the idle connection pool. This handles cases where connections @@ -51,6 +56,23 @@ var ( noExpiration = maxTime ) +<<<<<<< HEAD +======= +// SetConnectionStateChangeCallback sets the global callback for connection state changes. +// This is called by the otel package to register metrics recording. +func SetConnectionStateChangeCallback(fn func(ctx context.Context, cn *Conn, fromState, toState string)) { + connectionStateChangeCallback = fn +} + +var timers = sync.Pool{ + New: func() interface{} { + t := time.NewTimer(time.Hour) + t.Stop() + return t + }, +} + +>>>>>>> c17657c6 (Adds connection state metrics) // Stats contains pool state information and accumulated stats. type Stats struct { Hits uint32 // number of times free connection was found in the pool @@ -524,6 +546,12 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) { } atomic.AddUint32(&p.stats.Hits, 1) + + // Notify metrics: connection moved from idle to used + if connectionStateChangeCallback != nil { + connectionStateChangeCallback(ctx, cn, "idle", "used") + } + return cn, nil } @@ -546,6 +574,12 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) { return nil, err } } + + // Notify metrics: new connection is created and used + if connectionStateChangeCallback != nil { + connectionStateChangeCallback(ctx, newcn, "", "used") + } + return newcn, nil } @@ -840,9 +874,26 @@ func (p *ConnPool) putConn(ctx context.Context, cn *Conn, freeTurn bool) { p.connsMu.Unlock() p.idleConnsLen.Add(1) } +<<<<<<< HEAD +======= + p.idleConnsLen.Add(1) + + // Notify metrics: connection moved from used to idle + if connectionStateChangeCallback != nil { + connectionStateChangeCallback(ctx, cn, "used", "idle") + } +>>>>>>> c17657c6 (Adds connection state metrics) } else { shouldCloseConn = true +<<<<<<< HEAD p.removeConnWithLock(cn) +======= + + // Notify metrics: connection removed (used -> nothing) + if connectionStateChangeCallback != nil { + connectionStateChangeCallback(ctx, cn, "used", "") + } +>>>>>>> c17657c6 (Adds connection state metrics) } if freeTurn { @@ -857,6 +908,7 @@ func (p *ConnPool) putConn(ctx context.Context, cn *Conn, freeTurn bool) { } func (p *ConnPool) Remove(ctx context.Context, cn *Conn, reason error) { +<<<<<<< HEAD p.removeConnInternal(ctx, cn, reason, true) } @@ -877,12 +929,19 @@ func (p *ConnPool) removeConnInternal(ctx context.Context, cn *Conn, reason erro hookManager.ProcessOnRemove(ctx, cn, reason) } +======= +>>>>>>> c17657c6 (Adds connection state metrics) p.removeConnWithLock(cn) if freeTurn { p.freeTurn() } + // Notify metrics: connection removed (assume from used state) + if connectionStateChangeCallback != nil { + connectionStateChangeCallback(ctx, cn, "used", "") + } + _ = p.closeConn(cn) // Check if we need to create new idle connections to maintain MinIdleConns diff --git a/otel.go b/otel.go index 9df89eeb..aa2f4028 100644 --- a/otel.go +++ b/otel.go @@ -24,6 +24,9 @@ type ConnInfo interface { type OTelRecorder interface { // RecordOperationDuration records the total operation duration (including all retries) RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, cn ConnInfo) + + // RecordConnectionStateChange records when a connection changes state (e.g., idle -> used) + RecordConnectionStateChange(ctx context.Context, cn ConnInfo, fromState, toState string) } // SetOTelRecorder sets the global OpenTelemetry recorder. @@ -55,3 +58,12 @@ func (a *otelRecorderAdapter) RecordOperationDuration(ctx context.Context, durat } } +func (a *otelRecorderAdapter) RecordConnectionStateChange(ctx context.Context, cn *pool.Conn, fromState, toState string) { + // Convert internal pool.Conn to public ConnInfo + var connInfo ConnInfo + if cn != nil { + connInfo = cn + } + a.recorder.RecordConnectionStateChange(ctx, connInfo, fromState, toState) +} +