mirror of
https://github.com/redis/go-redis.git
synced 2025-09-02 22:01:16 +03:00
feat(push): reading optimization for Linux
Optimize the peeking on newly acquired connection on *unix. Use syscall to peek on the socket instead of blocking for a fixed amount of time. This won't work on Windows, hence the `MaybeHasData` will always return true on Windows and the client will have to block for a given time to actually peek on the socket. *Time to complete N HSET operations (individual commands)* | Batch Size | Before (total sec) | After (total sec) | Time Saved | % Faster | |------------|-------------------|------------------|------------|----------| | 100 ops | 0.0172 | 0.0133 | 0.0038 | **22.4%** | | 1K ops | 0.178 | 0.133 | 0.045 | **25.3%** | | 10K ops | 1.72 | 1.28 | 0.44 | **25.6%** | | 100K ops | 17.1 | 13.4 | 3.7 | **22.0%** |
This commit is contained in:
245
hset_benchmark_test.go
Normal file
245
hset_benchmark_test.go
Normal file
@@ -0,0 +1,245 @@
|
||||
package redis_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// HSET Benchmark Tests
|
||||
//
|
||||
// This file contains benchmark tests for Redis HSET operations with different scales:
|
||||
// 1, 10, 100, 1000, 10000, 100000 operations
|
||||
//
|
||||
// Prerequisites:
|
||||
// - Redis server running on localhost:6379
|
||||
// - No authentication required
|
||||
//
|
||||
// Usage:
|
||||
// go test -bench=BenchmarkHSET -v ./hset_benchmark_test.go
|
||||
// go test -bench=BenchmarkHSETPipelined -v ./hset_benchmark_test.go
|
||||
// go test -bench=. -v ./hset_benchmark_test.go # Run all benchmarks
|
||||
//
|
||||
// Example output:
|
||||
// BenchmarkHSET/HSET_1_operations-8 5000 250000 ns/op 1000000.00 ops/sec
|
||||
// BenchmarkHSET/HSET_100_operations-8 100 10000000 ns/op 100000.00 ops/sec
|
||||
//
|
||||
// The benchmarks test three different approaches:
|
||||
// 1. Individual HSET commands (BenchmarkHSET)
|
||||
// 2. Pipelined HSET commands (BenchmarkHSETPipelined)
|
||||
|
||||
// BenchmarkHSET benchmarks HSET operations with different scales
|
||||
func BenchmarkHSET(b *testing.B) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Setup Redis client
|
||||
rdb := redis.NewClient(&redis.Options{
|
||||
Addr: "localhost:6379",
|
||||
DB: 0,
|
||||
})
|
||||
defer rdb.Close()
|
||||
|
||||
// Test connection
|
||||
if err := rdb.Ping(ctx).Err(); err != nil {
|
||||
b.Skipf("Redis server not available: %v", err)
|
||||
}
|
||||
|
||||
// Clean up before and after tests
|
||||
defer func() {
|
||||
rdb.FlushDB(ctx)
|
||||
}()
|
||||
|
||||
scales := []int{1, 10, 100, 1000, 10000, 100000}
|
||||
|
||||
for _, scale := range scales {
|
||||
b.Run(fmt.Sprintf("HSET_%d_operations", scale), func(b *testing.B) {
|
||||
benchmarkHSETOperations(b, rdb, ctx, scale)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// benchmarkHSETOperations performs the actual HSET benchmark for a given scale
|
||||
func benchmarkHSETOperations(b *testing.B, rdb *redis.Client, ctx context.Context, operations int) {
|
||||
hashKey := fmt.Sprintf("benchmark_hash_%d", operations)
|
||||
|
||||
b.ResetTimer()
|
||||
b.StartTimer()
|
||||
totalTimes := []time.Duration{}
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
b.StopTimer()
|
||||
// Clean up the hash before each iteration
|
||||
rdb.Del(ctx, hashKey)
|
||||
b.StartTimer()
|
||||
|
||||
startTime := time.Now()
|
||||
// Perform the specified number of HSET operations
|
||||
for j := 0; j < operations; j++ {
|
||||
field := fmt.Sprintf("field_%d", j)
|
||||
value := fmt.Sprintf("value_%d", j)
|
||||
|
||||
err := rdb.HSet(ctx, hashKey, field, value).Err()
|
||||
if err != nil {
|
||||
b.Fatalf("HSET operation failed: %v", err)
|
||||
}
|
||||
}
|
||||
totalTimes = append(totalTimes, time.Now().Sub(startTime))
|
||||
}
|
||||
|
||||
// Stop the timer to calculate metrics
|
||||
b.StopTimer()
|
||||
|
||||
// Report operations per second
|
||||
opsPerSec := float64(operations*b.N) / b.Elapsed().Seconds()
|
||||
b.ReportMetric(opsPerSec, "ops/sec")
|
||||
|
||||
// Report average time per operation
|
||||
avgTimePerOp := b.Elapsed().Nanoseconds() / int64(operations*b.N)
|
||||
b.ReportMetric(float64(avgTimePerOp), "ns/op")
|
||||
// report average time in milliseconds from totalTimes
|
||||
avgTimePerOpMs := totalTimes[0].Milliseconds() / int64(len(totalTimes))
|
||||
b.ReportMetric(float64(avgTimePerOpMs), "ms")
|
||||
}
|
||||
|
||||
// BenchmarkHSETPipelined benchmarks HSET operations using pipelining for better performance
|
||||
func BenchmarkHSETPipelined(b *testing.B) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Setup Redis client
|
||||
rdb := redis.NewClient(&redis.Options{
|
||||
Addr: "localhost:6379",
|
||||
DB: 0,
|
||||
})
|
||||
defer rdb.Close()
|
||||
|
||||
// Test connection
|
||||
if err := rdb.Ping(ctx).Err(); err != nil {
|
||||
b.Skipf("Redis server not available: %v", err)
|
||||
}
|
||||
|
||||
// Clean up before and after tests
|
||||
defer func() {
|
||||
rdb.FlushDB(ctx)
|
||||
}()
|
||||
|
||||
scales := []int{1, 10, 100, 1000, 10000, 100000}
|
||||
|
||||
for _, scale := range scales {
|
||||
b.Run(fmt.Sprintf("HSET_Pipelined_%d_operations", scale), func(b *testing.B) {
|
||||
benchmarkHSETPipelined(b, rdb, ctx, scale)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// benchmarkHSETPipelined performs HSET benchmark using pipelining
|
||||
func benchmarkHSETPipelined(b *testing.B, rdb *redis.Client, ctx context.Context, operations int) {
|
||||
hashKey := fmt.Sprintf("benchmark_hash_pipelined_%d", operations)
|
||||
|
||||
b.ResetTimer()
|
||||
b.StartTimer()
|
||||
totalTimes := []time.Duration{}
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
b.StopTimer()
|
||||
// Clean up the hash before each iteration
|
||||
rdb.Del(ctx, hashKey)
|
||||
b.StartTimer()
|
||||
|
||||
startTime := time.Now()
|
||||
// Use pipelining for better performance
|
||||
pipe := rdb.Pipeline()
|
||||
|
||||
// Add all HSET operations to the pipeline
|
||||
for j := 0; j < operations; j++ {
|
||||
field := fmt.Sprintf("field_%d", j)
|
||||
value := fmt.Sprintf("value_%d", j)
|
||||
pipe.HSet(ctx, hashKey, field, value)
|
||||
}
|
||||
|
||||
// Execute all operations at once
|
||||
_, err := pipe.Exec(ctx)
|
||||
if err != nil {
|
||||
b.Fatalf("Pipeline execution failed: %v", err)
|
||||
}
|
||||
totalTimes = append(totalTimes, time.Now().Sub(startTime))
|
||||
}
|
||||
|
||||
b.StopTimer()
|
||||
|
||||
// Report operations per second
|
||||
opsPerSec := float64(operations*b.N) / b.Elapsed().Seconds()
|
||||
b.ReportMetric(opsPerSec, "ops/sec")
|
||||
|
||||
// Report average time per operation
|
||||
avgTimePerOp := b.Elapsed().Nanoseconds() / int64(operations*b.N)
|
||||
b.ReportMetric(float64(avgTimePerOp), "ns/op")
|
||||
// report average time in milliseconds from totalTimes
|
||||
avgTimePerOpMs := totalTimes[0].Milliseconds() / int64(len(totalTimes))
|
||||
b.ReportMetric(float64(avgTimePerOpMs), "ms")
|
||||
}
|
||||
|
||||
// add same tests but with RESP2
|
||||
func BenchmarkHSET_RESP2(b *testing.B) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Setup Redis client
|
||||
rdb := redis.NewClient(&redis.Options{
|
||||
Addr: "localhost:6379",
|
||||
Password: "", // no password docs
|
||||
DB: 0, // use default DB
|
||||
Protocol: 2,
|
||||
})
|
||||
defer rdb.Close()
|
||||
|
||||
// Test connection
|
||||
if err := rdb.Ping(ctx).Err(); err != nil {
|
||||
b.Skipf("Redis server not available: %v", err)
|
||||
}
|
||||
|
||||
// Clean up before and after tests
|
||||
defer func() {
|
||||
rdb.FlushDB(ctx)
|
||||
}()
|
||||
|
||||
scales := []int{1, 10, 100, 1000, 10000, 100000}
|
||||
|
||||
for _, scale := range scales {
|
||||
b.Run(fmt.Sprintf("HSET_RESP2_%d_operations", scale), func(b *testing.B) {
|
||||
benchmarkHSETOperations(b, rdb, ctx, scale)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkHSETPipelined_RESP2(b *testing.B) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Setup Redis client
|
||||
rdb := redis.NewClient(&redis.Options{
|
||||
Addr: "localhost:6379",
|
||||
Password: "", // no password docs
|
||||
DB: 0, // use default DB
|
||||
Protocol: 2,
|
||||
})
|
||||
defer rdb.Close()
|
||||
|
||||
// Test connection
|
||||
if err := rdb.Ping(ctx).Err(); err != nil {
|
||||
b.Skipf("Redis server not available: %v", err)
|
||||
}
|
||||
|
||||
// Clean up before and after tests
|
||||
defer func() {
|
||||
rdb.FlushDB(ctx)
|
||||
}()
|
||||
|
||||
scales := []int{1, 10, 100, 1000, 10000, 100000}
|
||||
|
||||
for _, scale := range scales {
|
||||
b.Run(fmt.Sprintf("HSET_Pipelined_RESP2_%d_operations", scale), func(b *testing.B) {
|
||||
benchmarkHSETPipelined(b, rdb, ctx, scale)
|
||||
})
|
||||
}
|
||||
}
|
@@ -113,6 +113,13 @@ func (cn *Conn) Close() error {
|
||||
return cn.netConn.Close()
|
||||
}
|
||||
|
||||
// MaybeHasData tries to peek at the next byte in the socket without consuming it
|
||||
// This is used to check if there are push notifications available
|
||||
// Important: This will work on Linux, but not on Windows
|
||||
func (cn *Conn) MaybeHasData() bool {
|
||||
return maybeHasData(cn.netConn)
|
||||
}
|
||||
|
||||
func (cn *Conn) deadline(ctx context.Context, timeout time.Duration) time.Time {
|
||||
tm := time.Now()
|
||||
cn.SetUsedAt(tm)
|
||||
|
@@ -52,3 +52,8 @@ func connCheck(conn net.Conn) error {
|
||||
|
||||
return sysErr
|
||||
}
|
||||
|
||||
// maybeHasData checks if there is data in the socket without consuming it
|
||||
func maybeHasData(conn net.Conn) bool {
|
||||
return connCheck(conn) == errUnexpectedRead
|
||||
}
|
||||
|
@@ -7,3 +7,8 @@ import "net"
|
||||
func connCheck(conn net.Conn) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// since we can't check for data on the socket, we just assume there is some
|
||||
func maybeHasData(conn net.Conn) bool {
|
||||
return true
|
||||
}
|
||||
|
@@ -57,6 +57,7 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx
|
||||
replyType, err := rd.PeekReplyType()
|
||||
if err != nil {
|
||||
// No more data available or error reading
|
||||
// if timeout, it will be handled by the caller
|
||||
break
|
||||
}
|
||||
|
||||
@@ -144,6 +145,7 @@ func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, handlerCt
|
||||
replyType, err := rd.PeekReplyType()
|
||||
if err != nil {
|
||||
// No more data available or error reading
|
||||
// if timeout, it will be handled by the caller
|
||||
break
|
||||
}
|
||||
|
||||
@@ -176,7 +178,7 @@ func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, handlerCt
|
||||
func willHandleNotificationInClient(notificationType string) bool {
|
||||
switch notificationType {
|
||||
// Pub/Sub notifications - handled by pub/sub system
|
||||
case "message", // Regular pub/sub message
|
||||
case "message", // Regular pub/sub message
|
||||
"pmessage", // Pattern pub/sub message
|
||||
"subscribe", // Subscription confirmation
|
||||
"unsubscribe", // Unsubscription confirmation
|
||||
|
35
redis.go
35
redis.go
@@ -462,8 +462,6 @@ func (c *baseClient) releaseConn(ctx context.Context, cn *pool.Conn, err error)
|
||||
} else {
|
||||
// process any pending push notifications before returning the connection to the pool
|
||||
if err := c.processPushNotifications(ctx, cn); err != nil {
|
||||
// Log the error but don't fail the connection release
|
||||
// Push notification processing errors shouldn't break normal Redis operations
|
||||
internal.Logger.Printf(ctx, "push: error processing pending notifications before releasing connection: %v", err)
|
||||
}
|
||||
c.connPool.Put(ctx, cn)
|
||||
@@ -531,8 +529,6 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
|
||||
if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
|
||||
// Process any pending push notifications before executing the command
|
||||
if err := c.processPushNotifications(ctx, cn); err != nil {
|
||||
// Log the error but don't fail the command execution
|
||||
// Push notification processing errors shouldn't break normal Redis operations
|
||||
internal.Logger.Printf(ctx, "push: error processing pending notifications before command: %v", err)
|
||||
}
|
||||
|
||||
@@ -550,8 +546,6 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
|
||||
if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), func(rd *proto.Reader) error {
|
||||
// To be sure there are no buffered push notifications, we process them before reading the reply
|
||||
if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
|
||||
// Log the error but don't fail the command execution
|
||||
// Push notification processing errors shouldn't break normal Redis operations
|
||||
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
|
||||
}
|
||||
return readReplyFunc(rd)
|
||||
@@ -652,9 +646,7 @@ func (c *baseClient) generalProcessPipeline(
|
||||
lastErr = c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
|
||||
// Process any pending push notifications before executing the pipeline
|
||||
if err := c.processPushNotifications(ctx, cn); err != nil {
|
||||
// Log the error but don't fail the pipeline execution
|
||||
// Push notification processing errors shouldn't break normal Redis operations
|
||||
internal.Logger.Printf(ctx, "push: error processing pending notifications before pipeline: %v", err)
|
||||
internal.Logger.Printf(ctx, "push: error processing pending notifications before processing pipeline: %v", err)
|
||||
}
|
||||
var err error
|
||||
canRetry, err = p(ctx, cn, cmds)
|
||||
@@ -671,11 +663,8 @@ func (c *baseClient) pipelineProcessCmds(
|
||||
ctx context.Context, cn *pool.Conn, cmds []Cmder,
|
||||
) (bool, error) {
|
||||
// Process any pending push notifications before executing the pipeline
|
||||
// This ensures that cluster topology changes are handled immediately
|
||||
if err := c.processPushNotifications(ctx, cn); err != nil {
|
||||
// Log the error but don't fail the pipeline execution
|
||||
// Push notification processing errors shouldn't break normal Redis operations
|
||||
internal.Logger.Printf(ctx, "push: error processing pending notifications before pipeline: %v", err)
|
||||
internal.Logger.Printf(ctx, "push: error processing pending notifications before writing pipeline: %v", err)
|
||||
}
|
||||
|
||||
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
|
||||
@@ -699,8 +688,6 @@ func (c *baseClient) pipelineReadCmds(ctx context.Context, cn *pool.Conn, rd *pr
|
||||
for i, cmd := range cmds {
|
||||
// To be sure there are no buffered push notifications, we process them before reading the reply
|
||||
if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
|
||||
// Log the error but don't fail the command execution
|
||||
// Push notification processing errors shouldn't break normal Redis operations
|
||||
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
|
||||
}
|
||||
err := cmd.readReply(rd)
|
||||
@@ -718,10 +705,7 @@ func (c *baseClient) txPipelineProcessCmds(
|
||||
ctx context.Context, cn *pool.Conn, cmds []Cmder,
|
||||
) (bool, error) {
|
||||
// Process any pending push notifications before executing the transaction pipeline
|
||||
// This ensures that cluster topology changes are handled immediately
|
||||
if err := c.processPushNotifications(ctx, cn); err != nil {
|
||||
// Log the error but don't fail the transaction execution
|
||||
// Push notification processing errors shouldn't break normal Redis operations
|
||||
internal.Logger.Printf(ctx, "push: error processing pending notifications before transaction: %v", err)
|
||||
}
|
||||
|
||||
@@ -756,8 +740,6 @@ func (c *baseClient) txPipelineProcessCmds(
|
||||
func (c *baseClient) txPipelineReadQueued(ctx context.Context, cn *pool.Conn, rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder) error {
|
||||
// To be sure there are no buffered push notifications, we process them before reading the reply
|
||||
if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
|
||||
// Log the error but don't fail the command execution
|
||||
// Push notification processing errors shouldn't break normal Redis operations
|
||||
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
|
||||
}
|
||||
// Parse +OK.
|
||||
@@ -769,8 +751,6 @@ func (c *baseClient) txPipelineReadQueued(ctx context.Context, cn *pool.Conn, rd
|
||||
for range cmds {
|
||||
// To be sure there are no buffered push notifications, we process them before reading the reply
|
||||
if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
|
||||
// Log the error but don't fail the command execution
|
||||
// Push notification processing errors shouldn't break normal Redis operations
|
||||
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
|
||||
}
|
||||
if err := statusCmd.readReply(rd); err != nil && !isRedisError(err) {
|
||||
@@ -780,8 +760,6 @@ func (c *baseClient) txPipelineReadQueued(ctx context.Context, cn *pool.Conn, rd
|
||||
|
||||
// To be sure there are no buffered push notifications, we process them before reading the reply
|
||||
if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
|
||||
// Log the error but don't fail the command execution
|
||||
// Push notification processing errors shouldn't break normal Redis operations
|
||||
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
|
||||
}
|
||||
// Parse number of replies.
|
||||
@@ -1096,7 +1074,10 @@ func (c *Conn) TxPipeline() Pipeliner {
|
||||
// This method should be called by the client before using WithReader for command execution
|
||||
func (c *baseClient) processPushNotifications(ctx context.Context, cn *pool.Conn) error {
|
||||
// Only process push notifications for RESP3 connections with a processor
|
||||
if c.opt.Protocol != 3 || c.pushProcessor == nil {
|
||||
// Also check if there is any data to read before processing
|
||||
// Which is an optimization on UNIX systems where MaybeHasData is a syscall
|
||||
// On Windows, MaybeHasData always returns true, so this check is a no-op
|
||||
if c.opt.Protocol != 3 || c.pushProcessor == nil || !cn.MaybeHasData() {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1104,7 +1085,7 @@ func (c *baseClient) processPushNotifications(ctx context.Context, cn *pool.Conn
|
||||
// This is critical for hitless upgrades to work properly
|
||||
// NOTE: almost no timeouts are set for this read, so it should not block
|
||||
// longer than necessary, 10us should be plenty of time to read if there are any push notifications
|
||||
// on the socket. Even if it was not enough time, the next read will just read the push notifications again.
|
||||
// on the socket.
|
||||
return cn.WithReader(ctx, 10*time.Microsecond, func(rd *proto.Reader) error {
|
||||
// Create handler context with client, connection pool, and connection information
|
||||
handlerCtx := c.pushNotificationHandlerContext(cn)
|
||||
@@ -1115,6 +1096,8 @@ func (c *baseClient) processPushNotifications(ctx context.Context, cn *pool.Conn
|
||||
// processPendingPushNotificationWithReader processes all pending push notifications on a connection
|
||||
// This method should be called by the client in WithReader before reading the reply
|
||||
func (c *baseClient) processPendingPushNotificationWithReader(ctx context.Context, cn *pool.Conn, rd *proto.Reader) error {
|
||||
// if we have the reader, we don't need to check for data on the socket, we are waiting
|
||||
// for either a reply or a push notification, so we can block until we get a reply or reach the timeout
|
||||
if c.opt.Protocol != 3 || c.pushProcessor == nil {
|
||||
return nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user