diff --git a/hset_benchmark_test.go b/hset_benchmark_test.go new file mode 100644 index 00000000..df163435 --- /dev/null +++ b/hset_benchmark_test.go @@ -0,0 +1,245 @@ +package redis_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/redis/go-redis/v9" +) + +// HSET Benchmark Tests +// +// This file contains benchmark tests for Redis HSET operations with different scales: +// 1, 10, 100, 1000, 10000, 100000 operations +// +// Prerequisites: +// - Redis server running on localhost:6379 +// - No authentication required +// +// Usage: +// go test -bench=BenchmarkHSET -v ./hset_benchmark_test.go +// go test -bench=BenchmarkHSETPipelined -v ./hset_benchmark_test.go +// go test -bench=. -v ./hset_benchmark_test.go # Run all benchmarks +// +// Example output: +// BenchmarkHSET/HSET_1_operations-8 5000 250000 ns/op 1000000.00 ops/sec +// BenchmarkHSET/HSET_100_operations-8 100 10000000 ns/op 100000.00 ops/sec +// +// The benchmarks test three different approaches: +// 1. Individual HSET commands (BenchmarkHSET) +// 2. Pipelined HSET commands (BenchmarkHSETPipelined) + +// BenchmarkHSET benchmarks HSET operations with different scales +func BenchmarkHSET(b *testing.B) { + ctx := context.Background() + + // Setup Redis client + rdb := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + DB: 0, + }) + defer rdb.Close() + + // Test connection + if err := rdb.Ping(ctx).Err(); err != nil { + b.Skipf("Redis server not available: %v", err) + } + + // Clean up before and after tests + defer func() { + rdb.FlushDB(ctx) + }() + + scales := []int{1, 10, 100, 1000, 10000, 100000} + + for _, scale := range scales { + b.Run(fmt.Sprintf("HSET_%d_operations", scale), func(b *testing.B) { + benchmarkHSETOperations(b, rdb, ctx, scale) + }) + } +} + +// benchmarkHSETOperations performs the actual HSET benchmark for a given scale +func benchmarkHSETOperations(b *testing.B, rdb *redis.Client, ctx context.Context, operations int) { + hashKey := fmt.Sprintf("benchmark_hash_%d", operations) + + b.ResetTimer() + b.StartTimer() + totalTimes := []time.Duration{} + + for i := 0; i < b.N; i++ { + b.StopTimer() + // Clean up the hash before each iteration + rdb.Del(ctx, hashKey) + b.StartTimer() + + startTime := time.Now() + // Perform the specified number of HSET operations + for j := 0; j < operations; j++ { + field := fmt.Sprintf("field_%d", j) + value := fmt.Sprintf("value_%d", j) + + err := rdb.HSet(ctx, hashKey, field, value).Err() + if err != nil { + b.Fatalf("HSET operation failed: %v", err) + } + } + totalTimes = append(totalTimes, time.Now().Sub(startTime)) + } + + // Stop the timer to calculate metrics + b.StopTimer() + + // Report operations per second + opsPerSec := float64(operations*b.N) / b.Elapsed().Seconds() + b.ReportMetric(opsPerSec, "ops/sec") + + // Report average time per operation + avgTimePerOp := b.Elapsed().Nanoseconds() / int64(operations*b.N) + b.ReportMetric(float64(avgTimePerOp), "ns/op") + // report average time in milliseconds from totalTimes + avgTimePerOpMs := totalTimes[0].Milliseconds() / int64(len(totalTimes)) + b.ReportMetric(float64(avgTimePerOpMs), "ms") +} + +// BenchmarkHSETPipelined benchmarks HSET operations using pipelining for better performance +func BenchmarkHSETPipelined(b *testing.B) { + ctx := context.Background() + + // Setup Redis client + rdb := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + DB: 0, + }) + defer rdb.Close() + + // Test connection + if err := rdb.Ping(ctx).Err(); err != nil { + b.Skipf("Redis server not available: %v", err) + } + + // Clean up before and after tests + defer func() { + rdb.FlushDB(ctx) + }() + + scales := []int{1, 10, 100, 1000, 10000, 100000} + + for _, scale := range scales { + b.Run(fmt.Sprintf("HSET_Pipelined_%d_operations", scale), func(b *testing.B) { + benchmarkHSETPipelined(b, rdb, ctx, scale) + }) + } +} + +// benchmarkHSETPipelined performs HSET benchmark using pipelining +func benchmarkHSETPipelined(b *testing.B, rdb *redis.Client, ctx context.Context, operations int) { + hashKey := fmt.Sprintf("benchmark_hash_pipelined_%d", operations) + + b.ResetTimer() + b.StartTimer() + totalTimes := []time.Duration{} + + for i := 0; i < b.N; i++ { + b.StopTimer() + // Clean up the hash before each iteration + rdb.Del(ctx, hashKey) + b.StartTimer() + + startTime := time.Now() + // Use pipelining for better performance + pipe := rdb.Pipeline() + + // Add all HSET operations to the pipeline + for j := 0; j < operations; j++ { + field := fmt.Sprintf("field_%d", j) + value := fmt.Sprintf("value_%d", j) + pipe.HSet(ctx, hashKey, field, value) + } + + // Execute all operations at once + _, err := pipe.Exec(ctx) + if err != nil { + b.Fatalf("Pipeline execution failed: %v", err) + } + totalTimes = append(totalTimes, time.Now().Sub(startTime)) + } + + b.StopTimer() + + // Report operations per second + opsPerSec := float64(operations*b.N) / b.Elapsed().Seconds() + b.ReportMetric(opsPerSec, "ops/sec") + + // Report average time per operation + avgTimePerOp := b.Elapsed().Nanoseconds() / int64(operations*b.N) + b.ReportMetric(float64(avgTimePerOp), "ns/op") + // report average time in milliseconds from totalTimes + avgTimePerOpMs := totalTimes[0].Milliseconds() / int64(len(totalTimes)) + b.ReportMetric(float64(avgTimePerOpMs), "ms") +} + +// add same tests but with RESP2 +func BenchmarkHSET_RESP2(b *testing.B) { + ctx := context.Background() + + // Setup Redis client + rdb := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + Password: "", // no password docs + DB: 0, // use default DB + Protocol: 2, + }) + defer rdb.Close() + + // Test connection + if err := rdb.Ping(ctx).Err(); err != nil { + b.Skipf("Redis server not available: %v", err) + } + + // Clean up before and after tests + defer func() { + rdb.FlushDB(ctx) + }() + + scales := []int{1, 10, 100, 1000, 10000, 100000} + + for _, scale := range scales { + b.Run(fmt.Sprintf("HSET_RESP2_%d_operations", scale), func(b *testing.B) { + benchmarkHSETOperations(b, rdb, ctx, scale) + }) + } +} + +func BenchmarkHSETPipelined_RESP2(b *testing.B) { + ctx := context.Background() + + // Setup Redis client + rdb := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + Password: "", // no password docs + DB: 0, // use default DB + Protocol: 2, + }) + defer rdb.Close() + + // Test connection + if err := rdb.Ping(ctx).Err(); err != nil { + b.Skipf("Redis server not available: %v", err) + } + + // Clean up before and after tests + defer func() { + rdb.FlushDB(ctx) + }() + + scales := []int{1, 10, 100, 1000, 10000, 100000} + + for _, scale := range scales { + b.Run(fmt.Sprintf("HSET_Pipelined_RESP2_%d_operations", scale), func(b *testing.B) { + benchmarkHSETPipelined(b, rdb, ctx, scale) + }) + } +} diff --git a/internal/pool/conn.go b/internal/pool/conn.go index fa93781d..97992539 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -113,6 +113,13 @@ func (cn *Conn) Close() error { return cn.netConn.Close() } +// MaybeHasData tries to peek at the next byte in the socket without consuming it +// This is used to check if there are push notifications available +// Important: This will work on Linux, but not on Windows +func (cn *Conn) MaybeHasData() bool { + return maybeHasData(cn.netConn) +} + func (cn *Conn) deadline(ctx context.Context, timeout time.Duration) time.Time { tm := time.Now() cn.SetUsedAt(tm) diff --git a/internal/pool/conn_check.go b/internal/pool/conn_check.go index 48857abe..9e83dd83 100644 --- a/internal/pool/conn_check.go +++ b/internal/pool/conn_check.go @@ -52,3 +52,8 @@ func connCheck(conn net.Conn) error { return sysErr } + +// maybeHasData checks if there is data in the socket without consuming it +func maybeHasData(conn net.Conn) bool { + return connCheck(conn) == errUnexpectedRead +} diff --git a/internal/pool/conn_check_dummy.go b/internal/pool/conn_check_dummy.go index 295da126..095bbd1a 100644 --- a/internal/pool/conn_check_dummy.go +++ b/internal/pool/conn_check_dummy.go @@ -7,3 +7,8 @@ import "net" func connCheck(conn net.Conn) error { return nil } + +// since we can't check for data on the socket, we just assume there is some +func maybeHasData(conn net.Conn) bool { + return true +} diff --git a/push/processor.go b/push/processor.go index 2c1b6f5e..278b6fe6 100644 --- a/push/processor.go +++ b/push/processor.go @@ -57,6 +57,7 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx replyType, err := rd.PeekReplyType() if err != nil { // No more data available or error reading + // if timeout, it will be handled by the caller break } @@ -144,6 +145,7 @@ func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, handlerCt replyType, err := rd.PeekReplyType() if err != nil { // No more data available or error reading + // if timeout, it will be handled by the caller break } @@ -176,7 +178,7 @@ func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, handlerCt func willHandleNotificationInClient(notificationType string) bool { switch notificationType { // Pub/Sub notifications - handled by pub/sub system - case "message", // Regular pub/sub message + case "message", // Regular pub/sub message "pmessage", // Pattern pub/sub message "subscribe", // Subscription confirmation "unsubscribe", // Unsubscription confirmation diff --git a/redis.go b/redis.go index dfba1109..f1f65712 100644 --- a/redis.go +++ b/redis.go @@ -462,8 +462,6 @@ func (c *baseClient) releaseConn(ctx context.Context, cn *pool.Conn, err error) } else { // process any pending push notifications before returning the connection to the pool if err := c.processPushNotifications(ctx, cn); err != nil { - // Log the error but don't fail the connection release - // Push notification processing errors shouldn't break normal Redis operations internal.Logger.Printf(ctx, "push: error processing pending notifications before releasing connection: %v", err) } c.connPool.Put(ctx, cn) @@ -531,8 +529,6 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { // Process any pending push notifications before executing the command if err := c.processPushNotifications(ctx, cn); err != nil { - // Log the error but don't fail the command execution - // Push notification processing errors shouldn't break normal Redis operations internal.Logger.Printf(ctx, "push: error processing pending notifications before command: %v", err) } @@ -550,8 +546,6 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), func(rd *proto.Reader) error { // To be sure there are no buffered push notifications, we process them before reading the reply if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil { - // Log the error but don't fail the command execution - // Push notification processing errors shouldn't break normal Redis operations internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err) } return readReplyFunc(rd) @@ -652,9 +646,7 @@ func (c *baseClient) generalProcessPipeline( lastErr = c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { // Process any pending push notifications before executing the pipeline if err := c.processPushNotifications(ctx, cn); err != nil { - // Log the error but don't fail the pipeline execution - // Push notification processing errors shouldn't break normal Redis operations - internal.Logger.Printf(ctx, "push: error processing pending notifications before pipeline: %v", err) + internal.Logger.Printf(ctx, "push: error processing pending notifications before processing pipeline: %v", err) } var err error canRetry, err = p(ctx, cn, cmds) @@ -671,11 +663,8 @@ func (c *baseClient) pipelineProcessCmds( ctx context.Context, cn *pool.Conn, cmds []Cmder, ) (bool, error) { // Process any pending push notifications before executing the pipeline - // This ensures that cluster topology changes are handled immediately if err := c.processPushNotifications(ctx, cn); err != nil { - // Log the error but don't fail the pipeline execution - // Push notification processing errors shouldn't break normal Redis operations - internal.Logger.Printf(ctx, "push: error processing pending notifications before pipeline: %v", err) + internal.Logger.Printf(ctx, "push: error processing pending notifications before writing pipeline: %v", err) } if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error { @@ -699,8 +688,6 @@ func (c *baseClient) pipelineReadCmds(ctx context.Context, cn *pool.Conn, rd *pr for i, cmd := range cmds { // To be sure there are no buffered push notifications, we process them before reading the reply if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil { - // Log the error but don't fail the command execution - // Push notification processing errors shouldn't break normal Redis operations internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err) } err := cmd.readReply(rd) @@ -718,10 +705,7 @@ func (c *baseClient) txPipelineProcessCmds( ctx context.Context, cn *pool.Conn, cmds []Cmder, ) (bool, error) { // Process any pending push notifications before executing the transaction pipeline - // This ensures that cluster topology changes are handled immediately if err := c.processPushNotifications(ctx, cn); err != nil { - // Log the error but don't fail the transaction execution - // Push notification processing errors shouldn't break normal Redis operations internal.Logger.Printf(ctx, "push: error processing pending notifications before transaction: %v", err) } @@ -756,8 +740,6 @@ func (c *baseClient) txPipelineProcessCmds( func (c *baseClient) txPipelineReadQueued(ctx context.Context, cn *pool.Conn, rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder) error { // To be sure there are no buffered push notifications, we process them before reading the reply if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil { - // Log the error but don't fail the command execution - // Push notification processing errors shouldn't break normal Redis operations internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err) } // Parse +OK. @@ -769,8 +751,6 @@ func (c *baseClient) txPipelineReadQueued(ctx context.Context, cn *pool.Conn, rd for range cmds { // To be sure there are no buffered push notifications, we process them before reading the reply if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil { - // Log the error but don't fail the command execution - // Push notification processing errors shouldn't break normal Redis operations internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err) } if err := statusCmd.readReply(rd); err != nil && !isRedisError(err) { @@ -780,8 +760,6 @@ func (c *baseClient) txPipelineReadQueued(ctx context.Context, cn *pool.Conn, rd // To be sure there are no buffered push notifications, we process them before reading the reply if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil { - // Log the error but don't fail the command execution - // Push notification processing errors shouldn't break normal Redis operations internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err) } // Parse number of replies. @@ -1096,7 +1074,10 @@ func (c *Conn) TxPipeline() Pipeliner { // This method should be called by the client before using WithReader for command execution func (c *baseClient) processPushNotifications(ctx context.Context, cn *pool.Conn) error { // Only process push notifications for RESP3 connections with a processor - if c.opt.Protocol != 3 || c.pushProcessor == nil { + // Also check if there is any data to read before processing + // Which is an optimization on UNIX systems where MaybeHasData is a syscall + // On Windows, MaybeHasData always returns true, so this check is a no-op + if c.opt.Protocol != 3 || c.pushProcessor == nil || !cn.MaybeHasData() { return nil } @@ -1104,7 +1085,7 @@ func (c *baseClient) processPushNotifications(ctx context.Context, cn *pool.Conn // This is critical for hitless upgrades to work properly // NOTE: almost no timeouts are set for this read, so it should not block // longer than necessary, 10us should be plenty of time to read if there are any push notifications - // on the socket. Even if it was not enough time, the next read will just read the push notifications again. + // on the socket. return cn.WithReader(ctx, 10*time.Microsecond, func(rd *proto.Reader) error { // Create handler context with client, connection pool, and connection information handlerCtx := c.pushNotificationHandlerContext(cn) @@ -1115,6 +1096,8 @@ func (c *baseClient) processPushNotifications(ctx context.Context, cn *pool.Conn // processPendingPushNotificationWithReader processes all pending push notifications on a connection // This method should be called by the client in WithReader before reading the reply func (c *baseClient) processPendingPushNotificationWithReader(ctx context.Context, cn *pool.Conn, rd *proto.Reader) error { + // if we have the reader, we don't need to check for data on the socket, we are waiting + // for either a reply or a push notification, so we can block until we get a reply or reach the timeout if c.opt.Protocol != 3 || c.pushProcessor == nil { return nil }