diff --git a/.github/actions/run-tests/action.yml b/.github/actions/run-tests/action.yml index 0d6db09b..916e734a 100644 --- a/.github/actions/run-tests/action.yml +++ b/.github/actions/run-tests/action.yml @@ -24,7 +24,7 @@ runs: # Mapping of redis version to redis testing containers declare -A redis_version_mapping=( - ["8.4.x"]="8.4-RC1-pre" + ["8.4.x"]="8.4-RC1-pre.2" ["8.2.x"]="8.2.1-pre" ["8.0.x"]="8.0.2" ) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index fa4ba024..4ffd5683 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -44,7 +44,7 @@ jobs: # Mapping of redis version to redis testing containers declare -A redis_version_mapping=( - ["8.4.x"]="8.4-RC1-pre" + ["8.4.x"]="8.4-RC1-pre.2" ["8.2.x"]="8.2.1-pre" ["8.0.x"]="8.0.2" ) diff --git a/Makefile b/Makefile index 36902ec9..1c630e42 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ GO_MOD_DIRS := $(shell find . -type f -name 'go.mod' -exec dirname {} \; | sort) REDIS_VERSION ?= 8.4 RE_CLUSTER ?= false RCE_DOCKER ?= true -CLIENT_LIBS_TEST_IMAGE ?= redislabs/client-libs-test:8.4-RC1-pre +CLIENT_LIBS_TEST_IMAGE ?= redislabs/client-libs-test:8.4-RC1-pre.2 docker.start: export RE_CLUSTER=$(RE_CLUSTER) && \ diff --git a/acl_commands.go b/acl_commands.go index 9cb800bb..a7b5dd24 100644 --- a/acl_commands.go +++ b/acl_commands.go @@ -8,8 +8,12 @@ type ACLCmdable interface { ACLLog(ctx context.Context, count int64) *ACLLogCmd ACLLogReset(ctx context.Context) *StatusCmd + ACLGenPass(ctx context.Context, bit int) *StringCmd + ACLSetUser(ctx context.Context, username string, rules ...string) *StatusCmd ACLDelUser(ctx context.Context, username string) *IntCmd + ACLUsers(ctx context.Context) *StringSliceCmd + ACLWhoAmI(ctx context.Context) *StringCmd ACLList(ctx context.Context) *StringSliceCmd ACLCat(ctx context.Context) *StringSliceCmd @@ -65,6 +69,24 @@ func (c cmdable) ACLSetUser(ctx context.Context, username string, rules ...strin return cmd } +func (c cmdable) ACLGenPass(ctx context.Context, bit int) *StringCmd { + cmd := NewStringCmd(ctx, "acl", "genpass") + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) ACLUsers(ctx context.Context) *StringSliceCmd { + cmd := NewStringSliceCmd(ctx, "acl", "users") + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) ACLWhoAmI(ctx context.Context) *StringCmd { + cmd := NewStringCmd(ctx, "acl", "whoami") + _ = c(ctx, cmd) + return cmd +} + func (c cmdable) ACLList(ctx context.Context) *StringSliceCmd { cmd := NewStringSliceCmd(ctx, "acl", "list") _ = c(ctx, cmd) diff --git a/acl_commands_test.go b/acl_commands_test.go index a96621db..e6a99ec5 100644 --- a/acl_commands_test.go +++ b/acl_commands_test.go @@ -92,6 +92,21 @@ var _ = Describe("ACL user commands", Label("NonRedisEnterprise"), func() { Expect(err).NotTo(HaveOccurred()) Expect(res).To(HaveLen(1)) Expect(res[0]).To(ContainSubstring("default")) + + res, err = client.ACLUsers(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(HaveLen(1)) + Expect(res[0]).To(Equal("default")) + + res1, err := client.ACLWhoAmI(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res1).To(Equal("default")) + }) + + It("gen password", func() { + password, err := client.ACLGenPass(ctx, 0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(password).NotTo(BeEmpty()) }) It("setuser and deluser", func() { diff --git a/command.go b/command.go index 66917607..acf4e3df 100644 --- a/command.go +++ b/command.go @@ -1607,6 +1607,12 @@ func (cmd *StringStructMapCmd) readReply(rd *proto.Reader) error { type XMessage struct { ID string Values map[string]interface{} + // MillisElapsedFromDelivery is the number of milliseconds since the entry was last delivered. + // Only populated when using XREADGROUP with CLAIM argument for claimed entries. + MillisElapsedFromDelivery int64 + // DeliveredCount is the number of times the entry was delivered. + // Only populated when using XREADGROUP with CLAIM argument for claimed entries. + DeliveredCount int64 } type XMessageSliceCmd struct { @@ -1663,10 +1669,16 @@ func readXMessageSlice(rd *proto.Reader) ([]XMessage, error) { } func readXMessage(rd *proto.Reader) (XMessage, error) { - if err := rd.ReadFixedArrayLen(2); err != nil { + // Read array length can be 2 or 4 (with CLAIM metadata) + n, err := rd.ReadArrayLen() + if err != nil { return XMessage{}, err } + if n != 2 && n != 4 { + return XMessage{}, fmt.Errorf("redis: got %d elements in the XMessage array, expected 2 or 4", n) + } + id, err := rd.ReadString() if err != nil { return XMessage{}, err @@ -1679,10 +1691,24 @@ func readXMessage(rd *proto.Reader) (XMessage, error) { } } - return XMessage{ + msg := XMessage{ ID: id, Values: v, - }, nil + } + + if n == 4 { + msg.MillisElapsedFromDelivery, err = rd.ReadInt() + if err != nil { + return XMessage{}, err + } + + msg.DeliveredCount, err = rd.ReadInt() + if err != nil { + return XMessage{}, err + } + } + + return msg, nil } func stringInterfaceMapParser(rd *proto.Reader) (map[string]interface{}, error) { diff --git a/commands_test.go b/commands_test.go index 17b4dd03..c72bd732 100644 --- a/commands_test.go +++ b/commands_test.go @@ -199,6 +199,29 @@ var _ = Describe("Commands", func() { Expect(r.Val()).To(Equal(int64(0))) }) + It("should ClientKillByFilter with kill myself", func() { + opt := redisOptions() + opt.ClientName = "killmyid" + db := redis.NewClient(opt) + Expect(db.Ping(ctx).Err()).NotTo(HaveOccurred()) + + defer func() { + Expect(db.Close()).NotTo(HaveOccurred()) + }() + val, err := client.ClientList(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).Should(ContainSubstring("name=killmyid")) + + myid := db.ClientID(ctx).Val() + killed := client.ClientKillByFilter(ctx, "ID", strconv.FormatInt(myid, 10)) + Expect(killed.Err()).NotTo(HaveOccurred()) + Expect(killed.Val()).To(BeNumerically("==", 1)) + + val, err = client.ClientList(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).ShouldNot(ContainSubstring("name=killmyid")) + }) + It("should ClientKillByFilter with MAXAGE", Label("NonRedisEnterprise"), func() { SkipBeforeRedisVersion(7.4, "doesn't work with older redis stack images") var s []string @@ -1912,6 +1935,137 @@ var _ = Describe("Commands", func() { Expect(mSetNX.Val()).To(Equal(true)) }) + It("should MSetEX", func() { + SkipBeforeRedisVersion(8.3, "MSetEX is available since redis 8.4") + args := redis.MSetEXArgs{ + Expiration: &redis.ExpirationOption{ + Mode: redis.EX, + Value: 1, + }, + } + mSetEX := client.MSetEX(ctx, args, "key1", "hello1", "key2", "hello2") + Expect(mSetEX.Err()).NotTo(HaveOccurred()) + Expect(mSetEX.Val()).To(Equal(int64(1))) + + // Verify keys were set + val1 := client.Get(ctx, "key1") + Expect(val1.Err()).NotTo(HaveOccurred()) + Expect(val1.Val()).To(Equal("hello1")) + + val2 := client.Get(ctx, "key2") + Expect(val2.Err()).NotTo(HaveOccurred()) + Expect(val2.Val()).To(Equal("hello2")) + + // Verify TTL was set + ttl1 := client.TTL(ctx, "key1") + Expect(ttl1.Err()).NotTo(HaveOccurred()) + Expect(ttl1.Val()).To(BeNumerically(">", 0)) + Expect(ttl1.Val()).To(BeNumerically("<=", 1*time.Second)) + + ttl2 := client.TTL(ctx, "key2") + Expect(ttl2.Err()).NotTo(HaveOccurred()) + Expect(ttl2.Val()).To(BeNumerically(">", 0)) + Expect(ttl2.Val()).To(BeNumerically("<=", 1*time.Second)) + }) + + It("should MSetEX with NX mode", func() { + SkipBeforeRedisVersion(8.3, "MSetEX is available since redis 8.4") + + client.Set(ctx, "key1", "existing", 0) + + // Try to set with NX mode - should fail because key1 exists + args := redis.MSetEXArgs{ + Condition: redis.NX, + Expiration: &redis.ExpirationOption{ + Mode: redis.EX, + Value: 1, + }, + } + mSetEX := client.MSetEX(ctx, args, "key1", "new1", "key2", "new2") + Expect(mSetEX.Err()).NotTo(HaveOccurred()) + Expect(mSetEX.Val()).To(Equal(int64(0))) + + val1 := client.Get(ctx, "key1") + Expect(val1.Err()).NotTo(HaveOccurred()) + Expect(val1.Val()).To(Equal("existing")) + + val2 := client.Get(ctx, "key2") + Expect(val2.Err()).To(Equal(redis.Nil)) + + client.Del(ctx, "key1") + + // Now try with NX mode when keys don't exist - should succeed + mSetEX = client.MSetEX(ctx, args, "key1", "new1", "key2", "new2") + Expect(mSetEX.Err()).NotTo(HaveOccurred()) + Expect(mSetEX.Val()).To(Equal(int64(1))) + + val1 = client.Get(ctx, "key1") + Expect(val1.Err()).NotTo(HaveOccurred()) + Expect(val1.Val()).To(Equal("new1")) + + val2 = client.Get(ctx, "key2") + Expect(val2.Err()).NotTo(HaveOccurred()) + Expect(val2.Val()).To(Equal("new2")) + }) + + It("should MSetEX with XX mode", func() { + SkipBeforeRedisVersion(8.3, "MSetEX is available since redis 8.4") + + args := redis.MSetEXArgs{ + Condition: redis.XX, + Expiration: &redis.ExpirationOption{ + Mode: redis.EX, + Value: 1, + }, + } + mSetEX := client.MSetEX(ctx, args, "key1", "new1", "key2", "new2") + Expect(mSetEX.Err()).NotTo(HaveOccurred()) + Expect(mSetEX.Val()).To(Equal(int64(0))) + + client.Set(ctx, "key1", "existing1", 0) + client.Set(ctx, "key2", "existing2", 0) + + mSetEX = client.MSetEX(ctx, args, "key1", "new1", "key2", "new2") + Expect(mSetEX.Err()).NotTo(HaveOccurred()) + Expect(mSetEX.Val()).To(Equal(int64(1))) + + val1 := client.Get(ctx, "key1") + Expect(val1.Err()).NotTo(HaveOccurred()) + Expect(val1.Val()).To(Equal("new1")) + + val2 := client.Get(ctx, "key2") + Expect(val2.Err()).NotTo(HaveOccurred()) + Expect(val2.Val()).To(Equal("new2")) + + ttl1 := client.TTL(ctx, "key1") + Expect(ttl1.Err()).NotTo(HaveOccurred()) + Expect(ttl1.Val()).To(BeNumerically(">", 0)) + }) + + It("should MSetEX with map", func() { + SkipBeforeRedisVersion(8.3, "MSetEX is available since redis 8.4") + args := redis.MSetEXArgs{ + Expiration: &redis.ExpirationOption{ + Mode: redis.EX, + Value: 1, + }, + } + mSetEX := client.MSetEX(ctx, args, map[string]interface{}{ + "key1": "value1", + "key2": "value2", + }) + Expect(mSetEX.Err()).NotTo(HaveOccurred()) + Expect(mSetEX.Val()).To(Equal(int64(1))) + + val1 := client.Get(ctx, "key1") + Expect(val1.Err()).NotTo(HaveOccurred()) + Expect(val1.Val()).To(Equal("value1")) + + val2 := client.Get(ctx, "key2") + Expect(val2.Err()).NotTo(HaveOccurred()) + Expect(val2.Val()).To(Equal("value2")) + }) + It("should SetWithArgs with TTL", func() { args := redis.SetArgs{ TTL: 500 * time.Millisecond, @@ -6749,6 +6903,242 @@ var _ = Describe("Commands", func() { Expect(err).NotTo(HaveOccurred()) Expect(n).To(Equal(int64(2))) }) + + It("should XReadGroup with CLAIM argument", func() { + SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+") + + time.Sleep(100 * time.Millisecond) + + res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group", + Consumer: "consumer2", + Streams: []string{"stream", ">"}, + Claim: 50 * time.Millisecond, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(HaveLen(1)) + Expect(res[0].Stream).To(Equal("stream")) + + messages := res[0].Messages + Expect(len(messages)).To(BeNumerically(">=", 1)) + + for _, msg := range messages { + if msg.MillisElapsedFromDelivery > 0 { + Expect(msg.MillisElapsedFromDelivery).To(BeNumerically(">=", 50)) + Expect(msg.DeliveredCount).To(BeNumerically(">=", 1)) + } + } + }) + + It("should XReadGroup with CLAIM and COUNT", func() { + SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+") + + time.Sleep(100 * time.Millisecond) + + res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group", + Consumer: "consumer3", + Streams: []string{"stream", ">"}, + Claim: 50 * time.Millisecond, + Count: 2, + }).Result() + Expect(err).NotTo(HaveOccurred()) + + if len(res) > 0 && len(res[0].Messages) > 0 { + Expect(len(res[0].Messages)).To(BeNumerically("<=", 2)) + } + }) + + It("should XReadGroup with CLAIM and NOACK", func() { + SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+") + + time.Sleep(100 * time.Millisecond) + + res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group", + Consumer: "consumer4", + Streams: []string{"stream", ">"}, + Claim: 50 * time.Millisecond, + NoAck: true, + }).Result() + Expect(err).NotTo(HaveOccurred()) + + if len(res) > 0 { + Expect(res[0].Stream).To(Equal("stream")) + } + }) + + It("should XReadGroup CLAIM empties PEL after acknowledgment", func() { + SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+") + + time.Sleep(100 * time.Millisecond) + + res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group", + Consumer: "consumer5", + Streams: []string{"stream", ">"}, + Claim: 50 * time.Millisecond, + }).Result() + Expect(err).NotTo(HaveOccurred()) + + if len(res) > 0 && len(res[0].Messages) > 0 { + ids := make([]string, len(res[0].Messages)) + for i, msg := range res[0].Messages { + ids[i] = msg.ID + } + + n, err := client.XAck(ctx, "stream", "group", ids...).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(BeNumerically(">=", 1)) + + pending, err := client.XPending(ctx, "stream", "group").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(pending.Count).To(BeNumerically("<", 3)) + } + }) + + It("should XReadGroup backward compatibility without CLAIM", func() { + res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group", + Consumer: "consumer_compat", + Streams: []string{"stream", "0"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(HaveLen(1)) + Expect(res[0].Stream).To(Equal("stream")) + + for _, msg := range res[0].Messages { + Expect(msg.MillisElapsedFromDelivery).To(Equal(int64(0))) + Expect(msg.DeliveredCount).To(Equal(int64(0))) + } + }) + + It("should XReadGroup CLAIM with multiple streams", func() { + SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+") + + id, err := client.XAdd(ctx, &redis.XAddArgs{ + Stream: "stream2", + ID: "1-0", + Values: map[string]interface{}{"field1": "value1"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(id).To(Equal("1-0")) + + id, err = client.XAdd(ctx, &redis.XAddArgs{ + Stream: "stream2", + ID: "2-0", + Values: map[string]interface{}{"field2": "value2"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(id).To(Equal("2-0")) + + err = client.XGroupCreate(ctx, "stream2", "group2", "0").Err() + Expect(err).NotTo(HaveOccurred()) + + _, err = client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group2", + Consumer: "consumer1", + Streams: []string{"stream2", ">"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + + time.Sleep(100 * time.Millisecond) + + res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group2", + Consumer: "consumer2", + Streams: []string{"stream2", ">"}, + Claim: 50 * time.Millisecond, + }).Result() + Expect(err).NotTo(HaveOccurred()) + + if len(res) > 0 { + Expect(res[0].Stream).To(Equal("stream2")) + if len(res[0].Messages) > 0 { + for _, msg := range res[0].Messages { + if msg.MillisElapsedFromDelivery > 0 { + Expect(msg.DeliveredCount).To(BeNumerically(">=", 1)) + } + } + } + } + }) + + It("should XReadGroup CLAIM work consistently on RESP2 and RESP3", func() { + SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+") + + streamName := "stream-resp-test" + err := client.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + Values: map[string]interface{}{"field1": "value1"}, + }).Err() + Expect(err).NotTo(HaveOccurred()) + + err = client.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + Values: map[string]interface{}{"field2": "value2"}, + }).Err() + Expect(err).NotTo(HaveOccurred()) + + groupName := "resp-test-group" + err = client.XGroupCreate(ctx, streamName, groupName, "0").Err() + Expect(err).NotTo(HaveOccurred()) + + _, err = client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "consumer1", + Streams: []string{streamName, ">"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + + time.Sleep(100 * time.Millisecond) + + // Test with RESP2 (protocol 2) + resp2Client := redis.NewClient(&redis.Options{ + Addr: redisAddr, + Protocol: 2, + }) + defer resp2Client.Close() + + resp2Result, err := resp2Client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "consumer2", + Streams: []string{streamName, "0"}, + Claim: 50 * time.Millisecond, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resp2Result).To(HaveLen(1)) + + // Test with RESP3 (protocol 3) + resp3Client := redis.NewClient(&redis.Options{ + Addr: redisAddr, + Protocol: 3, + }) + defer resp3Client.Close() + + resp3Result, err := resp3Client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "consumer3", + Streams: []string{streamName, "0"}, + Claim: 50 * time.Millisecond, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resp3Result).To(HaveLen(1)) + + Expect(len(resp2Result[0].Messages)).To(Equal(len(resp3Result[0].Messages))) + + for i := range resp2Result[0].Messages { + msg2 := resp2Result[0].Messages[i] + msg3 := resp3Result[0].Messages[i] + + Expect(msg2.ID).To(Equal(msg3.ID)) + + if msg2.MillisElapsedFromDelivery > 0 { + Expect(msg3.MillisElapsedFromDelivery).To(BeNumerically(">", 0)) + Expect(msg2.DeliveredCount).To(Equal(msg3.DeliveredCount)) + } + } + }) }) Describe("xinfo", func() { diff --git a/docker-compose.yml b/docker-compose.yml index 384d0fb2..9412549a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,7 @@ services: redis: - image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.4-RC1-pre} + image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.4-RC1-pre.2} platform: linux/amd64 container_name: redis-standalone environment: @@ -23,7 +23,7 @@ services: - all osscluster: - image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.4-RC1-pre} + image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.4-RC1-pre.2} platform: linux/amd64 container_name: redis-osscluster environment: @@ -40,7 +40,7 @@ services: - all sentinel-cluster: - image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.4-RC1-pre} + image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.4-RC1-pre.2} platform: linux/amd64 container_name: redis-sentinel-cluster network_mode: "host" @@ -60,7 +60,7 @@ services: - all sentinel: - image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.4-RC1-pre} + image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.4-RC1-pre.2} platform: linux/amd64 container_name: redis-sentinel depends_on: @@ -84,7 +84,7 @@ services: - all ring-cluster: - image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.4-RC1-pre} + image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.4-RC1-pre.2} platform: linux/amd64 container_name: redis-ring-cluster environment: diff --git a/doctests/stream_tutorial_test.go b/doctests/stream_tutorial_test.go index e39919ea..655021fa 100644 --- a/doctests/stream_tutorial_test.go +++ b/doctests/stream_tutorial_test.go @@ -216,8 +216,8 @@ func ExampleClient_racefrance1() { // REMOVE_END // Output: - // [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]}] - // [{race:france [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]} {1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7]}]}] + // [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0}] + // [{race:france [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0} {1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7] 0 0}]}] // 4 } @@ -467,13 +467,13 @@ func ExampleClient_racefrance2() { // STEP_END // Output: - // [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]} {1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7]} {1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9]}] - // [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]}] - // [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]}] - // [{1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7]} {1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9]}] + // [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0} {1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7] 0 0} {1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9] 0 0}] + // [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0}] + // [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0}] + // [{1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7] 0 0} {1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9] 0 0}] // [] - // [{1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9]}] - // [{race:france [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]}]}] + // [{1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9] 0 0}] + // [{race:france [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0}]}] } func ExampleClient_xgroupcreate() { @@ -999,18 +999,18 @@ func ExampleClient_raceitaly() { // REMOVE_END // Output: - // [{race:italy [{1692632639151-0 map[rider:Castilla]}]}] + // [{race:italy [{1692632639151-0 map[rider:Castilla] 0 0}]}] // 1 // [{race:italy []}] - // [{race:italy [{1692632647899-0 map[rider:Royce]} {1692632662819-0 map[rider:Sam-Bodden]}]}] + // [{race:italy [{1692632647899-0 map[rider:Royce] 0 0} {1692632662819-0 map[rider:Sam-Bodden] 0 0}]}] // &{2 1692632647899-0 1692632662819-0 map[Bob:2]} - // [{1692632647899-0 map[rider:Royce]}] - // [{1692632647899-0 map[rider:Royce]}] - // [{1692632647899-0 map[rider:Royce]}] + // [{1692632647899-0 map[rider:Royce] 0 0}] + // [{1692632647899-0 map[rider:Royce] 0 0}] + // [{1692632647899-0 map[rider:Royce] 0 0}] // 1692632662819-0 // [] // 0-0 - // &{5 1 2 1 1692632678249-0 0-0 5 {1692632639151-0 map[rider:Castilla]} {1692632678249-0 map[rider:Norem]} 1692632639151-0} + // &{5 1 2 1 1692632678249-0 0-0 5 {1692632639151-0 map[rider:Castilla] 0 0} {1692632678249-0 map[rider:Norem] 0 0} 1692632639151-0} // [{italy_riders 3 2 1692632662819-0 3 2}] // 2 // 0 @@ -1085,7 +1085,7 @@ func ExampleClient_xdel() { // STEP_END // Output: - // [{1692633198206-0 map[rider:Wood]} {1692633208557-0 map[rider:Henshaw]}] + // [{1692633198206-0 map[rider:Wood] 0 0} {1692633208557-0 map[rider:Henshaw] 0 0}] // 1 - // [{1692633198206-0 map[rider:Wood]}] + // [{1692633198206-0 map[rider:Wood] 0 0}] } diff --git a/export_test.go b/export_test.go index c1b77683..97b6179a 100644 --- a/export_test.go +++ b/export_test.go @@ -106,3 +106,7 @@ func (c *ModuleLoadexConfig) ToArgs() []interface{} { func ShouldRetry(err error, retryTimeout bool) bool { return shouldRetry(err, retryTimeout) } + +func JoinErrors(errs []error) string { + return joinErrors(errs) +} diff --git a/extra/rediscensus/go.mod b/extra/rediscensus/go.mod index d4272c97..21271aed 100644 --- a/extra/rediscensus/go.mod +++ b/extra/rediscensus/go.mod @@ -22,4 +22,3 @@ retract ( v9.7.2 // This version was accidentally released. Please use version 9.7.3 instead. v9.5.3 // This version was accidentally released. Please use version 9.6.0 instead. ) - diff --git a/extra/rediscmd/go.mod b/extra/rediscmd/go.mod index d8c03b6a..56d64080 100644 --- a/extra/rediscmd/go.mod +++ b/extra/rediscmd/go.mod @@ -19,4 +19,3 @@ retract ( v9.7.2 // This version was accidentally released. Please use version 9.7.3 instead. v9.5.3 // This version was accidentally released. Please use version 9.6.0 instead. ) - diff --git a/extra/redisotel/go.mod b/extra/redisotel/go.mod index 23cec11a..ade870c5 100644 --- a/extra/redisotel/go.mod +++ b/extra/redisotel/go.mod @@ -27,4 +27,3 @@ retract ( v9.7.2 // This version was accidentally released. Please use version 9.7.3 instead. v9.5.3 // This version was accidentally released. Please use version 9.6.0 instead. ) - diff --git a/extra/redisprometheus/go.mod b/extra/redisprometheus/go.mod index fd4e2d93..d704f9a7 100644 --- a/extra/redisprometheus/go.mod +++ b/extra/redisprometheus/go.mod @@ -26,4 +26,3 @@ retract ( v9.7.2 // This version was accidentally released. Please use version 9.7.3 instead. v9.5.3 // This version was accidentally released. Please use version 9.6.0 instead. ) - diff --git a/internal/pool/conn.go b/internal/pool/conn.go index 71e71a8d..32ac88c2 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -735,7 +735,7 @@ func (cn *Conn) GetStateMachine() *ConnStateMachine { func (cn *Conn) TryAcquire() bool { // The || operator short-circuits, so only 1 CAS in the common case return cn.stateMachine.state.CompareAndSwap(uint32(StateIdle), uint32(StateInUse)) || - cn.stateMachine.state.Load() == uint32(StateCreated) + cn.stateMachine.state.CompareAndSwap(uint32(StateCreated), uint32(StateCreated)) } // Release releases the connection back to the pool. diff --git a/internal/pool/conn_check.go b/internal/pool/conn_check.go index cfdf5e5d..9e83dd83 100644 --- a/internal/pool/conn_check.go +++ b/internal/pool/conn_check.go @@ -30,7 +30,7 @@ func connCheck(conn net.Conn) error { var sysErr error - if err := rawConn.Control(func(fd uintptr) { + if err := rawConn.Read(func(fd uintptr) bool { var buf [1]byte // Use MSG_PEEK to peek at data without consuming it n, _, err := syscall.Recvfrom(int(fd), buf[:], syscall.MSG_PEEK|syscall.MSG_DONTWAIT) @@ -45,6 +45,7 @@ func connCheck(conn net.Conn) error { default: sysErr = err } + return true }); err != nil { return err } diff --git a/internal/pool/conn_used_at_test.go b/internal/pool/conn_used_at_test.go index d6dd27a0..97194505 100644 --- a/internal/pool/conn_used_at_test.go +++ b/internal/pool/conn_used_at_test.go @@ -22,7 +22,7 @@ func TestConn_UsedAtUpdatedOnRead(t *testing.T) { // Get initial usedAt time initialUsedAt := cn.UsedAt() - // Wait at least 50ms to ensure time difference (usedAt has ~50ms precision from cached time) + // Wait 100ms to ensure time difference (usedAt has ~50ms precision from cached time) time.Sleep(100 * time.Millisecond) // Simulate a read operation by calling WithReader @@ -45,10 +45,10 @@ func TestConn_UsedAtUpdatedOnRead(t *testing.T) { initialUsedAt, updatedUsedAt) } - // Verify the difference is reasonable (should be around 50ms, accounting for ~50ms cache precision) + // Verify the difference is reasonable (should be around 100ms, accounting for ~50ms cache precision and ~5ms sleep precision) diff := updatedUsedAt.Sub(initialUsedAt) - if diff < 50*time.Millisecond || diff > 200*time.Millisecond { - t.Errorf("Expected usedAt difference to be around 50ms (±50ms for cache), got %v", diff) + if diff < 45*time.Millisecond || diff > 155*time.Millisecond { + t.Errorf("Expected usedAt difference to be around 100ms (±50ms for cache, ±5ms for sleep), got %v", diff) } } diff --git a/internal/pool/hooks.go b/internal/pool/hooks.go index 1c365dba..a26e1976 100644 --- a/internal/pool/hooks.go +++ b/internal/pool/hooks.go @@ -71,10 +71,13 @@ func (phm *PoolHookManager) RemoveHook(hook PoolHook) { // ProcessOnGet calls all OnGet hooks in order. // If any hook returns an error, processing stops and the error is returned. func (phm *PoolHookManager) ProcessOnGet(ctx context.Context, conn *Conn, isNewConn bool) (acceptConn bool, err error) { + // Copy slice reference while holding lock (fast) phm.hooksMu.RLock() - defer phm.hooksMu.RUnlock() + hooks := phm.hooks + phm.hooksMu.RUnlock() - for _, hook := range phm.hooks { + // Call hooks without holding lock (slow operations) + for _, hook := range hooks { acceptConn, err := hook.OnGet(ctx, conn, isNewConn) if err != nil { return false, err @@ -90,12 +93,15 @@ func (phm *PoolHookManager) ProcessOnGet(ctx context.Context, conn *Conn, isNewC // ProcessOnPut calls all OnPut hooks in order. // The first hook that returns shouldRemove=true or shouldPool=false will stop processing. func (phm *PoolHookManager) ProcessOnPut(ctx context.Context, conn *Conn) (shouldPool bool, shouldRemove bool, err error) { + // Copy slice reference while holding lock (fast) phm.hooksMu.RLock() - defer phm.hooksMu.RUnlock() + hooks := phm.hooks + phm.hooksMu.RUnlock() shouldPool = true // Default to pooling the connection - for _, hook := range phm.hooks { + // Call hooks without holding lock (slow operations) + for _, hook := range hooks { hookShouldPool, hookShouldRemove, hookErr := hook.OnPut(ctx, conn) if hookErr != nil { @@ -117,9 +123,13 @@ func (phm *PoolHookManager) ProcessOnPut(ctx context.Context, conn *Conn) (shoul // ProcessOnRemove calls all OnRemove hooks in order. func (phm *PoolHookManager) ProcessOnRemove(ctx context.Context, conn *Conn, reason error) { + // Copy slice reference while holding lock (fast) phm.hooksMu.RLock() - defer phm.hooksMu.RUnlock() - for _, hook := range phm.hooks { + hooks := phm.hooks + phm.hooksMu.RUnlock() + + // Call hooks without holding lock (slow operations) + for _, hook := range hooks { hook.OnRemove(ctx, conn, reason) } } diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 597a969d..a6c964c0 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -160,18 +160,9 @@ type ConnPool struct { var _ Pooler = (*ConnPool)(nil) func NewConnPool(opt *Options) *ConnPool { - semSize := opt.PoolSize - if opt.MaxActiveConns > 0 && opt.MaxActiveConns < opt.PoolSize { - if opt.MaxActiveConns < opt.PoolSize { - opt.MaxActiveConns = opt.PoolSize - } - semSize = opt.MaxActiveConns - } - //semSize = opt.PoolSize - p := &ConnPool{ cfg: opt, - semaphore: internal.NewFastSemaphore(semSize), + semaphore: internal.NewFastSemaphore(opt.PoolSize), queue: make(chan struct{}, opt.PoolSize), conns: make(map[uint64]*Conn), dialsInProgress: make(chan struct{}, opt.MaxConcurrentDials), @@ -470,17 +461,11 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) { // Use cached time for health checks (max 50ms staleness is acceptable) nowNs := getCachedTimeNs() - attempts := 0 // Lock-free atomic read - no mutex overhead! hookManager := p.hookManager.Load() - for { - if attempts >= getAttempts { - internal.Logger.Printf(ctx, "redis: connection pool: was not able to get a healthy connection after %d attempts", attempts) - break - } - attempts++ + for attempts := 0; attempts < getAttempts; attempts++ { p.connsMu.Lock() cn, err = p.popIdle() diff --git a/redis.go b/redis.go index 0879f08e..362e2af2 100644 --- a/redis.go +++ b/redis.go @@ -553,7 +553,9 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error { cn.GetStateMachine().Transition(pool.StateClosed) return fmt.Errorf("failed to enable maintnotifications: %w", maintNotifHandshakeErr) default: // will handle auto and any other - internal.Logger.Printf(ctx, "auto mode fallback: maintnotifications disabled due to handshake error: %v", maintNotifHandshakeErr) + // Disabling logging here as it's too noisy. + // TODO: Enable when we have a better logging solution for log levels + // internal.Logger.Printf(ctx, "auto mode fallback: maintnotifications disabled due to handshake error: %v", maintNotifHandshakeErr) c.opt.MaintNotificationsConfig.Mode = maintnotifications.ModeDisabled c.optLock.Unlock() // auto mode, disable maintnotifications and continue diff --git a/sentinel.go b/sentinel.go index 4e2b5e71..ef9e8267 100644 --- a/sentinel.go +++ b/sentinel.go @@ -851,6 +851,11 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) { } } + // short circuit if no sentinels configured + if len(c.sentinelAddrs) == 0 { + return "", errors.New("redis: no sentinels configured") + } + var ( masterAddr string wg sync.WaitGroup @@ -898,10 +903,12 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) { } func joinErrors(errs []error) string { + if len(errs) == 0 { + return "" + } if len(errs) == 1 { return errs[0].Error() } - b := []byte(errs[0].Error()) for _, err := range errs[1:] { b = append(b, '\n') diff --git a/sentinel_test.go b/sentinel_test.go index f332822f..0f0f61eb 100644 --- a/sentinel_test.go +++ b/sentinel_test.go @@ -682,3 +682,99 @@ func compareSlices(t *testing.T, a, b []string, name string) { } } } + +type joinErrorsTest struct { + name string + errs []error + expected string +} + +func TestJoinErrors(t *testing.T) { + tests := []joinErrorsTest{ + { + name: "empty slice", + errs: []error{}, + expected: "", + }, + { + name: "single error", + errs: []error{errors.New("first error")}, + expected: "first error", + }, + { + name: "two errors", + errs: []error{errors.New("first error"), errors.New("second error")}, + expected: "first error\nsecond error", + }, + { + name: "multiple errors", + errs: []error{ + errors.New("first error"), + errors.New("second error"), + errors.New("third error"), + }, + expected: "first error\nsecond error\nthird error", + }, + { + name: "nil slice", + errs: nil, + expected: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := redis.JoinErrors(tt.errs) + if result != tt.expected { + t.Errorf("joinErrors() = %q, want %q", result, tt.expected) + } + }) + } +} + +func BenchmarkJoinErrors(b *testing.B) { + benchmarks := []joinErrorsTest{ + { + name: "empty slice", + errs: []error{}, + expected: "", + }, + { + name: "single error", + errs: []error{errors.New("first error")}, + expected: "first error", + }, + { + name: "two errors", + errs: []error{errors.New("first error"), errors.New("second error")}, + expected: "first error\nsecond error", + }, + { + name: "multiple errors", + errs: []error{ + errors.New("first error"), + errors.New("second error"), + errors.New("third error"), + }, + expected: "first error\nsecond error\nthird error", + }, + { + name: "nil slice", + errs: nil, + expected: "", + }, + } + for _, bm := range benchmarks { + b.Run(bm.name, func(b *testing.B) { + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + result := redis.JoinErrors(bm.errs) + if result != bm.expected { + b.Errorf("joinErrors() = %q, want %q", result, bm.expected) + } + } + }) + }) + } +} diff --git a/stream_commands.go b/stream_commands.go index 4b84e00f..5573e48b 100644 --- a/stream_commands.go +++ b/stream_commands.go @@ -263,6 +263,7 @@ type XReadGroupArgs struct { Count int64 Block time.Duration NoAck bool + Claim time.Duration // Claim idle pending entries older than this duration } func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSliceCmd { @@ -282,6 +283,10 @@ func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSlic args = append(args, "noack") keyPos++ } + if a.Claim > 0 { + args = append(args, "claim", int64(a.Claim/time.Millisecond)) + keyPos += 2 + } args = append(args, "streams") keyPos++ for _, s := range a.Streams { diff --git a/string_commands.go b/string_commands.go index d6bf5cec..89c3bec4 100644 --- a/string_commands.go +++ b/string_commands.go @@ -21,6 +21,7 @@ type StringCmdable interface { MGet(ctx context.Context, keys ...string) *SliceCmd MSet(ctx context.Context, values ...interface{}) *StatusCmd MSetNX(ctx context.Context, values ...interface{}) *BoolCmd + MSetEX(ctx context.Context, args MSetEXArgs, values ...interface{}) *IntCmd Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd SetArgs(ctx context.Context, key string, value interface{}, a SetArgs) *StatusCmd SetEx(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd @@ -112,6 +113,35 @@ func (c cmdable) IncrByFloat(ctx context.Context, key string, value float64) *Fl return cmd } +type SetCondition string + +const ( + // NX only set the keys and their expiration if none exist + NX SetCondition = "NX" + // XX only set the keys and their expiration if all already exist + XX SetCondition = "XX" +) + +type ExpirationMode string + +const ( + // EX sets expiration in seconds + EX ExpirationMode = "EX" + // PX sets expiration in milliseconds + PX ExpirationMode = "PX" + // EXAT sets expiration as Unix timestamp in seconds + EXAT ExpirationMode = "EXAT" + // PXAT sets expiration as Unix timestamp in milliseconds + PXAT ExpirationMode = "PXAT" + // KEEPTTL keeps the existing TTL + KEEPTTL ExpirationMode = "KEEPTTL" +) + +type ExpirationOption struct { + Mode ExpirationMode + Value int64 +} + func (c cmdable) LCS(ctx context.Context, q *LCSQuery) *LCSCmd { cmd := NewLCSCmd(ctx, q) _ = c(ctx, cmd) @@ -157,6 +187,49 @@ func (c cmdable) MSetNX(ctx context.Context, values ...interface{}) *BoolCmd { return cmd } +type MSetEXArgs struct { + Condition SetCondition + Expiration *ExpirationOption +} + +// MSetEX sets the given keys to their respective values. +// This command is an extension of the MSETNX that adds expiration and XX options. +// Available since Redis 8.4 +// Important: When this method is used with Cluster clients, all keys +// must be in the same hash slot, otherwise CROSSSLOT error will be returned. +// For more information, see https://redis.io/commands/msetex +func (c cmdable) MSetEX(ctx context.Context, args MSetEXArgs, values ...interface{}) *IntCmd { + expandedArgs := appendArgs([]interface{}{}, values) + numkeys := len(expandedArgs) / 2 + + cmdArgs := make([]interface{}, 0, 2+len(expandedArgs)+3) + cmdArgs = append(cmdArgs, "msetex", numkeys) + cmdArgs = append(cmdArgs, expandedArgs...) + + if args.Condition != "" { + cmdArgs = append(cmdArgs, string(args.Condition)) + } + + if args.Expiration != nil { + switch args.Expiration.Mode { + case EX: + cmdArgs = append(cmdArgs, "ex", args.Expiration.Value) + case PX: + cmdArgs = append(cmdArgs, "px", args.Expiration.Value) + case EXAT: + cmdArgs = append(cmdArgs, "exat", args.Expiration.Value) + case PXAT: + cmdArgs = append(cmdArgs, "pxat", args.Expiration.Value) + case KEEPTTL: + cmdArgs = append(cmdArgs, "keepttl") + } + } + + cmd := NewIntCmd(ctx, cmdArgs...) + _ = c(ctx, cmd) + return cmd +} + // Set Redis `SET key value [expiration]` command. // Use expiration for `SETEx`-like behavior. //