diff --git a/bench_decode_test.go b/bench_decode_test.go deleted file mode 100644 index d61a901a..00000000 --- a/bench_decode_test.go +++ /dev/null @@ -1,316 +0,0 @@ -package redis - -import ( - "context" - "fmt" - "io" - "net" - "testing" - "time" - - "github.com/redis/go-redis/v9/internal/proto" -) - -var ctx = context.TODO() - -type ClientStub struct { - Cmdable - resp []byte -} - -var initHello = []byte("%1\r\n+proto\r\n:3\r\n") - -func NewClientStub(resp []byte) *ClientStub { - stub := &ClientStub{ - resp: resp, - } - - stub.Cmdable = NewClient(&Options{ - PoolSize: 128, - Dialer: func(ctx context.Context, network, addr string) (net.Conn, error) { - return stub.stubConn(initHello), nil - }, - DisableIdentity: true, - }) - return stub -} - -func NewClusterClientStub(resp []byte) *ClientStub { - stub := &ClientStub{ - resp: resp, - } - - client := NewClusterClient(&ClusterOptions{ - PoolSize: 128, - Addrs: []string{":6379"}, - Dialer: func(ctx context.Context, network, addr string) (net.Conn, error) { - return stub.stubConn(initHello), nil - }, - DisableIdentity: true, - - ClusterSlots: func(_ context.Context) ([]ClusterSlot, error) { - return []ClusterSlot{ - { - Start: 0, - End: 16383, - Nodes: []ClusterNode{{Addr: "127.0.0.1:6379"}}, - }, - }, nil - }, - }) - - stub.Cmdable = client - return stub -} - -func (c *ClientStub) stubConn(init []byte) *ConnStub { - return &ConnStub{ - init: init, - resp: c.resp, - } -} - -type ConnStub struct { - init []byte - resp []byte - pos int -} - -func (c *ConnStub) Read(b []byte) (n int, err error) { - // Return conn.init() - if len(c.init) > 0 { - n = copy(b, c.init) - c.init = c.init[n:] - return n, nil - } - - if len(c.resp) == 0 { - return 0, io.EOF - } - - if c.pos >= len(c.resp) { - c.pos = 0 - } - n = copy(b, c.resp[c.pos:]) - c.pos += n - return n, nil -} - -func (c *ConnStub) Write(b []byte) (n int, err error) { return len(b), nil } -func (c *ConnStub) Close() error { return nil } -func (c *ConnStub) LocalAddr() net.Addr { return nil } -func (c *ConnStub) RemoteAddr() net.Addr { return nil } -func (c *ConnStub) SetDeadline(_ time.Time) error { return nil } -func (c *ConnStub) SetReadDeadline(_ time.Time) error { return nil } -func (c *ConnStub) SetWriteDeadline(_ time.Time) error { return nil } - -type ClientStubFunc func([]byte) *ClientStub - -func BenchmarkDecode(b *testing.B) { - type Benchmark struct { - name string - stub ClientStubFunc - } - - benchmarks := []Benchmark{ - {"server", NewClientStub}, - {"cluster", NewClusterClientStub}, - } - - for _, bench := range benchmarks { - b.Run(fmt.Sprintf("RespError-%s", bench.name), func(b *testing.B) { - respError(b, bench.stub) - }) - b.Run(fmt.Sprintf("RespStatus-%s", bench.name), func(b *testing.B) { - respStatus(b, bench.stub) - }) - b.Run(fmt.Sprintf("RespInt-%s", bench.name), func(b *testing.B) { - respInt(b, bench.stub) - }) - b.Run(fmt.Sprintf("RespString-%s", bench.name), func(b *testing.B) { - respString(b, bench.stub) - }) - b.Run(fmt.Sprintf("RespArray-%s", bench.name), func(b *testing.B) { - respArray(b, bench.stub) - }) - b.Run(fmt.Sprintf("RespPipeline-%s", bench.name), func(b *testing.B) { - respPipeline(b, bench.stub) - }) - b.Run(fmt.Sprintf("RespTxPipeline-%s", bench.name), func(b *testing.B) { - respTxPipeline(b, bench.stub) - }) - - // goroutine - b.Run(fmt.Sprintf("DynamicGoroutine-%s-pool=5", bench.name), func(b *testing.B) { - dynamicGoroutine(b, bench.stub, 5) - }) - b.Run(fmt.Sprintf("DynamicGoroutine-%s-pool=20", bench.name), func(b *testing.B) { - dynamicGoroutine(b, bench.stub, 20) - }) - b.Run(fmt.Sprintf("DynamicGoroutine-%s-pool=50", bench.name), func(b *testing.B) { - dynamicGoroutine(b, bench.stub, 50) - }) - b.Run(fmt.Sprintf("DynamicGoroutine-%s-pool=100", bench.name), func(b *testing.B) { - dynamicGoroutine(b, bench.stub, 100) - }) - - b.Run(fmt.Sprintf("StaticGoroutine-%s-pool=5", bench.name), func(b *testing.B) { - staticGoroutine(b, bench.stub, 5) - }) - b.Run(fmt.Sprintf("StaticGoroutine-%s-pool=20", bench.name), func(b *testing.B) { - staticGoroutine(b, bench.stub, 20) - }) - b.Run(fmt.Sprintf("StaticGoroutine-%s-pool=50", bench.name), func(b *testing.B) { - staticGoroutine(b, bench.stub, 50) - }) - b.Run(fmt.Sprintf("StaticGoroutine-%s-pool=100", bench.name), func(b *testing.B) { - staticGoroutine(b, bench.stub, 100) - }) - } -} - -func respError(b *testing.B, stub ClientStubFunc) { - rdb := stub([]byte("-ERR test error\r\n")) - respErr := proto.RedisError("ERR test error") - - b.ResetTimer() - for i := 0; i < b.N; i++ { - if err := rdb.Get(ctx, "key").Err(); err != respErr { - b.Fatalf("response error, got %q, want %q", err, respErr) - } - } -} - -func respStatus(b *testing.B, stub ClientStubFunc) { - rdb := stub([]byte("+OK\r\n")) - var val string - - b.ResetTimer() - for i := 0; i < b.N; i++ { - if val = rdb.Set(ctx, "key", "value", 0).Val(); val != "OK" { - b.Fatalf("response error, got %q, want OK", val) - } - } -} - -func respInt(b *testing.B, stub ClientStubFunc) { - rdb := stub([]byte(":10\r\n")) - var val int64 - - b.ResetTimer() - for i := 0; i < b.N; i++ { - if val = rdb.Incr(ctx, "key").Val(); val != 10 { - b.Fatalf("response error, got %q, want 10", val) - } - } -} - -func respString(b *testing.B, stub ClientStubFunc) { - rdb := stub([]byte("$5\r\nhello\r\n")) - var val string - - b.ResetTimer() - for i := 0; i < b.N; i++ { - if val = rdb.Get(ctx, "key").Val(); val != "hello" { - b.Fatalf("response error, got %q, want hello", val) - } - } -} - -func respArray(b *testing.B, stub ClientStubFunc) { - rdb := stub([]byte("*3\r\n$5\r\nhello\r\n:10\r\n+OK\r\n")) - var val []interface{} - - b.ResetTimer() - for i := 0; i < b.N; i++ { - if val = rdb.MGet(ctx, "key").Val(); len(val) != 3 { - b.Fatalf("response error, got len(%d), want len(3)", len(val)) - } - } -} - -func respPipeline(b *testing.B, stub ClientStubFunc) { - rdb := stub([]byte("+OK\r\n$5\r\nhello\r\n:1\r\n")) - var pipe Pipeliner - - b.ResetTimer() - for i := 0; i < b.N; i++ { - pipe = rdb.Pipeline() - set := pipe.Set(ctx, "key", "value", 0) - get := pipe.Get(ctx, "key") - del := pipe.Del(ctx, "key") - _, err := pipe.Exec(ctx) - if err != nil { - b.Fatalf("response error, got %q, want nil", err) - } - if set.Val() != "OK" || get.Val() != "hello" || del.Val() != 1 { - b.Fatal("response error") - } - } -} - -func respTxPipeline(b *testing.B, stub ClientStubFunc) { - rdb := stub([]byte("+OK\r\n+QUEUED\r\n+QUEUED\r\n+QUEUED\r\n*3\r\n+OK\r\n$5\r\nhello\r\n:1\r\n")) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - var set *StatusCmd - var get *StringCmd - var del *IntCmd - _, err := rdb.TxPipelined(ctx, func(pipe Pipeliner) error { - set = pipe.Set(ctx, "key", "value", 0) - get = pipe.Get(ctx, "key") - del = pipe.Del(ctx, "key") - return nil - }) - if err != nil { - b.Fatalf("response error, got %q, want nil", err) - } - if set.Val() != "OK" || get.Val() != "hello" || del.Val() != 1 { - b.Fatal("response error") - } - } -} - -func dynamicGoroutine(b *testing.B, stub ClientStubFunc, concurrency int) { - rdb := stub([]byte("$5\r\nhello\r\n")) - c := make(chan struct{}, concurrency) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - c <- struct{}{} - go func() { - if val := rdb.Get(ctx, "key").Val(); val != "hello" { - panic(fmt.Sprintf("response error, got %q, want hello", val)) - } - <-c - }() - } - // Here no longer wait for all goroutines to complete, it will not affect the test results. - close(c) -} - -func staticGoroutine(b *testing.B, stub ClientStubFunc, concurrency int) { - rdb := stub([]byte("$5\r\nhello\r\n")) - c := make(chan struct{}, concurrency) - - b.ResetTimer() - - for i := 0; i < concurrency; i++ { - go func() { - for { - _, ok := <-c - if !ok { - return - } - if val := rdb.Get(ctx, "key").Val(); val != "hello" { - panic(fmt.Sprintf("response error, got %q, want hello", val)) - } - } - }() - } - for i := 0; i < b.N; i++ { - c <- struct{}{} - } - close(c) -} diff --git a/push/processor.go b/push/processor.go index 24bca662..433a546b 100644 --- a/push/processor.go +++ b/push/processor.go @@ -204,7 +204,7 @@ func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, handlerCt func willHandleNotificationInClient(notificationType string) bool { switch notificationType { // Pub/Sub notifications - handled by pub/sub system - case "message", // Regular pub/sub message + case "message", // Regular pub/sub message "pmessage", // Pattern pub/sub message "subscribe", // Subscription confirmation "unsubscribe", // Unsubscription confirmation diff --git a/push/push_test.go b/push/push_test.go index b25febb0..30352460 100644 --- a/push/push_test.go +++ b/push/push_test.go @@ -5,8 +5,10 @@ import ( "context" "errors" "fmt" + "net" "strings" "testing" + "time" "github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/proto" @@ -26,6 +28,18 @@ func NewTestHandler(name string) *TestHandler { } } +// MockNetConn implements net.Conn for testing +type MockNetConn struct{} + +func (m *MockNetConn) Read(b []byte) (n int, err error) { return 0, nil } +func (m *MockNetConn) Write(b []byte) (n int, err error) { return len(b), nil } +func (m *MockNetConn) Close() error { return nil } +func (m *MockNetConn) LocalAddr() net.Addr { return nil } +func (m *MockNetConn) RemoteAddr() net.Addr { return nil } +func (m *MockNetConn) SetDeadline(t time.Time) error { return nil } +func (m *MockNetConn) SetReadDeadline(t time.Time) error { return nil } +func (m *MockNetConn) SetWriteDeadline(t time.Time) error { return nil } + func (h *TestHandler) HandlePushNotification(ctx context.Context, handlerCtx NotificationHandlerContext, notification []interface{}) error { h.handled = append(h.handled, notification) return h.returnError @@ -843,6 +857,12 @@ func createReaderWithPrimedBuffer(buf *bytes.Buffer) *proto.Reader { return reader } +// createMockConnection creates a mock connection for testing +func createMockConnection() *pool.Conn { + mockNetConn := &MockNetConn{} + return pool.NewConn(mockNetConn) +} + // createFakeRESP3Array creates a fake RESP3 array (not push notification) func createFakeRESP3Array(elements ...string) *bytes.Buffer { buf := &bytes.Buffer{} @@ -908,7 +928,7 @@ func TestProcessorWithFakeBuffer(t *testing.T) { Client: nil, ConnPool: nil, PubSub: nil, - Conn: nil, + Conn: createMockConnection(), IsBlocking: false, } @@ -920,13 +940,14 @@ func TestProcessorWithFakeBuffer(t *testing.T) { handled := handler.GetHandledNotifications() if len(handled) != 1 { t.Errorf("Expected 1 handled notification, got %d", len(handled)) + return // Prevent panic if no notifications were handled } if len(handled[0]) != 7 || handled[0][0] != "MOVING" { t.Errorf("Handled notification should match input: %v", handled[0]) } - if handled[0][1] != "slot" || handled[0][2] != "123" { + if len(handled[0]) > 2 && (handled[0][1] != "slot" || handled[0][2] != "123") { t.Errorf("Notification arguments should match: %v", handled[0]) } }) @@ -945,7 +966,7 @@ func TestProcessorWithFakeBuffer(t *testing.T) { Client: nil, ConnPool: nil, PubSub: nil, - Conn: nil, + Conn: createMockConnection(), IsBlocking: false, } @@ -973,7 +994,7 @@ func TestProcessorWithFakeBuffer(t *testing.T) { Client: nil, ConnPool: nil, PubSub: nil, - Conn: nil, + Conn: createMockConnection(), IsBlocking: false, } @@ -998,7 +1019,7 @@ func TestProcessorWithFakeBuffer(t *testing.T) { Client: nil, ConnPool: nil, PubSub: nil, - Conn: nil, + Conn: createMockConnection(), IsBlocking: false, } @@ -1027,7 +1048,7 @@ func TestProcessorWithFakeBuffer(t *testing.T) { Client: nil, ConnPool: nil, PubSub: nil, - Conn: nil, + Conn: createMockConnection(), IsBlocking: false, } @@ -1061,7 +1082,7 @@ func TestProcessorWithFakeBuffer(t *testing.T) { Client: nil, ConnPool: nil, PubSub: nil, - Conn: nil, + Conn: createMockConnection(), IsBlocking: false, } @@ -1104,7 +1125,7 @@ func TestProcessorWithFakeBuffer(t *testing.T) { Client: nil, ConnPool: nil, PubSub: nil, - Conn: nil, + Conn: createMockConnection(), IsBlocking: false, } @@ -1143,7 +1164,7 @@ func TestProcessorWithFakeBuffer(t *testing.T) { Client: nil, ConnPool: nil, PubSub: nil, - Conn: nil, + Conn: createMockConnection(), IsBlocking: false, } @@ -1390,7 +1411,7 @@ func TestProcessorPerformanceWithFakeData(t *testing.T) { Client: nil, ConnPool: nil, PubSub: nil, - Conn: nil, + Conn: createMockConnection(), IsBlocking: false, } diff --git a/redis.go b/redis.go index f0d6fb17..43673863 100644 --- a/redis.go +++ b/redis.go @@ -1102,7 +1102,8 @@ func (c *baseClient) processPushNotifications(ctx context.Context, cn *pool.Conn // Use WithReader to access the reader and process push notifications // This is critical for hitless upgrades to work properly - return cn.WithReader(ctx, 0, func(rd *proto.Reader) error { + // NOTE: almost no timeouts are set for this read, so it should not block + return cn.WithReader(ctx, 1, func(rd *proto.Reader) error { // Create handler context with client, connection pool, and connection information handlerCtx := c.pushNotificationHandlerContext(cn) return c.pushProcessor.ProcessPendingNotifications(ctx, handlerCtx, rd)