mirror of
https://github.com/redis/go-redis.git
synced 2025-12-05 06:22:07 +03:00
Adds connection state metrics
Signed-off-by: Elena Kolevska <elena@kolevska.com>
This commit is contained in:
committed by
ofekshenawa
parent
d588c3ca71
commit
2a7725db63
@@ -21,8 +21,9 @@ const (
|
||||
// metricsRecorder implements the otel.Recorder interface
|
||||
type metricsRecorder struct {
|
||||
operationDuration metric.Float64Histogram
|
||||
connectionCount metric.Int64UpDownCounter
|
||||
|
||||
// Client configuration for attributes
|
||||
// Client configuration for attributes (used for operation metrics only)
|
||||
serverAddr string
|
||||
serverPort string
|
||||
dbIndex string
|
||||
@@ -267,3 +268,59 @@ func formatDBIndex(db int) string {
|
||||
}
|
||||
return strconv.Itoa(db)
|
||||
}
|
||||
|
||||
// RecordConnectionStateChange records a change in connection state
|
||||
// This is called from the pool when connections transition between states
|
||||
func (r *metricsRecorder) RecordConnectionStateChange(
|
||||
ctx context.Context,
|
||||
cn redis.ConnInfo,
|
||||
fromState, toState string,
|
||||
) {
|
||||
if r.connectionCount == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Extract server address from connection
|
||||
serverAddr, serverPort := extractServerInfo(cn)
|
||||
|
||||
// Build base attributes
|
||||
attrs := []attribute.KeyValue{
|
||||
attribute.String("db.system", "redis"),
|
||||
attribute.String("server.address", serverAddr),
|
||||
}
|
||||
|
||||
// Add server.port if not default
|
||||
if serverPort != "" && serverPort != "6379" {
|
||||
attrs = append(attrs, attribute.String("server.port", serverPort))
|
||||
}
|
||||
|
||||
// Decrement old state (if not empty)
|
||||
if fromState != "" {
|
||||
fromAttrs := append([]attribute.KeyValue{}, attrs...)
|
||||
fromAttrs = append(fromAttrs, attribute.String("state", fromState))
|
||||
r.connectionCount.Add(ctx, -1, metric.WithAttributes(fromAttrs...))
|
||||
}
|
||||
|
||||
// Increment new state
|
||||
if toState != "" {
|
||||
toAttrs := append([]attribute.KeyValue{}, attrs...)
|
||||
toAttrs = append(toAttrs, attribute.String("state", toState))
|
||||
r.connectionCount.Add(ctx, 1, metric.WithAttributes(toAttrs...))
|
||||
}
|
||||
}
|
||||
|
||||
// extractServerInfo extracts server address and port from connection info
|
||||
func extractServerInfo(cn redis.ConnInfo) (addr, port string) {
|
||||
if cn == nil {
|
||||
return "", ""
|
||||
}
|
||||
|
||||
remoteAddr := cn.RemoteAddr()
|
||||
if remoteAddr == nil {
|
||||
return "", ""
|
||||
}
|
||||
|
||||
addrStr := remoteAddr.String()
|
||||
host, portStr := parseAddr(addrStr)
|
||||
return host, portStr
|
||||
}
|
||||
|
||||
@@ -125,9 +125,20 @@ func initOnce(client redis.UniversalClient, opts ...Option) error {
|
||||
return fmt.Errorf("failed to create operation duration histogram: %w", err)
|
||||
}
|
||||
|
||||
// Create synchronous UpDownCounter for connection count
|
||||
connectionCount, err := meter.Int64UpDownCounter(
|
||||
"db.client.connection.count",
|
||||
metric.WithDescription("The number of connections that are currently in state described by the state attribute"),
|
||||
metric.WithUnit("{connection}"),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create connection count metric: %w", err)
|
||||
}
|
||||
|
||||
// Create recorder
|
||||
recorder := &metricsRecorder{
|
||||
operationDuration: operationDuration,
|
||||
connectionCount: connectionCount,
|
||||
serverAddr: serverAddr,
|
||||
serverPort: serverPort,
|
||||
dbIndex: dbIndex,
|
||||
|
||||
@@ -21,6 +21,9 @@ type Cmder interface {
|
||||
type Recorder interface {
|
||||
// RecordOperationDuration records the total operation duration (including all retries)
|
||||
RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, cn *pool.Conn)
|
||||
|
||||
// RecordConnectionStateChange records when a connection changes state
|
||||
RecordConnectionStateChange(ctx context.Context, cn *pool.Conn, fromState, toState string)
|
||||
}
|
||||
|
||||
// Global recorder instance (initialized by extra/redisotel-native)
|
||||
@@ -30,9 +33,16 @@ var globalRecorder Recorder = noopRecorder{}
|
||||
func SetGlobalRecorder(r Recorder) {
|
||||
if r == nil {
|
||||
globalRecorder = noopRecorder{}
|
||||
// Unregister pool callback
|
||||
pool.SetConnectionStateChangeCallback(nil)
|
||||
return
|
||||
}
|
||||
globalRecorder = r
|
||||
|
||||
// Register pool callback to forward state changes to recorder
|
||||
pool.SetConnectionStateChangeCallback(func(ctx context.Context, cn *pool.Conn, fromState, toState string) {
|
||||
globalRecorder.RecordConnectionStateChange(ctx, cn, fromState, toState)
|
||||
})
|
||||
}
|
||||
|
||||
// RecordOperationDuration records the total operation duration.
|
||||
@@ -41,7 +51,14 @@ func RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cm
|
||||
globalRecorder.RecordOperationDuration(ctx, duration, cmd, attempts, cn)
|
||||
}
|
||||
|
||||
// RecordConnectionStateChange records when a connection changes state.
|
||||
// This is called from pool.go when connections transition between states.
|
||||
func RecordConnectionStateChange(ctx context.Context, cn *pool.Conn, fromState, toState string) {
|
||||
globalRecorder.RecordConnectionStateChange(ctx, cn, fromState, toState)
|
||||
}
|
||||
|
||||
// noopRecorder is a no-op implementation (zero overhead when metrics disabled)
|
||||
type noopRecorder struct{}
|
||||
|
||||
func (noopRecorder) RecordOperationDuration(context.Context, time.Duration, Cmder, int, *pool.Conn) {}
|
||||
func (noopRecorder) RecordConnectionStateChange(context.Context, *pool.Conn, string, string) {}
|
||||
|
||||
@@ -24,6 +24,7 @@ var (
|
||||
// ErrPoolTimeout timed out waiting to get a connection from the connection pool.
|
||||
ErrPoolTimeout = errors.New("redis: connection pool timeout")
|
||||
|
||||
<<<<<<< HEAD
|
||||
// ErrConnUnusableTimeout is returned when a connection is not usable and we timed out trying to mark it as unusable.
|
||||
ErrConnUnusableTimeout = errors.New("redis: timed out trying to mark connection as unusable")
|
||||
|
||||
@@ -32,6 +33,10 @@ var (
|
||||
|
||||
// errConnNotPooled is returned when trying to return a non-pooled connection to the pool.
|
||||
errConnNotPooled = errors.New("connection not pooled")
|
||||
=======
|
||||
// Global callback for connection state changes (set by otel package)
|
||||
connectionStateChangeCallback func(ctx context.Context, cn *Conn, fromState, toState string)
|
||||
>>>>>>> c17657c6 (Adds connection state metrics)
|
||||
|
||||
// popAttempts is the maximum number of attempts to find a usable connection
|
||||
// when popping from the idle connection pool. This handles cases where connections
|
||||
@@ -51,6 +56,23 @@ var (
|
||||
noExpiration = maxTime
|
||||
)
|
||||
|
||||
<<<<<<< HEAD
|
||||
=======
|
||||
// SetConnectionStateChangeCallback sets the global callback for connection state changes.
|
||||
// This is called by the otel package to register metrics recording.
|
||||
func SetConnectionStateChangeCallback(fn func(ctx context.Context, cn *Conn, fromState, toState string)) {
|
||||
connectionStateChangeCallback = fn
|
||||
}
|
||||
|
||||
// timers is a pool of reusable *time.Timer values. Newly created timers are
// handed out already stopped, so callers must Reset them before waiting.
var timers = sync.Pool{
	New: func() interface{} {
		// The one-hour duration is a placeholder: the timer is stopped
		// immediately, before it can fire.
		idle := time.NewTimer(time.Hour)
		idle.Stop()
		return idle
	},
}
|
||||
|
||||
>>>>>>> c17657c6 (Adds connection state metrics)
|
||||
// Stats contains pool state information and accumulated stats.
|
||||
type Stats struct {
|
||||
Hits uint32 // number of times free connection was found in the pool
|
||||
@@ -524,6 +546,12 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
|
||||
}
|
||||
|
||||
atomic.AddUint32(&p.stats.Hits, 1)
|
||||
|
||||
// Notify metrics: connection moved from idle to used
|
||||
if connectionStateChangeCallback != nil {
|
||||
connectionStateChangeCallback(ctx, cn, "idle", "used")
|
||||
}
|
||||
|
||||
return cn, nil
|
||||
}
|
||||
|
||||
@@ -546,6 +574,12 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Notify metrics: new connection is created and used
|
||||
if connectionStateChangeCallback != nil {
|
||||
connectionStateChangeCallback(ctx, newcn, "", "used")
|
||||
}
|
||||
|
||||
return newcn, nil
|
||||
}
|
||||
|
||||
@@ -840,9 +874,26 @@ func (p *ConnPool) putConn(ctx context.Context, cn *Conn, freeTurn bool) {
|
||||
p.connsMu.Unlock()
|
||||
p.idleConnsLen.Add(1)
|
||||
}
|
||||
<<<<<<< HEAD
|
||||
=======
|
||||
p.idleConnsLen.Add(1)
|
||||
|
||||
// Notify metrics: connection moved from used to idle
|
||||
if connectionStateChangeCallback != nil {
|
||||
connectionStateChangeCallback(ctx, cn, "used", "idle")
|
||||
}
|
||||
>>>>>>> c17657c6 (Adds connection state metrics)
|
||||
} else {
|
||||
shouldCloseConn = true
|
||||
<<<<<<< HEAD
|
||||
p.removeConnWithLock(cn)
|
||||
=======
|
||||
|
||||
// Notify metrics: connection removed (used -> nothing)
|
||||
if connectionStateChangeCallback != nil {
|
||||
connectionStateChangeCallback(ctx, cn, "used", "")
|
||||
}
|
||||
>>>>>>> c17657c6 (Adds connection state metrics)
|
||||
}
|
||||
|
||||
if freeTurn {
|
||||
@@ -857,6 +908,7 @@ func (p *ConnPool) putConn(ctx context.Context, cn *Conn, freeTurn bool) {
|
||||
}
|
||||
|
||||
func (p *ConnPool) Remove(ctx context.Context, cn *Conn, reason error) {
|
||||
<<<<<<< HEAD
|
||||
p.removeConnInternal(ctx, cn, reason, true)
|
||||
}
|
||||
|
||||
@@ -877,12 +929,19 @@ func (p *ConnPool) removeConnInternal(ctx context.Context, cn *Conn, reason erro
|
||||
hookManager.ProcessOnRemove(ctx, cn, reason)
|
||||
}
|
||||
|
||||
=======
|
||||
>>>>>>> c17657c6 (Adds connection state metrics)
|
||||
p.removeConnWithLock(cn)
|
||||
|
||||
if freeTurn {
|
||||
p.freeTurn()
|
||||
}
|
||||
|
||||
// Notify metrics: connection removed (assume from used state)
|
||||
if connectionStateChangeCallback != nil {
|
||||
connectionStateChangeCallback(ctx, cn, "used", "")
|
||||
}
|
||||
|
||||
_ = p.closeConn(cn)
|
||||
|
||||
// Check if we need to create new idle connections to maintain MinIdleConns
|
||||
|
||||
12
otel.go
12
otel.go
@@ -24,6 +24,9 @@ type ConnInfo interface {
|
||||
type OTelRecorder interface {
|
||||
// RecordOperationDuration records the total operation duration (including all retries)
|
||||
RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, cn ConnInfo)
|
||||
|
||||
// RecordConnectionStateChange records when a connection changes state (e.g., idle -> used)
|
||||
RecordConnectionStateChange(ctx context.Context, cn ConnInfo, fromState, toState string)
|
||||
}
|
||||
|
||||
// SetOTelRecorder sets the global OpenTelemetry recorder.
|
||||
@@ -55,3 +58,12 @@ func (a *otelRecorderAdapter) RecordOperationDuration(ctx context.Context, durat
|
||||
}
|
||||
}
|
||||
|
||||
func (a *otelRecorderAdapter) RecordConnectionStateChange(ctx context.Context, cn *pool.Conn, fromState, toState string) {
|
||||
// Convert internal pool.Conn to public ConnInfo
|
||||
var connInfo ConnInfo
|
||||
if cn != nil {
|
||||
connInfo = cn
|
||||
}
|
||||
a.recorder.RecordConnectionStateChange(ctx, connInfo, fromState, toState)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user