1
0
mirror of https://github.com/redis/go-redis.git synced 2025-08-08 23:42:06 +03:00

feat(otel): add closing support to otel metrics instrumentation (#3444)

closes #3424
This commit is contained in:
Antonio Mindov
2025-07-24 12:48:34 +03:00
committed by GitHub
parent 162c3fbf47
commit 23a87a2137
2 changed files with 81 additions and 44 deletions

View File

@@ -28,6 +28,8 @@ type config struct {
meter metric.Meter
poolName string
closeChan chan struct{}
}
type baseOption interface {
@@ -145,3 +147,9 @@ func WithMeterProvider(mp metric.MeterProvider) MetricsOption {
conf.mp = mp
})
}
func WithCloseChan(closeChan chan struct{}) MetricsOption {
return metricsOption(func(conf *config) {
conf.closeChan = closeChan
})
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"sync"
"time"
"go.opentelemetry.io/otel"
@@ -13,6 +14,12 @@ import (
"github.com/redis/go-redis/v9"
)
type metricsState struct {
registrations []metric.Registration
closed bool
mutex sync.Mutex
}
// InstrumentMetrics starts reporting OpenTelemetry Metrics.
//
// Based on https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/semantic_conventions/database-metrics.md
@@ -30,49 +37,42 @@ func InstrumentMetrics(rdb redis.UniversalClient, opts ...MetricsOption) error {
)
}
var state *metricsState
if conf.closeChan != nil {
state = &metricsState{
registrations: make([]metric.Registration, 0),
closed: false,
mutex: sync.Mutex{},
}
go func() {
<-conf.closeChan
state.mutex.Lock()
state.closed = true
for _, registration := range state.registrations {
if err := registration.Unregister(); err != nil {
otel.Handle(err)
}
}
state.mutex.Unlock()
}()
}
switch rdb := rdb.(type) {
case *redis.Client:
if conf.poolName == "" {
opt := rdb.Options()
conf.poolName = opt.Addr
}
conf.attrs = append(conf.attrs, attribute.String("pool.name", conf.poolName))
if err := reportPoolStats(rdb, conf); err != nil {
return err
}
if err := addMetricsHook(rdb, conf); err != nil {
return err
}
return nil
return registerClient(rdb, conf, state)
case *redis.ClusterClient:
rdb.OnNewNode(func(rdb *redis.Client) {
if conf.poolName == "" {
opt := rdb.Options()
conf.poolName = opt.Addr
}
conf.attrs = append(conf.attrs, attribute.String("pool.name", conf.poolName))
if err := reportPoolStats(rdb, conf); err != nil {
otel.Handle(err)
}
if err := addMetricsHook(rdb, conf); err != nil {
if err := registerClient(rdb, conf, state); err != nil {
otel.Handle(err)
}
})
return nil
case *redis.Ring:
rdb.OnNewNode(func(rdb *redis.Client) {
if conf.poolName == "" {
opt := rdb.Options()
conf.poolName = opt.Addr
}
conf.attrs = append(conf.attrs, attribute.String("pool.name", conf.poolName))
if err := reportPoolStats(rdb, conf); err != nil {
otel.Handle(err)
}
if err := addMetricsHook(rdb, conf); err != nil {
if err := registerClient(rdb, conf, state); err != nil {
otel.Handle(err)
}
})
@@ -82,7 +82,38 @@ func InstrumentMetrics(rdb redis.UniversalClient, opts ...MetricsOption) error {
}
}
func reportPoolStats(rdb *redis.Client, conf *config) error {
func registerClient(rdb *redis.Client, conf *config, state *metricsState) error {
if state != nil {
state.mutex.Lock()
defer state.mutex.Unlock()
if state.closed {
return nil
}
}
if conf.poolName == "" {
opt := rdb.Options()
conf.poolName = opt.Addr
}
conf.attrs = append(conf.attrs, attribute.String("pool.name", conf.poolName))
registration, err := reportPoolStats(rdb, conf)
if err != nil {
return err
}
if state != nil {
state.registrations = append(state.registrations, registration)
}
if err := addMetricsHook(rdb, conf); err != nil {
return err
}
return nil
}
func reportPoolStats(rdb *redis.Client, conf *config) (metric.Registration, error) {
labels := conf.attrs
idleAttrs := append(labels, attribute.String("state", "idle"))
usedAttrs := append(labels, attribute.String("state", "used"))
@@ -92,7 +123,7 @@ func reportPoolStats(rdb *redis.Client, conf *config) error {
metric.WithDescription("The maximum number of idle open connections allowed"),
)
if err != nil {
return err
return nil, err
}
idleMin, err := conf.meter.Int64ObservableUpDownCounter(
@@ -100,7 +131,7 @@ func reportPoolStats(rdb *redis.Client, conf *config) error {
metric.WithDescription("The minimum number of idle open connections allowed"),
)
if err != nil {
return err
return nil, err
}
connsMax, err := conf.meter.Int64ObservableUpDownCounter(
@@ -108,7 +139,7 @@ func reportPoolStats(rdb *redis.Client, conf *config) error {
metric.WithDescription("The maximum number of open connections allowed"),
)
if err != nil {
return err
return nil, err
}
usage, err := conf.meter.Int64ObservableUpDownCounter(
@@ -116,7 +147,7 @@ func reportPoolStats(rdb *redis.Client, conf *config) error {
metric.WithDescription("The number of connections that are currently in state described by the state attribute"),
)
if err != nil {
return err
return nil, err
}
timeouts, err := conf.meter.Int64ObservableUpDownCounter(
@@ -124,7 +155,7 @@ func reportPoolStats(rdb *redis.Client, conf *config) error {
metric.WithDescription("The number of connection timeouts that have occurred trying to obtain a connection from the pool"),
)
if err != nil {
return err
return nil, err
}
hits, err := conf.meter.Int64ObservableUpDownCounter(
@@ -132,7 +163,7 @@ func reportPoolStats(rdb *redis.Client, conf *config) error {
metric.WithDescription("The number of times free connection was found in the pool"),
)
if err != nil {
return err
return nil, err
}
misses, err := conf.meter.Int64ObservableUpDownCounter(
@@ -140,11 +171,11 @@ func reportPoolStats(rdb *redis.Client, conf *config) error {
metric.WithDescription("The number of times free connection was not found in the pool"),
)
if err != nil {
return err
return nil, err
}
redisConf := rdb.Options()
_, err = conf.meter.RegisterCallback(
return conf.meter.RegisterCallback(
func(ctx context.Context, o metric.Observer) error {
stats := rdb.PoolStats()
@@ -168,8 +199,6 @@ func reportPoolStats(rdb *redis.Client, conf *config) error {
hits,
misses,
)
return err
}
func addMetricsHook(rdb *redis.Client, conf *config) error {