
Merge branch 'ndyakov/state-machine-conn' into playground/autopipeline

Nedyalko Dyakov
2025-11-05 22:34:55 +02:00
24 changed files with 694 additions and 62 deletions

View File

@@ -24,7 +24,7 @@ runs:
# Mapping of redis version to redis testing containers
declare -A redis_version_mapping=(
["8.4.x"]="8.4-RC1-pre"
["8.4.x"]="8.4-RC1-pre.2"
["8.2.x"]="8.2.1-pre"
["8.0.x"]="8.0.2"
)

View File

@@ -44,7 +44,7 @@ jobs:
# Mapping of redis version to redis testing containers
declare -A redis_version_mapping=(
["8.4.x"]="8.4-RC1-pre"
["8.4.x"]="8.4-RC1-pre.2"
["8.2.x"]="8.2.1-pre"
["8.0.x"]="8.0.2"
)

View File

@@ -2,7 +2,7 @@ GO_MOD_DIRS := $(shell find . -type f -name 'go.mod' -exec dirname {} \; | sort)
REDIS_VERSION ?= 8.4
RE_CLUSTER ?= false
RCE_DOCKER ?= true
-CLIENT_LIBS_TEST_IMAGE ?= redislabs/client-libs-test:8.4-RC1-pre
+CLIENT_LIBS_TEST_IMAGE ?= redislabs/client-libs-test:8.4-RC1-pre.2
docker.start:
export RE_CLUSTER=$(RE_CLUSTER) && \

View File

@@ -8,8 +8,12 @@ type ACLCmdable interface {
ACLLog(ctx context.Context, count int64) *ACLLogCmd
ACLLogReset(ctx context.Context) *StatusCmd
ACLGenPass(ctx context.Context, bit int) *StringCmd
ACLSetUser(ctx context.Context, username string, rules ...string) *StatusCmd
ACLDelUser(ctx context.Context, username string) *IntCmd
ACLUsers(ctx context.Context) *StringSliceCmd
ACLWhoAmI(ctx context.Context) *StringCmd
ACLList(ctx context.Context) *StringSliceCmd
ACLCat(ctx context.Context) *StringSliceCmd
@@ -65,6 +69,24 @@ func (c cmdable) ACLSetUser(ctx context.Context, username string, rules ...strin
return cmd
}
func (c cmdable) ACLGenPass(ctx context.Context, bit int) *StringCmd {
cmd := NewStringCmd(ctx, "acl", "genpass")
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) ACLUsers(ctx context.Context) *StringSliceCmd {
cmd := NewStringSliceCmd(ctx, "acl", "users")
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) ACLWhoAmI(ctx context.Context) *StringCmd {
cmd := NewStringCmd(ctx, "acl", "whoami")
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) ACLList(ctx context.Context) *StringSliceCmd {
cmd := NewStringSliceCmd(ctx, "acl", "list")
_ = c(ctx, cmd)

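The new wrappers are thin one-command helpers. A hedged usage sketch (the address and printed values are illustrative; note that in this revision the bits argument is not forwarded to ACL GENPASS, so the server's default password length applies, which is why the test below passes 0):

package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // illustrative address

	me, err := rdb.ACLWhoAmI(ctx).Result() // e.g. "default"
	if err != nil {
		panic(err)
	}
	users, err := rdb.ACLUsers(ctx).Result() // e.g. ["default"]
	if err != nil {
		panic(err)
	}
	// The bits argument is currently ignored by the wrapper (see diff above).
	pass, err := rdb.ACLGenPass(ctx, 0).Result()
	if err != nil {
		panic(err)
	}
	fmt.Println(me, users, len(pass))
}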
View File

@@ -92,6 +92,21 @@ var _ = Describe("ACL user commands", Label("NonRedisEnterprise"), func() {
Expect(err).NotTo(HaveOccurred())
Expect(res).To(HaveLen(1))
Expect(res[0]).To(ContainSubstring("default"))
res, err = client.ACLUsers(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(HaveLen(1))
Expect(res[0]).To(Equal("default"))
res1, err := client.ACLWhoAmI(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res1).To(Equal("default"))
})
It("gen password", func() {
password, err := client.ACLGenPass(ctx, 0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(password).NotTo(BeEmpty())
})
It("setuser and deluser", func() {

View File

@@ -1607,6 +1607,12 @@ func (cmd *StringStructMapCmd) readReply(rd *proto.Reader) error {
type XMessage struct {
ID string
Values map[string]interface{}
// MillisElapsedFromDelivery is the number of milliseconds since the entry was last delivered.
// Only populated when using XREADGROUP with the CLAIM argument, for claimed entries.
MillisElapsedFromDelivery int64
// DeliveredCount is the number of times the entry was delivered.
// Only populated when using XREADGROUP with the CLAIM argument, for claimed entries.
DeliveredCount int64
}
type XMessageSliceCmd struct {
@@ -1663,10 +1669,16 @@ func readXMessageSlice(rd *proto.Reader) ([]XMessage, error) {
}
func readXMessage(rd *proto.Reader) (XMessage, error) {
-if err := rd.ReadFixedArrayLen(2); err != nil {
+// The array length can be 2, or 4 when CLAIM metadata is present
+n, err := rd.ReadArrayLen()
+if err != nil {
return XMessage{}, err
}
+if n != 2 && n != 4 {
+return XMessage{}, fmt.Errorf("redis: got %d elements in the XMessage array, expected 2 or 4", n)
+}
id, err := rd.ReadString()
if err != nil {
return XMessage{}, err
@@ -1679,10 +1691,24 @@ func readXMessage(rd *proto.Reader) (XMessage, error) {
}
}
-return XMessage{
+msg := XMessage{
ID: id,
Values: v,
-}, nil
-}
+}
+if n == 4 {
+msg.MillisElapsedFromDelivery, err = rd.ReadInt()
+if err != nil {
+return XMessage{}, err
+}
+msg.DeliveredCount, err = rd.ReadInt()
+if err != nil {
+return XMessage{}, err
+}
+}
+return msg, nil
}
func stringInterfaceMapParser(rd *proto.Reader) (map[string]interface{}, error) {

View File

@@ -199,6 +199,29 @@ var _ = Describe("Commands", func() {
Expect(r.Val()).To(Equal(int64(0)))
})
It("should ClientKillByFilter with kill myself", func() {
opt := redisOptions()
opt.ClientName = "killmyid"
db := redis.NewClient(opt)
Expect(db.Ping(ctx).Err()).NotTo(HaveOccurred())
defer func() {
Expect(db.Close()).NotTo(HaveOccurred())
}()
val, err := client.ClientList(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainSubstring("name=killmyid"))
myid := db.ClientID(ctx).Val()
killed := client.ClientKillByFilter(ctx, "ID", strconv.FormatInt(myid, 10))
Expect(killed.Err()).NotTo(HaveOccurred())
Expect(killed.Val()).To(BeNumerically("==", 1))
val, err = client.ClientList(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).ShouldNot(ContainSubstring("name=killmyid"))
})
It("should ClientKillByFilter with MAXAGE", Label("NonRedisEnterprise"), func() {
SkipBeforeRedisVersion(7.4, "doesn't work with older redis stack images")
var s []string
@@ -1912,6 +1935,137 @@ var _ = Describe("Commands", func() {
Expect(mSetNX.Val()).To(Equal(true))
})
It("should MSetEX", func() {
SkipBeforeRedisVersion(8.3, "MSetEX is available since redis 8.4")
args := redis.MSetEXArgs{
Expiration: &redis.ExpirationOption{
Mode: redis.EX,
Value: 1,
},
}
mSetEX := client.MSetEX(ctx, args, "key1", "hello1", "key2", "hello2")
Expect(mSetEX.Err()).NotTo(HaveOccurred())
Expect(mSetEX.Val()).To(Equal(int64(1)))
// Verify keys were set
val1 := client.Get(ctx, "key1")
Expect(val1.Err()).NotTo(HaveOccurred())
Expect(val1.Val()).To(Equal("hello1"))
val2 := client.Get(ctx, "key2")
Expect(val2.Err()).NotTo(HaveOccurred())
Expect(val2.Val()).To(Equal("hello2"))
// Verify TTL was set
ttl1 := client.TTL(ctx, "key1")
Expect(ttl1.Err()).NotTo(HaveOccurred())
Expect(ttl1.Val()).To(BeNumerically(">", 0))
Expect(ttl1.Val()).To(BeNumerically("<=", 1*time.Second))
ttl2 := client.TTL(ctx, "key2")
Expect(ttl2.Err()).NotTo(HaveOccurred())
Expect(ttl2.Val()).To(BeNumerically(">", 0))
Expect(ttl2.Val()).To(BeNumerically("<=", 1*time.Second))
})
It("should MSetEX with NX mode", func() {
SkipBeforeRedisVersion(8.3, "MSetEX is available since redis 8.4")
client.Set(ctx, "key1", "existing", 0)
// Try to set with NX mode - should fail because key1 exists
args := redis.MSetEXArgs{
Condition: redis.NX,
Expiration: &redis.ExpirationOption{
Mode: redis.EX,
Value: 1,
},
}
mSetEX := client.MSetEX(ctx, args, "key1", "new1", "key2", "new2")
Expect(mSetEX.Err()).NotTo(HaveOccurred())
Expect(mSetEX.Val()).To(Equal(int64(0)))
val1 := client.Get(ctx, "key1")
Expect(val1.Err()).NotTo(HaveOccurred())
Expect(val1.Val()).To(Equal("existing"))
val2 := client.Get(ctx, "key2")
Expect(val2.Err()).To(Equal(redis.Nil))
client.Del(ctx, "key1")
// Now try with NX mode when keys don't exist - should succeed
mSetEX = client.MSetEX(ctx, args, "key1", "new1", "key2", "new2")
Expect(mSetEX.Err()).NotTo(HaveOccurred())
Expect(mSetEX.Val()).To(Equal(int64(1)))
val1 = client.Get(ctx, "key1")
Expect(val1.Err()).NotTo(HaveOccurred())
Expect(val1.Val()).To(Equal("new1"))
val2 = client.Get(ctx, "key2")
Expect(val2.Err()).NotTo(HaveOccurred())
Expect(val2.Val()).To(Equal("new2"))
})
It("should MSetEX with XX mode", func() {
SkipBeforeRedisVersion(8.3, "MSetEX is available since redis 8.4")
args := redis.MSetEXArgs{
Condition: redis.XX,
Expiration: &redis.ExpirationOption{
Mode: redis.EX,
Value: 1,
},
}
mSetEX := client.MSetEX(ctx, args, "key1", "new1", "key2", "new2")
Expect(mSetEX.Err()).NotTo(HaveOccurred())
Expect(mSetEX.Val()).To(Equal(int64(0)))
client.Set(ctx, "key1", "existing1", 0)
client.Set(ctx, "key2", "existing2", 0)
mSetEX = client.MSetEX(ctx, args, "key1", "new1", "key2", "new2")
Expect(mSetEX.Err()).NotTo(HaveOccurred())
Expect(mSetEX.Val()).To(Equal(int64(1)))
val1 := client.Get(ctx, "key1")
Expect(val1.Err()).NotTo(HaveOccurred())
Expect(val1.Val()).To(Equal("new1"))
val2 := client.Get(ctx, "key2")
Expect(val2.Err()).NotTo(HaveOccurred())
Expect(val2.Val()).To(Equal("new2"))
ttl1 := client.TTL(ctx, "key1")
Expect(ttl1.Err()).NotTo(HaveOccurred())
Expect(ttl1.Val()).To(BeNumerically(">", 0))
})
It("should MSetEX with map", func() {
SkipBeforeRedisVersion(8.3, "MSetEX is available since redis 8.4")
args := redis.MSetEXArgs{
Expiration: &redis.ExpirationOption{
Mode: redis.EX,
Value: 1,
},
}
mSetEX := client.MSetEX(ctx, args, map[string]interface{}{
"key1": "value1",
"key2": "value2",
})
Expect(mSetEX.Err()).NotTo(HaveOccurred())
Expect(mSetEX.Val()).To(Equal(int64(1)))
val1 := client.Get(ctx, "key1")
Expect(val1.Err()).NotTo(HaveOccurred())
Expect(val1.Val()).To(Equal("value1"))
val2 := client.Get(ctx, "key2")
Expect(val2.Err()).NotTo(HaveOccurred())
Expect(val2.Val()).To(Equal("value2"))
})
It("should SetWithArgs with TTL", func() {
args := redis.SetArgs{
TTL: 500 * time.Millisecond,
@@ -6749,6 +6903,242 @@ var _ = Describe("Commands", func() {
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(2)))
})
It("should XReadGroup with CLAIM argument", func() {
SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+")
time.Sleep(100 * time.Millisecond)
res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: "group",
Consumer: "consumer2",
Streams: []string{"stream", ">"},
Claim: 50 * time.Millisecond,
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(HaveLen(1))
Expect(res[0].Stream).To(Equal("stream"))
messages := res[0].Messages
Expect(len(messages)).To(BeNumerically(">=", 1))
for _, msg := range messages {
if msg.MillisElapsedFromDelivery > 0 {
Expect(msg.MillisElapsedFromDelivery).To(BeNumerically(">=", 50))
Expect(msg.DeliveredCount).To(BeNumerically(">=", 1))
}
}
})
It("should XReadGroup with CLAIM and COUNT", func() {
SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+")
time.Sleep(100 * time.Millisecond)
res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: "group",
Consumer: "consumer3",
Streams: []string{"stream", ">"},
Claim: 50 * time.Millisecond,
Count: 2,
}).Result()
Expect(err).NotTo(HaveOccurred())
if len(res) > 0 && len(res[0].Messages) > 0 {
Expect(len(res[0].Messages)).To(BeNumerically("<=", 2))
}
})
It("should XReadGroup with CLAIM and NOACK", func() {
SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+")
time.Sleep(100 * time.Millisecond)
res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: "group",
Consumer: "consumer4",
Streams: []string{"stream", ">"},
Claim: 50 * time.Millisecond,
NoAck: true,
}).Result()
Expect(err).NotTo(HaveOccurred())
if len(res) > 0 {
Expect(res[0].Stream).To(Equal("stream"))
}
})
It("should XReadGroup CLAIM empties PEL after acknowledgment", func() {
SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+")
time.Sleep(100 * time.Millisecond)
res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: "group",
Consumer: "consumer5",
Streams: []string{"stream", ">"},
Claim: 50 * time.Millisecond,
}).Result()
Expect(err).NotTo(HaveOccurred())
if len(res) > 0 && len(res[0].Messages) > 0 {
ids := make([]string, len(res[0].Messages))
for i, msg := range res[0].Messages {
ids[i] = msg.ID
}
n, err := client.XAck(ctx, "stream", "group", ids...).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(BeNumerically(">=", 1))
pending, err := client.XPending(ctx, "stream", "group").Result()
Expect(err).NotTo(HaveOccurred())
Expect(pending.Count).To(BeNumerically("<", 3))
}
})
It("should XReadGroup backward compatibility without CLAIM", func() {
res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: "group",
Consumer: "consumer_compat",
Streams: []string{"stream", "0"},
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(HaveLen(1))
Expect(res[0].Stream).To(Equal("stream"))
for _, msg := range res[0].Messages {
Expect(msg.MillisElapsedFromDelivery).To(Equal(int64(0)))
Expect(msg.DeliveredCount).To(Equal(int64(0)))
}
})
It("should XReadGroup CLAIM with multiple streams", func() {
SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+")
id, err := client.XAdd(ctx, &redis.XAddArgs{
Stream: "stream2",
ID: "1-0",
Values: map[string]interface{}{"field1": "value1"},
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(id).To(Equal("1-0"))
id, err = client.XAdd(ctx, &redis.XAddArgs{
Stream: "stream2",
ID: "2-0",
Values: map[string]interface{}{"field2": "value2"},
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(id).To(Equal("2-0"))
err = client.XGroupCreate(ctx, "stream2", "group2", "0").Err()
Expect(err).NotTo(HaveOccurred())
_, err = client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: "group2",
Consumer: "consumer1",
Streams: []string{"stream2", ">"},
}).Result()
Expect(err).NotTo(HaveOccurred())
time.Sleep(100 * time.Millisecond)
res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: "group2",
Consumer: "consumer2",
Streams: []string{"stream2", ">"},
Claim: 50 * time.Millisecond,
}).Result()
Expect(err).NotTo(HaveOccurred())
if len(res) > 0 {
Expect(res[0].Stream).To(Equal("stream2"))
if len(res[0].Messages) > 0 {
for _, msg := range res[0].Messages {
if msg.MillisElapsedFromDelivery > 0 {
Expect(msg.DeliveredCount).To(BeNumerically(">=", 1))
}
}
}
}
})
It("should XReadGroup CLAIM work consistently on RESP2 and RESP3", func() {
SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+")
streamName := "stream-resp-test"
err := client.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
Values: map[string]interface{}{"field1": "value1"},
}).Err()
Expect(err).NotTo(HaveOccurred())
err = client.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
Values: map[string]interface{}{"field2": "value2"},
}).Err()
Expect(err).NotTo(HaveOccurred())
groupName := "resp-test-group"
err = client.XGroupCreate(ctx, streamName, groupName, "0").Err()
Expect(err).NotTo(HaveOccurred())
_, err = client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: "consumer1",
Streams: []string{streamName, ">"},
}).Result()
Expect(err).NotTo(HaveOccurred())
time.Sleep(100 * time.Millisecond)
// Test with RESP2 (protocol 2)
resp2Client := redis.NewClient(&redis.Options{
Addr: redisAddr,
Protocol: 2,
})
defer resp2Client.Close()
resp2Result, err := resp2Client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: "consumer2",
Streams: []string{streamName, "0"},
Claim: 50 * time.Millisecond,
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resp2Result).To(HaveLen(1))
// Test with RESP3 (protocol 3)
resp3Client := redis.NewClient(&redis.Options{
Addr: redisAddr,
Protocol: 3,
})
defer resp3Client.Close()
resp3Result, err := resp3Client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: "consumer3",
Streams: []string{streamName, "0"},
Claim: 50 * time.Millisecond,
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resp3Result).To(HaveLen(1))
Expect(len(resp2Result[0].Messages)).To(Equal(len(resp3Result[0].Messages)))
for i := range resp2Result[0].Messages {
msg2 := resp2Result[0].Messages[i]
msg3 := resp3Result[0].Messages[i]
Expect(msg2.ID).To(Equal(msg3.ID))
if msg2.MillisElapsedFromDelivery > 0 {
Expect(msg3.MillisElapsedFromDelivery).To(BeNumerically(">", 0))
Expect(msg2.DeliveredCount).To(Equal(msg3.DeliveredCount))
}
}
})
})
Describe("xinfo", func() {

View File

@@ -2,7 +2,7 @@
services:
redis:
-image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.4-RC1-pre}
+image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.4-RC1-pre.2}
platform: linux/amd64
container_name: redis-standalone
environment:
@@ -23,7 +23,7 @@ services:
- all
osscluster:
-image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.4-RC1-pre}
+image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.4-RC1-pre.2}
platform: linux/amd64
container_name: redis-osscluster
environment:
@@ -40,7 +40,7 @@ services:
- all
sentinel-cluster:
-image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.4-RC1-pre}
+image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.4-RC1-pre.2}
platform: linux/amd64
container_name: redis-sentinel-cluster
network_mode: "host"
@@ -60,7 +60,7 @@ services:
- all
sentinel:
-image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.4-RC1-pre}
+image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.4-RC1-pre.2}
platform: linux/amd64
container_name: redis-sentinel
depends_on:
@@ -84,7 +84,7 @@ services:
- all
ring-cluster:
-image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.4-RC1-pre}
+image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.4-RC1-pre.2}
platform: linux/amd64
container_name: redis-ring-cluster
environment:

View File

@@ -216,8 +216,8 @@ func ExampleClient_racefrance1() {
// REMOVE_END
// Output:
-// [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]}]
-// [{race:france [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]} {1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7]}]}]
+// [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0}]
+// [{race:france [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0} {1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7] 0 0}]}]
// 4
}
@@ -467,13 +467,13 @@ func ExampleClient_racefrance2() {
// STEP_END
// Output:
-// [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]} {1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7]} {1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9]}]
-// [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]}]
-// [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]}]
-// [{1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7]} {1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9]}]
+// [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0} {1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7] 0 0} {1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9] 0 0}]
+// [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0}]
+// [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0}]
+// [{1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7] 0 0} {1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9] 0 0}]
// []
-// [{1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9]}]
-// [{race:france [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]}]}]
+// [{1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9] 0 0}]
+// [{race:france [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0}]}]
}
func ExampleClient_xgroupcreate() {
@@ -999,18 +999,18 @@ func ExampleClient_raceitaly() {
// REMOVE_END
// Output:
-// [{race:italy [{1692632639151-0 map[rider:Castilla]}]}]
+// [{race:italy [{1692632639151-0 map[rider:Castilla] 0 0}]}]
// 1
// [{race:italy []}]
-// [{race:italy [{1692632647899-0 map[rider:Royce]} {1692632662819-0 map[rider:Sam-Bodden]}]}]
+// [{race:italy [{1692632647899-0 map[rider:Royce] 0 0} {1692632662819-0 map[rider:Sam-Bodden] 0 0}]}]
// &{2 1692632647899-0 1692632662819-0 map[Bob:2]}
-// [{1692632647899-0 map[rider:Royce]}]
-// [{1692632647899-0 map[rider:Royce]}]
-// [{1692632647899-0 map[rider:Royce]}]
+// [{1692632647899-0 map[rider:Royce] 0 0}]
+// [{1692632647899-0 map[rider:Royce] 0 0}]
+// [{1692632647899-0 map[rider:Royce] 0 0}]
// 1692632662819-0
// []
// 0-0
-// &{5 1 2 1 1692632678249-0 0-0 5 {1692632639151-0 map[rider:Castilla]} {1692632678249-0 map[rider:Norem]} 1692632639151-0}
+// &{5 1 2 1 1692632678249-0 0-0 5 {1692632639151-0 map[rider:Castilla] 0 0} {1692632678249-0 map[rider:Norem] 0 0} 1692632639151-0}
// [{italy_riders 3 2 1692632662819-0 3 2}]
// 2
// 0
@@ -1085,7 +1085,7 @@ func ExampleClient_xdel() {
// STEP_END
// Output:
-// [{1692633198206-0 map[rider:Wood]} {1692633208557-0 map[rider:Henshaw]}]
+// [{1692633198206-0 map[rider:Wood] 0 0} {1692633208557-0 map[rider:Henshaw] 0 0}]
// 1
-// [{1692633198206-0 map[rider:Wood]}]
+// [{1692633198206-0 map[rider:Wood] 0 0}]
}

View File

@@ -106,3 +106,7 @@ func (c *ModuleLoadexConfig) ToArgs() []interface{} {
func ShouldRetry(err error, retryTimeout bool) bool {
return shouldRetry(err, retryTimeout)
}
func JoinErrors(errs []error) string {
return joinErrors(errs)
}

View File

@@ -22,4 +22,3 @@ retract (
v9.7.2 // This version was accidentally released. Please use version 9.7.3 instead.
v9.5.3 // This version was accidentally released. Please use version 9.6.0 instead.
)

View File

@@ -19,4 +19,3 @@ retract (
v9.7.2 // This version was accidentally released. Please use version 9.7.3 instead.
v9.5.3 // This version was accidentally released. Please use version 9.6.0 instead.
)

View File

@@ -27,4 +27,3 @@ retract (
v9.7.2 // This version was accidentally released. Please use version 9.7.3 instead.
v9.5.3 // This version was accidentally released. Please use version 9.6.0 instead.
)

View File

@@ -26,4 +26,3 @@ retract (
v9.7.2 // This version was accidentally released. Please use version 9.7.3 instead.
v9.5.3 // This version was accidentally released. Please use version 9.6.0 instead.
)

View File

@@ -735,7 +735,7 @@ func (cn *Conn) GetStateMachine() *ConnStateMachine {
func (cn *Conn) TryAcquire() bool {
// The || operator short-circuits, so only 1 CAS in the common case
return cn.stateMachine.state.CompareAndSwap(uint32(StateIdle), uint32(StateInUse)) ||
-cn.stateMachine.state.Load() == uint32(StateCreated)
+cn.stateMachine.state.CompareAndSwap(uint32(StateCreated), uint32(StateCreated))
}
// Release releases the connection back to the pool.

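The second operand swaps a plain atomic load for a value-preserving CompareAndSwap: CAS(StateCreated, StateCreated) succeeds exactly when the state is still StateCreated, but as a single read-modify-write it serializes with the first CAS rather than racing it. A standalone sketch of the pattern (state names mirror the diff; everything else is illustrative):

package main

import (
	"fmt"
	"sync/atomic"
)

const (
	StateCreated uint32 = iota
	StateIdle
	StateInUse
)

// tryAcquire mirrors the pattern in the patch: one CAS in the common case
// (Idle -> InUse), and a value-preserving CAS that acts as an atomic
// "is the state still Created?" test for brand-new connections.
func tryAcquire(state *atomic.Uint32) bool {
	return state.CompareAndSwap(StateIdle, StateInUse) ||
		state.CompareAndSwap(StateCreated, StateCreated)
}

func main() {
	var s atomic.Uint32
	s.Store(StateIdle)
	fmt.Println(tryAcquire(&s), s.Load()) // true 2 (now InUse)
	s.Store(StateCreated)
	fmt.Println(tryAcquire(&s), s.Load()) // true 0 (still Created)
}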
View File

@@ -30,7 +30,7 @@ func connCheck(conn net.Conn) error {
var sysErr error
-if err := rawConn.Control(func(fd uintptr) {
+if err := rawConn.Read(func(fd uintptr) bool {
var buf [1]byte
// Use MSG_PEEK to peek at data without consuming it
n, _, err := syscall.Recvfrom(int(fd), buf[:], syscall.MSG_PEEK|syscall.MSG_DONTWAIT)
@@ -45,6 +45,7 @@ func connCheck(conn net.Conn) error {
default:
sysErr = err
}
+return true
}); err != nil {
return err
}

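For context, the health probe peeks one byte with MSG_PEEK|MSG_DONTWAIT so no payload is consumed, and the added return true ends the syscall.RawConn Read callback after a single non-blocking pass. A simplified standalone sketch of the same idea (error handling trimmed; the real check also treats unexpected buffered data as a failure, which is omitted here):

//go:build linux || darwin

package main

import (
	"fmt"
	"net"
	"syscall"
)

// peekHealthy reports whether a TCP connection still looks alive by peeking
// one byte without consuming it. A sketch of the diff's approach, not its code.
func peekHealthy(conn *net.TCPConn) (bool, error) {
	raw, err := conn.SyscallConn()
	if err != nil {
		return false, err
	}
	healthy := true
	rerr := raw.Read(func(fd uintptr) bool {
		var buf [1]byte
		n, _, err := syscall.Recvfrom(int(fd), buf[:], syscall.MSG_PEEK|syscall.MSG_DONTWAIT)
		switch {
		case n == 0 && err == nil:
			healthy = false // peer closed the connection
		case err == syscall.EAGAIN || err == syscall.EWOULDBLOCK:
			// nothing buffered: connection is idle but alive
		case err != nil:
			healthy = false // some other socket error
		}
		// Returning true finishes after one non-blocking pass, which is
		// exactly what the added `return true` in the diff accomplishes.
		return true
	})
	return healthy, rerr
}

func main() {
	conn, err := net.Dial("tcp", "localhost:6379") // illustrative address
	if err != nil {
		panic(err)
	}
	fmt.Println(peekHealthy(conn.(*net.TCPConn)))
}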
View File

@@ -22,7 +22,7 @@ func TestConn_UsedAtUpdatedOnRead(t *testing.T) {
// Get initial usedAt time
initialUsedAt := cn.UsedAt()
-// Wait at least 50ms to ensure time difference (usedAt has ~50ms precision from cached time)
+// Wait 100ms to ensure time difference (usedAt has ~50ms precision from cached time)
time.Sleep(100 * time.Millisecond)
// Simulate a read operation by calling WithReader
@@ -45,10 +45,10 @@ func TestConn_UsedAtUpdatedOnRead(t *testing.T) {
initialUsedAt, updatedUsedAt)
}
-// Verify the difference is reasonable (should be around 50ms, accounting for ~50ms cache precision)
+// Verify the difference is reasonable (should be around 100ms, accounting for ~50ms cache precision and ~5ms sleep precision)
diff := updatedUsedAt.Sub(initialUsedAt)
-if diff < 50*time.Millisecond || diff > 200*time.Millisecond {
-t.Errorf("Expected usedAt difference to be around 50ms (±50ms for cache), got %v", diff)
+if diff < 45*time.Millisecond || diff > 155*time.Millisecond {
+t.Errorf("Expected usedAt difference to be around 100ms (±50ms for cache, ±5ms for sleep), got %v", diff)
}
}

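The ±50ms slack exists because usedAt is stamped from a cached clock rather than a time.Now() call per operation. A minimal sketch of that pattern (assumed and simplified; the pool's actual getCachedTimeNs is internal and may differ):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

var cachedNs atomic.Int64

// startClock refreshes a shared timestamp on a fixed interval so hot paths
// pay an atomic load instead of a syscall-backed time.Now().
func startClock(resolution time.Duration) {
	cachedNs.Store(time.Now().UnixNano())
	go func() {
		ticker := time.NewTicker(resolution)
		defer ticker.Stop()
		for range ticker.C {
			cachedNs.Store(time.Now().UnixNano())
		}
	}()
}

// getCachedTimeNs may lag real time by up to one resolution interval
// (~50ms here), which is why the test above allows ±50ms of slack.
func getCachedTimeNs() int64 { return cachedNs.Load() }

func main() {
	startClock(50 * time.Millisecond)
	time.Sleep(120 * time.Millisecond)
	fmt.Println(time.Unix(0, getCachedTimeNs()))
}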
View File

@@ -71,10 +71,13 @@ func (phm *PoolHookManager) RemoveHook(hook PoolHook) {
// ProcessOnGet calls all OnGet hooks in order.
// If any hook returns an error, processing stops and the error is returned.
func (phm *PoolHookManager) ProcessOnGet(ctx context.Context, conn *Conn, isNewConn bool) (acceptConn bool, err error) {
+// Copy slice reference while holding lock (fast)
phm.hooksMu.RLock()
-defer phm.hooksMu.RUnlock()
+hooks := phm.hooks
+phm.hooksMu.RUnlock()
-for _, hook := range phm.hooks {
+// Call hooks without holding lock (slow operations)
+for _, hook := range hooks {
acceptConn, err := hook.OnGet(ctx, conn, isNewConn)
if err != nil {
return false, err
@@ -90,12 +93,15 @@ func (phm *PoolHookManager) ProcessOnGet(ctx context.Context, conn *Conn, isNewC
// ProcessOnPut calls all OnPut hooks in order.
// The first hook that returns shouldRemove=true or shouldPool=false will stop processing.
func (phm *PoolHookManager) ProcessOnPut(ctx context.Context, conn *Conn) (shouldPool bool, shouldRemove bool, err error) {
+// Copy slice reference while holding lock (fast)
phm.hooksMu.RLock()
-defer phm.hooksMu.RUnlock()
+hooks := phm.hooks
+phm.hooksMu.RUnlock()
shouldPool = true // Default to pooling the connection
-for _, hook := range phm.hooks {
+// Call hooks without holding lock (slow operations)
+for _, hook := range hooks {
hookShouldPool, hookShouldRemove, hookErr := hook.OnPut(ctx, conn)
if hookErr != nil {
@@ -117,9 +123,13 @@ func (phm *PoolHookManager) ProcessOnPut(ctx context.Context, conn *Conn) (shoul
// ProcessOnRemove calls all OnRemove hooks in order.
func (phm *PoolHookManager) ProcessOnRemove(ctx context.Context, conn *Conn, reason error) {
+// Copy slice reference while holding lock (fast)
phm.hooksMu.RLock()
-defer phm.hooksMu.RUnlock()
-for _, hook := range phm.hooks {
+hooks := phm.hooks
+phm.hooksMu.RUnlock()
+// Call hooks without holding lock (slow operations)
+for _, hook := range hooks {
hook.OnRemove(ctx, conn, reason)
}
}

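The change narrows each critical section to a slice-header copy, then runs the (potentially slow) hooks lock-free. This is safe provided AddHook/RemoveHook replace or append to the slice rather than overwrite elements in place; the sketch below makes that copy explicit (names are illustrative, not the pool's internals):

package main

import (
	"fmt"
	"sync"
)

type Hook func() error

type Manager struct {
	mu    sync.RWMutex
	hooks []Hook
}

// AddHook installs a fresh slice under the write lock, so a reader that
// copied the old header keeps iterating a stable snapshot.
func (m *Manager) AddHook(h Hook) {
	m.mu.Lock()
	defer m.mu.Unlock()
	hooks := make([]Hook, len(m.hooks), len(m.hooks)+1)
	copy(hooks, m.hooks)
	m.hooks = append(hooks, h)
}

// Run copies the slice reference under the read lock, then calls the
// hooks without holding any lock.
func (m *Manager) Run() error {
	m.mu.RLock()
	hooks := m.hooks
	m.mu.RUnlock()
	for _, h := range hooks {
		if err := h(); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	m := &Manager{}
	m.AddHook(func() error { fmt.Println("hook ran"); return nil })
	_ = m.Run()
}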
View File

@@ -160,18 +160,9 @@ type ConnPool struct {
var _ Pooler = (*ConnPool)(nil)
func NewConnPool(opt *Options) *ConnPool {
-semSize := opt.PoolSize
-if opt.MaxActiveConns > 0 && opt.MaxActiveConns < opt.PoolSize {
+if opt.MaxActiveConns < opt.PoolSize {
opt.MaxActiveConns = opt.PoolSize
}
-semSize = opt.MaxActiveConns
-}
-//semSize = opt.PoolSize
p := &ConnPool{
cfg: opt,
-semaphore: internal.NewFastSemaphore(semSize),
+semaphore: internal.NewFastSemaphore(opt.PoolSize),
queue: make(chan struct{}, opt.PoolSize),
conns: make(map[uint64]*Conn),
dialsInProgress: make(chan struct{}, opt.MaxConcurrentDials),
@@ -470,17 +461,11 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
// Use cached time for health checks (max 50ms staleness is acceptable)
nowNs := getCachedTimeNs()
-attempts := 0
// Lock-free atomic read - no mutex overhead!
hookManager := p.hookManager.Load()
-for {
-if attempts >= getAttempts {
-internal.Logger.Printf(ctx, "redis: connection pool: was not able to get a healthy connection after %d attempts", attempts)
-break
-}
-attempts++
+for attempts := 0; attempts < getAttempts; attempts++ {
p.connsMu.Lock()
cn, err = p.popIdle()

View File
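After this change the semaphore is sized by PoolSize alone, with MaxActiveConns clamped up to it. internal.NewFastSemaphore is not shown in the diff; as a rough mental model only (an assumption, not its implementation), a counting semaphore behaves like a buffered channel:

package main

import "fmt"

// Semaphore is a toy counting semaphore; the pool's FastSemaphore is an
// internal, optimized equivalent (this sketch is an assumption, not its code).
type Semaphore chan struct{}

func NewSemaphore(n int) Semaphore { return make(Semaphore, n) }

func (s Semaphore) Acquire() { s <- struct{}{} }

func (s Semaphore) TryAcquire() bool {
	select {
	case s <- struct{}{}:
		return true
	default:
		return false
	}
}

func (s Semaphore) Release() { <-s }

func main() {
	sem := NewSemaphore(2) // like a pool whose semaphore is sized by PoolSize
	fmt.Println(sem.TryAcquire(), sem.TryAcquire(), sem.TryAcquire()) // true true false
	sem.Release()
	fmt.Println(sem.TryAcquire()) // true
}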

@@ -553,7 +553,9 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
cn.GetStateMachine().Transition(pool.StateClosed)
return fmt.Errorf("failed to enable maintnotifications: %w", maintNotifHandshakeErr)
default: // will handle auto and any other
internal.Logger.Printf(ctx, "auto mode fallback: maintnotifications disabled due to handshake error: %v", maintNotifHandshakeErr)
// Disabling logging here as it's too noisy.
// TODO: Enable when we have a better logging solution for log levels
// internal.Logger.Printf(ctx, "auto mode fallback: maintnotifications disabled due to handshake error: %v", maintNotifHandshakeErr)
c.opt.MaintNotificationsConfig.Mode = maintnotifications.ModeDisabled
c.optLock.Unlock()
// auto mode, disable maintnotifications and continue

View File

@@ -851,6 +851,11 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
}
}
// short circuit if no sentinels configured
if len(c.sentinelAddrs) == 0 {
return "", errors.New("redis: no sentinels configured")
}
var (
masterAddr string
wg sync.WaitGroup
@@ -898,10 +903,12 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
}
func joinErrors(errs []error) string {
if len(errs) == 0 {
return ""
}
if len(errs) == 1 {
return errs[0].Error()
}
b := []byte(errs[0].Error())
for _, err := range errs[1:] {
b = append(b, '\n')

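For reference, the message text joinErrors builds for non-empty slices matches what the standard library's errors.Join (Go 1.20+) produces; the commit's string-returning helper simply suits the formatted error MasterAddr constructs. A quick comparison:

package main

import (
	"errors"
	"fmt"
)

func main() {
	errs := []error{errors.New("first error"), errors.New("second error")}

	// errors.Join also newline-separates the messages, but returns an
	// error value that supports errors.Is / errors.As unwrapping.
	fmt.Println(errors.Join(errs...))
	// Output:
	// first error
	// second error
}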
View File

@@ -682,3 +682,99 @@ func compareSlices(t *testing.T, a, b []string, name string) {
}
}
}
type joinErrorsTest struct {
name string
errs []error
expected string
}
func TestJoinErrors(t *testing.T) {
tests := []joinErrorsTest{
{
name: "empty slice",
errs: []error{},
expected: "",
},
{
name: "single error",
errs: []error{errors.New("first error")},
expected: "first error",
},
{
name: "two errors",
errs: []error{errors.New("first error"), errors.New("second error")},
expected: "first error\nsecond error",
},
{
name: "multiple errors",
errs: []error{
errors.New("first error"),
errors.New("second error"),
errors.New("third error"),
},
expected: "first error\nsecond error\nthird error",
},
{
name: "nil slice",
errs: nil,
expected: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := redis.JoinErrors(tt.errs)
if result != tt.expected {
t.Errorf("joinErrors() = %q, want %q", result, tt.expected)
}
})
}
}
func BenchmarkJoinErrors(b *testing.B) {
benchmarks := []joinErrorsTest{
{
name: "empty slice",
errs: []error{},
expected: "",
},
{
name: "single error",
errs: []error{errors.New("first error")},
expected: "first error",
},
{
name: "two errors",
errs: []error{errors.New("first error"), errors.New("second error")},
expected: "first error\nsecond error",
},
{
name: "multiple errors",
errs: []error{
errors.New("first error"),
errors.New("second error"),
errors.New("third error"),
},
expected: "first error\nsecond error\nthird error",
},
{
name: "nil slice",
errs: nil,
expected: "",
},
}
for _, bm := range benchmarks {
b.Run(bm.name, func(b *testing.B) {
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
result := redis.JoinErrors(bm.errs)
if result != bm.expected {
b.Errorf("joinErrors() = %q, want %q", result, bm.expected)
}
}
})
})
}
}

View File

@@ -263,6 +263,7 @@ type XReadGroupArgs struct {
Count int64
Block time.Duration
NoAck bool
Claim time.Duration // Claim idle pending entries older than this duration
}
func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSliceCmd {
@@ -282,6 +283,10 @@ func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSlic
args = append(args, "noack")
keyPos++
}
if a.Claim > 0 {
args = append(args, "claim", int64(a.Claim/time.Millisecond))
keyPos += 2
}
args = append(args, "streams")
keyPos++
for _, s := range a.Streams {

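Putting the new option together with the XMessage metadata fields from command.go, a hedged usage sketch (stream, group, and address are illustrative; per the tests above this requires a server with XREADGROUP CLAIM support, Redis 8.3+):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // illustrative address

	// Read new entries and additionally claim pending entries that have been
	// idle for at least one minute (XREADGROUP ... CLAIM 60000).
	res, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
		Group:    "group",
		Consumer: "consumer1",
		Streams:  []string{"stream", ">"},
		Claim:    time.Minute,
	}).Result()
	if err != nil && err != redis.Nil { // redis.Nil: nothing to read or claim
		panic(err)
	}
	for _, stream := range res {
		for _, msg := range stream.Messages {
			if msg.DeliveredCount > 0 {
				// Claimed entry: the CLAIM metadata fields are populated.
				fmt.Printf("claimed %s after %dms (delivery #%d)\n",
					msg.ID, msg.MillisElapsedFromDelivery, msg.DeliveredCount)
			} else {
				fmt.Println("new entry:", msg.ID)
			}
		}
	}
}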
View File

@@ -21,6 +21,7 @@ type StringCmdable interface {
MGet(ctx context.Context, keys ...string) *SliceCmd
MSet(ctx context.Context, values ...interface{}) *StatusCmd
MSetNX(ctx context.Context, values ...interface{}) *BoolCmd
MSetEX(ctx context.Context, args MSetEXArgs, values ...interface{}) *IntCmd
Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd
SetArgs(ctx context.Context, key string, value interface{}, a SetArgs) *StatusCmd
SetEx(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd
@@ -112,6 +113,35 @@ func (c cmdable) IncrByFloat(ctx context.Context, key string, value float64) *Fl
return cmd
}
type SetCondition string
const (
// NX only sets the keys and their expiration if none of them exist
NX SetCondition = "NX"
// XX only sets the keys and their expiration if all of them already exist
XX SetCondition = "XX"
)
type ExpirationMode string
const (
// EX sets expiration in seconds
EX ExpirationMode = "EX"
// PX sets expiration in milliseconds
PX ExpirationMode = "PX"
// EXAT sets expiration as Unix timestamp in seconds
EXAT ExpirationMode = "EXAT"
// PXAT sets expiration as Unix timestamp in milliseconds
PXAT ExpirationMode = "PXAT"
// KEEPTTL keeps the existing TTL
KEEPTTL ExpirationMode = "KEEPTTL"
)
type ExpirationOption struct {
Mode ExpirationMode
Value int64
}
func (c cmdable) LCS(ctx context.Context, q *LCSQuery) *LCSCmd {
cmd := NewLCSCmd(ctx, q)
_ = c(ctx, cmd)
@@ -157,6 +187,49 @@ func (c cmdable) MSetNX(ctx context.Context, values ...interface{}) *BoolCmd {
return cmd
}
type MSetEXArgs struct {
Condition SetCondition
Expiration *ExpirationOption
}
// MSetEX sets the given keys to their respective values.
// This command is an extension of MSETNX that adds expiration and XX options.
// Available since Redis 8.4.
// Important: when this method is used with Cluster clients, all keys
// must be in the same hash slot, otherwise a CROSSSLOT error will be returned.
// For more information, see https://redis.io/commands/msetex
func (c cmdable) MSetEX(ctx context.Context, args MSetEXArgs, values ...interface{}) *IntCmd {
expandedArgs := appendArgs([]interface{}{}, values)
numkeys := len(expandedArgs) / 2
cmdArgs := make([]interface{}, 0, 2+len(expandedArgs)+3)
cmdArgs = append(cmdArgs, "msetex", numkeys)
cmdArgs = append(cmdArgs, expandedArgs...)
if args.Condition != "" {
cmdArgs = append(cmdArgs, string(args.Condition))
}
if args.Expiration != nil {
switch args.Expiration.Mode {
case EX:
cmdArgs = append(cmdArgs, "ex", args.Expiration.Value)
case PX:
cmdArgs = append(cmdArgs, "px", args.Expiration.Value)
case EXAT:
cmdArgs = append(cmdArgs, "exat", args.Expiration.Value)
case PXAT:
cmdArgs = append(cmdArgs, "pxat", args.Expiration.Value)
case KEEPTTL:
cmdArgs = append(cmdArgs, "keepttl")
}
}
cmd := NewIntCmd(ctx, cmdArgs...)
_ = c(ctx, cmd)
return cmd
}
// Set Redis `SET key value [expiration]` command.
// Use expiration for `SETEx`-like behavior.
//
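Finally, a hedged usage sketch of the new MSetEX API (address illustrative; note the cluster hash-slot caveat in the doc comment above):

package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // illustrative address

	// Set two keys atomically with a 10-second TTL, only if none of them
	// exist yet (MSETEX 2 key1 v1 key2 v2 NX EX 10). The reply is 1 on
	// success and 0 if the NX/XX condition failed.
	n, err := rdb.MSetEX(ctx, redis.MSetEXArgs{
		Condition:  redis.NX,
		Expiration: &redis.ExpirationOption{Mode: redis.EX, Value: 10},
	}, "key1", "v1", "key2", "v2").Result()
	if err != nil {
		panic(err)
	}
	fmt.Println(n) // 1 if both keys were set
}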