1
0
mirror of https://github.com/redis/go-redis.git synced 2025-12-05 06:22:07 +03:00

First draft. One metric - command duration.

Signed-off-by: Elena Kolevska <elena@kolevska.com>
This commit is contained in:
Elena Kolevska
2025-10-24 11:18:48 +01:00
committed by ofekshenawa
parent dc053a42c6
commit d588c3ca71
9 changed files with 808 additions and 5 deletions

View File

@@ -13,6 +13,7 @@ import (
"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/auth/streaming"
"github.com/redis/go-redis/v9/internal/hscan"
"github.com/redis/go-redis/v9/internal/otel"
"github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/proto"
"github.com/redis/go-redis/v9/maintnotifications"
@@ -662,17 +663,34 @@ func (c *baseClient) dial(ctx context.Context, network, addr string) (net.Conn,
}
func (c *baseClient) process(ctx context.Context, cmd Cmder) error {
// Start measuring total operation duration (includes all retries)
operationStart := time.Now()
var lastConn *pool.Conn
var lastErr error
totalAttempts := 0
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
totalAttempts++
attempt := attempt
retry, err := c._process(ctx, cmd, attempt)
retry, err, cn := c._process(ctx, cmd, attempt)
if cn != nil {
lastConn = cn
}
if err == nil || !retry {
// Record total operation duration
operationDuration := time.Since(operationStart)
otel.RecordOperationDuration(ctx, operationDuration, cmd, totalAttempts, lastConn)
return err
}
lastErr = err
}
// Record failed operation after all retries
operationDuration := time.Since(operationStart)
otel.RecordOperationDuration(ctx, operationDuration, cmd, totalAttempts, lastConn)
return lastErr
}
@@ -689,15 +707,17 @@ func (c *baseClient) assertUnstableCommand(cmd Cmder) (bool, error) {
}
}
func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error) {
func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error, *pool.Conn) {
if attempt > 0 {
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
return false, err
return false, err, nil
}
}
var usedConn *pool.Conn
retryTimeout := uint32(0)
if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
usedConn = cn
// Process any pending push notifications before executing the command
if err := c.processPushNotifications(ctx, cn); err != nil {
internal.Logger.Printf(ctx, "push: error processing pending notifications before command: %v", err)
@@ -738,10 +758,10 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
return nil
}); err != nil {
retry := shouldRetry(err, atomic.LoadUint32(&retryTimeout) == 1)
return retry, err
return retry, err, usedConn
}
return false, nil
return false, nil, usedConn
}
func (c *baseClient) retryBackoff(attempt int) time.Duration {