1
0
mirror of https://github.com/redis/go-redis.git synced 2025-04-17 20:17:02 +03:00

Merge branch 'master' into graph

This commit is contained in:
Monkey 2024-06-21 09:38:44 +08:00 committed by GitHub
commit b59d3ebc10
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
37 changed files with 683 additions and 68 deletions

View File

@ -1,3 +1,12 @@
## Unreleased
### Changed
* `go-redis` won't skip span creation if the parent spans is not recording. ([#2980](https://github.com/redis/go-redis/issues/2980))
Users can use the OpenTelemetry sampler to control the sampling behavior.
For instance, you can use the `ParentBased(NeverSample())` sampler from `go.opentelemetry.io/otel/sdk/trace` to keep
a similar behavior (drop orphan spans) of `go-redis` as before.
## [9.0.5](https://github.com/redis/go-redis/compare/v9.0.4...v9.0.5) (2023-05-29)

View File

@ -31,7 +31,7 @@ build:
testdata/redis:
mkdir -p $@
wget -qO- https://download.redis.io/releases/redis-7.2.1.tar.gz | tar xvz --strip-components=1 -C $@
wget -qO- https://download.redis.io/releases/redis-7.4-rc1.tar.gz | tar xvz --strip-components=1 -C $@
testdata/redis/src/redis-server: testdata/redis
cd $< && make all

View File

@ -143,9 +143,6 @@ to this specification.
```go
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
)

View File

@ -16,6 +16,7 @@ type BitMapCmdable interface {
BitPos(ctx context.Context, key string, bit int64, pos ...int64) *IntCmd
BitPosSpan(ctx context.Context, key string, bit int8, start, end int64, span string) *IntCmd
BitField(ctx context.Context, key string, values ...interface{}) *IntSliceCmd
BitFieldRO(ctx context.Context, key string, values ...interface{}) *IntSliceCmd
}
func (c cmdable) GetBit(ctx context.Context, key string, offset int64) *IntCmd {

View File

@ -4999,6 +4999,7 @@ type ClientInfo struct {
PSub int // number of pattern matching subscriptions
SSub int // redis version 7.0.3, number of shard channel subscriptions
Multi int // number of commands in a MULTI/EXEC context
Watch int // redis version 7.4 RC1, number of keys this client is currently watching.
QueryBuf int // qbuf, query buffer length (0 means no query pending)
QueryBufFree int // qbuf-free, free space of the query buffer (0 means the buffer is full)
ArgvMem int // incomplete arguments for the next command (already extracted from query buffer)
@ -5151,6 +5152,8 @@ func parseClientInfo(txt string) (info *ClientInfo, err error) {
info.SSub, err = strconv.Atoi(val)
case "multi":
info.Multi, err = strconv.Atoi(val)
case "watch":
info.Watch, err = strconv.Atoi(val)
case "qbuf":
info.QueryBuf, err = strconv.Atoi(val)
case "qbuf-free":

View File

@ -193,6 +193,40 @@ var _ = Describe("Commands", func() {
Expect(r.Val()).To(Equal(int64(0)))
})
It("should ClientKillByFilter with MAXAGE", Label("NonRedisEnterprise"), func() {
var s []string
started := make(chan bool)
done := make(chan bool)
go func() {
defer GinkgoRecover()
started <- true
blpop := client.BLPop(ctx, 0, "list")
Expect(blpop.Val()).To(Equal(s))
done <- true
}()
<-started
select {
case <-done:
Fail("BLPOP is not blocked.")
case <-time.After(2 * time.Second):
// ok
}
killed := client.ClientKillByFilter(ctx, "MAXAGE", "1")
Expect(killed.Err()).NotTo(HaveOccurred())
Expect(killed.Val()).To(SatisfyAny(Equal(int64(2)), Equal(int64(3))))
select {
case <-done:
// ok
case <-time.After(time.Second):
Fail("BLPOP is still blocked.")
}
})
It("should ClientID", func() {
err := client.ClientID(ctx).Err()
Expect(err).NotTo(HaveOccurred())
@ -1100,6 +1134,26 @@ var _ = Describe("Commands", func() {
keys, cursor, err := client.HScan(ctx, "myhash", 0, "", 0).Result()
Expect(err).NotTo(HaveOccurred())
// If we don't get at least two items back, it's really strange.
Expect(cursor).To(BeNumerically(">=", 2))
Expect(len(keys)).To(BeNumerically(">=", 2))
Expect(keys[0]).To(HavePrefix("key"))
Expect(keys[1]).To(Equal("hello"))
})
It("should HScan without values", Label("NonRedisEnterprise"), func() {
for i := 0; i < 1000; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
}
keys, cursor, err := client.HScanNoValues(ctx, "myhash", 0, "", 0).Result()
Expect(err).NotTo(HaveOccurred())
// If we don't get at least two items back, it's really strange.
Expect(cursor).To(BeNumerically(">=", 2))
Expect(len(keys)).To(BeNumerically(">=", 2))
Expect(keys[0]).To(HavePrefix("key"))
Expect(keys[1]).To(HavePrefix("key"))
Expect(keys).NotTo(BeEmpty())
Expect(cursor).NotTo(BeZero())
})
@ -2430,6 +2484,155 @@ var _ = Describe("Commands", func() {
Equal([]redis.KeyValue{{Key: "key2", Value: "hello2"}}),
))
})
It("should HExpire", Label("hash-expiration", "NonRedisEnterprise"), func() {
res, err := client.HExpire(ctx, "no_such_key", 10, "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
}
res, err = client.HExpire(ctx, "myhash", 10, "key1", "key2", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{1, 1, -2}))
})
It("should HPExpire", Label("hash-expiration", "NonRedisEnterprise"), func() {
_, err := client.HPExpire(ctx, "no_such_key", 10, "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
}
res, err := client.HPExpire(ctx, "myhash", 10, "key1", "key2", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{1, 1, -2}))
})
It("should HExpireAt", Label("hash-expiration", "NonRedisEnterprise"), func() {
_, err := client.HExpireAt(ctx, "no_such_key", time.Now().Add(10*time.Second), "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
}
res, err := client.HExpireAt(ctx, "myhash", time.Now().Add(10*time.Second), "key1", "key2", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{1, 1, -2}))
})
It("should HPExpireAt", Label("hash-expiration", "NonRedisEnterprise"), func() {
_, err := client.HPExpireAt(ctx, "no_such_key", time.Now().Add(10*time.Second), "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
}
res, err := client.HPExpireAt(ctx, "myhash", time.Now().Add(10*time.Second), "key1", "key2", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{1, 1, -2}))
})
It("should HPersist", Label("hash-expiration", "NonRedisEnterprise"), func() {
_, err := client.HPersist(ctx, "no_such_key", "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
}
res, err := client.HPersist(ctx, "myhash", "key1", "key2", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{-1, -1, -2}))
res, err = client.HExpire(ctx, "myhash", 10, "key1", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{1, -2}))
res, err = client.HPersist(ctx, "myhash", "key1", "key2", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{1, -1, -2}))
})
It("should HExpireTime", Label("hash-expiration", "NonRedisEnterprise"), func() {
_, err := client.HExpireTime(ctx, "no_such_key", "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
}
res, err := client.HExpire(ctx, "myhash", 10, "key1", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{1, -2}))
res, err = client.HExpireTime(ctx, "myhash", "key1", "key2", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res[0]).To(BeNumerically("~", time.Now().Add(10*time.Second).Unix(), 1))
})
It("should HPExpireTime", Label("hash-expiration", "NonRedisEnterprise"), func() {
_, err := client.HPExpireTime(ctx, "no_such_key", "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
}
expireAt := time.Now().Add(10 * time.Second)
res, err := client.HPExpireAt(ctx, "myhash", expireAt, "key1", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{1, -2}))
res, err = client.HPExpireTime(ctx, "myhash", "key1", "key2", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(BeEquivalentTo([]int64{expireAt.UnixMilli(), -1, -2}))
})
It("should HTTL", Label("hash-expiration", "NonRedisEnterprise"), func() {
_, err := client.HTTL(ctx, "no_such_key", "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
}
res, err := client.HExpire(ctx, "myhash", 10, "key1", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{1, -2}))
res, err = client.HTTL(ctx, "myhash", "key1", "key2", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{10, -1, -2}))
})
It("should HPTTL", Label("hash-expiration", "NonRedisEnterprise"), func() {
_, err := client.HPTTL(ctx, "no_such_key", "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
}
res, err := client.HExpire(ctx, "myhash", 10, "key1", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{1, -2}))
res, err = client.HPTTL(ctx, "myhash", "key1", "key2", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res[0]).To(BeNumerically("~", 10*time.Second.Milliseconds(), 1))
})
})
Describe("hyperloglog", func() {
@ -5686,6 +5889,78 @@ var _ = Describe("Commands", func() {
Expect(err).To(Equal(redis.Nil))
})
It("should XRead LastEntry", Label("NonRedisEnterprise"), func() {
res, err := client.XRead(ctx, &redis.XReadArgs{
Streams: []string{"stream"},
Count: 2, // we expect 1 message
ID: "+",
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]redis.XStream{
{
Stream: "stream",
Messages: []redis.XMessage{
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
},
},
}))
})
It("should XRead LastEntry from two streams", Label("NonRedisEnterprise"), func() {
res, err := client.XRead(ctx, &redis.XReadArgs{
Streams: []string{"stream", "stream"},
ID: "+",
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]redis.XStream{
{
Stream: "stream",
Messages: []redis.XMessage{
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
},
},
{
Stream: "stream",
Messages: []redis.XMessage{
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
},
},
}))
})
It("should XRead LastEntry blocks", Label("NonRedisEnterprise"), func() {
start := time.Now()
go func() {
defer GinkgoRecover()
time.Sleep(100 * time.Millisecond)
id, err := client.XAdd(ctx, &redis.XAddArgs{
Stream: "empty",
ID: "4-0",
Values: map[string]interface{}{"quatro": "quatre"},
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(id).To(Equal("4-0"))
}()
res, err := client.XRead(ctx, &redis.XReadArgs{
Streams: []string{"empty"},
Block: 500 * time.Millisecond,
ID: "+",
}).Result()
Expect(err).NotTo(HaveOccurred())
// Ensure that the XRead call with LastEntry option blocked for at least 100ms.
Expect(time.Since(start)).To(BeNumerically(">=", 100*time.Millisecond))
Expect(res).To(Equal([]redis.XStream{
{
Stream: "empty",
Messages: []redis.XMessage{
{ID: "4-0", Values: map[string]interface{}{"quatro": "quatre"}},
},
},
}))
})
Describe("group", func() {
BeforeEach(func() {
err := client.XGroupCreate(ctx, "stream", "group", "0").Err()

View File

@ -7,6 +7,7 @@ import (
"net"
"strings"
"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/proto"
)
@ -129,7 +130,9 @@ func isMovedError(err error) (moved bool, ask bool, addr string) {
if ind == -1 {
return false, false, ""
}
addr = s[ind+1:]
addr = internal.GetAddr(addr)
return
}

View File

@ -5,7 +5,7 @@ go 1.18
replace github.com/redis/go-redis/v9 => ../..
require (
github.com/redis/go-redis/v9 v9.5.1
github.com/redis/go-redis/v9 v9.5.3
go.uber.org/zap v1.24.0
)

View File

@ -1,6 +1,6 @@
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

View File

@ -4,7 +4,7 @@ go 1.18
replace github.com/redis/go-redis/v9 => ../..
require github.com/redis/go-redis/v9 v9.5.1
require github.com/redis/go-redis/v9 v9.5.3
require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect

View File

@ -1,5 +1,5 @@
github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=

View File

@ -4,7 +4,7 @@ go 1.18
replace github.com/redis/go-redis/v9 => ../..
require github.com/redis/go-redis/v9 v9.5.1
require github.com/redis/go-redis/v9 v9.5.3
require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect

View File

@ -1,5 +1,5 @@
github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=

View File

@ -9,8 +9,8 @@ replace github.com/redis/go-redis/extra/redisotel/v9 => ../../extra/redisotel
replace github.com/redis/go-redis/extra/rediscmd/v9 => ../../extra/rediscmd
require (
github.com/redis/go-redis/extra/redisotel/v9 v9.5.1
github.com/redis/go-redis/v9 v9.5.1
github.com/redis/go-redis/extra/redisotel/v9 v9.5.3
github.com/redis/go-redis/v9 v9.5.3
github.com/uptrace/uptrace-go v1.21.0
go.opentelemetry.io/otel v1.22.0
)
@ -23,7 +23,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/redis/go-redis/extra/rediscmd/v9 v9.5.1 // indirect
github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3 // indirect
go.opentelemetry.io/contrib/instrumentation/runtime v0.46.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect

View File

@ -4,7 +4,7 @@ go 1.18
replace github.com/redis/go-redis/v9 => ../..
require github.com/redis/go-redis/v9 v9.5.1
require github.com/redis/go-redis/v9 v9.5.3
require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect

View File

@ -1,5 +1,5 @@
github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=

View File

@ -6,7 +6,7 @@ replace github.com/redis/go-redis/v9 => ../..
require (
github.com/davecgh/go-spew v1.1.1
github.com/redis/go-redis/v9 v9.5.1
github.com/redis/go-redis/v9 v9.5.3
)
require (

View File

@ -1,5 +1,5 @@
github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

View File

@ -482,7 +482,7 @@ func ExampleClient_Watch() {
return err
}
// Actual opperation (local in optimistic lock).
// Actual operation (local in optimistic lock).
n++
// Operation is committed only if the watched keys remain unchanged.

View File

@ -7,8 +7,8 @@ replace github.com/redis/go-redis/v9 => ../..
replace github.com/redis/go-redis/extra/rediscmd/v9 => ../rediscmd
require (
github.com/redis/go-redis/extra/rediscmd/v9 v9.5.1
github.com/redis/go-redis/v9 v9.5.1
github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3
github.com/redis/go-redis/v9 v9.5.3
go.opencensus.io v0.24.0
)

View File

@ -7,7 +7,7 @@ replace github.com/redis/go-redis/v9 => ../..
require (
github.com/bsm/ginkgo/v2 v2.12.0
github.com/bsm/gomega v1.27.10
github.com/redis/go-redis/v9 v9.5.1
github.com/redis/go-redis/v9 v9.5.3
)
require (

View File

@ -7,8 +7,8 @@ replace github.com/redis/go-redis/v9 => ../..
replace github.com/redis/go-redis/extra/rediscmd/v9 => ../rediscmd
require (
github.com/redis/go-redis/extra/rediscmd/v9 v9.5.1
github.com/redis/go-redis/v9 v9.5.1
github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3
github.com/redis/go-redis/v9 v9.5.3
go.opentelemetry.io/otel v1.22.0
go.opentelemetry.io/otel/metric v1.22.0
go.opentelemetry.io/otel/sdk v1.22.0

View File

@ -91,10 +91,6 @@ func newTracingHook(connString string, opts ...TracingOption) *tracingHook {
func (th *tracingHook) DialHook(hook redis.DialHook) redis.DialHook {
return func(ctx context.Context, network, addr string) (net.Conn, error) {
if !trace.SpanFromContext(ctx).IsRecording() {
return hook(ctx, network, addr)
}
ctx, span := th.conf.tracer.Start(ctx, "redis.dial", th.spanOpts...)
defer span.End()
@ -109,10 +105,6 @@ func (th *tracingHook) DialHook(hook redis.DialHook) redis.DialHook {
func (th *tracingHook) ProcessHook(hook redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
if !trace.SpanFromContext(ctx).IsRecording() {
return hook(ctx, cmd)
}
fn, file, line := funcFileLine("github.com/redis/go-redis")
attrs := make([]attribute.KeyValue, 0, 8)
@ -145,10 +137,6 @@ func (th *tracingHook) ProcessPipelineHook(
hook redis.ProcessPipelineHook,
) redis.ProcessPipelineHook {
return func(ctx context.Context, cmds []redis.Cmder) error {
if !trace.SpanFromContext(ctx).IsRecording() {
return hook(ctx, cmds)
}
fn, file, line := funcFileLine("github.com/redis/go-redis")
attrs := make([]attribute.KeyValue, 0, 8)

View File

@ -6,7 +6,7 @@ replace github.com/redis/go-redis/v9 => ../..
require (
github.com/prometheus/client_golang v1.14.0
github.com/redis/go-redis/v9 v9.5.1
github.com/redis/go-redis/v9 v9.5.3
)
require (

View File

@ -1,6 +1,9 @@
package redis
import "context"
import (
"context"
"time"
)
type HashCmdable interface {
HDel(ctx context.Context, key string, fields ...string) *IntCmd
@ -16,6 +19,7 @@ type HashCmdable interface {
HMSet(ctx context.Context, key string, values ...interface{}) *BoolCmd
HSetNX(ctx context.Context, key, field string, value interface{}) *BoolCmd
HScan(ctx context.Context, key string, cursor uint64, match string, count int64) *ScanCmd
HScanNoValues(ctx context.Context, key string, cursor uint64, match string, count int64) *ScanCmd
HVals(ctx context.Context, key string) *StringSliceCmd
HRandField(ctx context.Context, key string, count int) *StringSliceCmd
HRandFieldWithValues(ctx context.Context, key string, count int) *KeyValueSliceCmd
@ -172,3 +176,262 @@ func (c cmdable) HScan(ctx context.Context, key string, cursor uint64, match str
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) HScanNoValues(ctx context.Context, key string, cursor uint64, match string, count int64) *ScanCmd {
args := []interface{}{"hscan", key, cursor}
if match != "" {
args = append(args, "match", match)
}
if count > 0 {
args = append(args, "count", count)
}
args = append(args, "novalues")
cmd := NewScanCmd(ctx, c, args...)
_ = c(ctx, cmd)
return cmd
}
type HExpireArgs struct {
NX bool
XX bool
GT bool
LT bool
}
// HExpire - Sets the expiration time for specified fields in a hash in seconds.
// The command constructs an argument list starting with "HEXPIRE", followed by the key, duration, any conditional flags, and the specified fields.
// For more information - https://redis.io/commands/hexpire/
func (c cmdable) HExpire(ctx context.Context, key string, expiration time.Duration, fields ...string) *IntSliceCmd {
args := []interface{}{"HEXPIRE", key, expiration, "FIELDS", len(fields)}
for _, field := range fields {
args = append(args, field)
}
cmd := NewIntSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
// HExpire - Sets the expiration time for specified fields in a hash in seconds.
// It requires a key, an expiration duration, a struct with boolean flags for conditional expiration settings (NX, XX, GT, LT), and a list of fields.
// The command constructs an argument list starting with "HEXPIRE", followed by the key, duration, any conditional flags, and the specified fields.
// For more information - https://redis.io/commands/hexpire/
func (c cmdable) HExpireWithArgs(ctx context.Context, key string, expiration time.Duration, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd {
args := []interface{}{"HEXPIRE", key, expiration}
// only if one argument is true, we can add it to the args
// if more than one argument is true, it will cause an error
if expirationArgs.NX {
args = append(args, "NX")
} else if expirationArgs.XX {
args = append(args, "XX")
} else if expirationArgs.GT {
args = append(args, "GT")
} else if expirationArgs.LT {
args = append(args, "LT")
}
args = append(args, "FIELDS", len(fields))
for _, field := range fields {
args = append(args, field)
}
cmd := NewIntSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
// HPExpire - Sets the expiration time for specified fields in a hash in milliseconds.
// Similar to HExpire, it accepts a key, an expiration duration in milliseconds, a struct with expiration condition flags, and a list of fields.
// The command modifies the standard time.Duration to milliseconds for the Redis command.
// For more information - https://redis.io/commands/hpexpire/
func (c cmdable) HPExpire(ctx context.Context, key string, expiration time.Duration, fields ...string) *IntSliceCmd {
args := []interface{}{"HPEXPIRE", key, formatMs(ctx, expiration), "FIELDS", len(fields)}
for _, field := range fields {
args = append(args, field)
}
cmd := NewIntSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) HPExpireWithArgs(ctx context.Context, key string, expiration time.Duration, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd {
args := []interface{}{"HPEXPIRE", key, formatMs(ctx, expiration)}
// only if one argument is true, we can add it to the args
// if more than one argument is true, it will cause an error
if expirationArgs.NX {
args = append(args, "NX")
} else if expirationArgs.XX {
args = append(args, "XX")
} else if expirationArgs.GT {
args = append(args, "GT")
} else if expirationArgs.LT {
args = append(args, "LT")
}
args = append(args, "FIELDS", len(fields))
for _, field := range fields {
args = append(args, field)
}
cmd := NewIntSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
// HExpireAt - Sets the expiration time for specified fields in a hash to a UNIX timestamp in seconds.
// Takes a key, a UNIX timestamp, a struct of conditional flags, and a list of fields.
// The command sets absolute expiration times based on the UNIX timestamp provided.
// For more information - https://redis.io/commands/hexpireat/
func (c cmdable) HExpireAt(ctx context.Context, key string, tm time.Time, fields ...string) *IntSliceCmd {
args := []interface{}{"HEXPIREAT", key, tm.Unix(), "FIELDS", len(fields)}
for _, field := range fields {
args = append(args, field)
}
cmd := NewIntSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) HExpireAtWithArgs(ctx context.Context, key string, tm time.Time, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd {
args := []interface{}{"HEXPIREAT", key, tm.Unix()}
// only if one argument is true, we can add it to the args
// if more than one argument is true, it will cause an error
if expirationArgs.NX {
args = append(args, "NX")
} else if expirationArgs.XX {
args = append(args, "XX")
} else if expirationArgs.GT {
args = append(args, "GT")
} else if expirationArgs.LT {
args = append(args, "LT")
}
args = append(args, "FIELDS", len(fields))
for _, field := range fields {
args = append(args, field)
}
cmd := NewIntSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
// HPExpireAt - Sets the expiration time for specified fields in a hash to a UNIX timestamp in milliseconds.
// Similar to HExpireAt but for timestamps in milliseconds. It accepts the same parameters and adjusts the UNIX time to milliseconds.
// For more information - https://redis.io/commands/hpexpireat/
func (c cmdable) HPExpireAt(ctx context.Context, key string, tm time.Time, fields ...string) *IntSliceCmd {
args := []interface{}{"HPEXPIREAT", key, tm.UnixNano() / int64(time.Millisecond), "FIELDS", len(fields)}
for _, field := range fields {
args = append(args, field)
}
cmd := NewIntSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) HPExpireAtWithArgs(ctx context.Context, key string, tm time.Time, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd {
args := []interface{}{"HPEXPIREAT", key, tm.UnixNano() / int64(time.Millisecond)}
// only if one argument is true, we can add it to the args
// if more than one argument is true, it will cause an error
if expirationArgs.NX {
args = append(args, "NX")
} else if expirationArgs.XX {
args = append(args, "XX")
} else if expirationArgs.GT {
args = append(args, "GT")
} else if expirationArgs.LT {
args = append(args, "LT")
}
args = append(args, "FIELDS", len(fields))
for _, field := range fields {
args = append(args, field)
}
cmd := NewIntSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
// HPersist - Removes the expiration time from specified fields in a hash.
// Accepts a key and the fields themselves.
// This command ensures that each field specified will have its expiration removed if present.
// For more information - https://redis.io/commands/hpersist/
func (c cmdable) HPersist(ctx context.Context, key string, fields ...string) *IntSliceCmd {
args := []interface{}{"HPERSIST", key, "FIELDS", len(fields)}
for _, field := range fields {
args = append(args, field)
}
cmd := NewIntSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
// HExpireTime - Retrieves the expiration time for specified fields in a hash as a UNIX timestamp in seconds.
// Requires a key and the fields themselves to fetch their expiration timestamps.
// This command returns the expiration times for each field or error/status codes for each field as specified.
// For more information - https://redis.io/commands/hexpiretime/
func (c cmdable) HExpireTime(ctx context.Context, key string, fields ...string) *IntSliceCmd {
args := []interface{}{"HEXPIRETIME", key, "FIELDS", len(fields)}
for _, field := range fields {
args = append(args, field)
}
cmd := NewIntSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
// HPExpireTime - Retrieves the expiration time for specified fields in a hash as a UNIX timestamp in milliseconds.
// Similar to HExpireTime, adjusted for timestamps in milliseconds. It requires the same parameters.
// Provides the expiration timestamp for each field in milliseconds.
// For more information - https://redis.io/commands/hexpiretime/
func (c cmdable) HPExpireTime(ctx context.Context, key string, fields ...string) *IntSliceCmd {
args := []interface{}{"HPEXPIRETIME", key, "FIELDS", len(fields)}
for _, field := range fields {
args = append(args, field)
}
cmd := NewIntSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
// HTTL - Retrieves the remaining time to live for specified fields in a hash in seconds.
// Requires a key and the fields themselves. It returns the TTL for each specified field.
// This command fetches the TTL in seconds for each field or returns error/status codes as appropriate.
// For more information - https://redis.io/commands/httl/
func (c cmdable) HTTL(ctx context.Context, key string, fields ...string) *IntSliceCmd {
args := []interface{}{"HTTL", key, "FIELDS", len(fields)}
for _, field := range fields {
args = append(args, field)
}
cmd := NewIntSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
// HPTTL - Retrieves the remaining time to live for specified fields in a hash in milliseconds.
// Similar to HTTL, but returns the TTL in milliseconds. It requires a key and the specified fields.
// This command provides the TTL in milliseconds for each field or returns error/status codes as needed.
// For more information - https://redis.io/commands/hpttl/
func (c cmdable) HPTTL(ctx context.Context, key string, fields ...string) *IntSliceCmd {
args := []interface{}{"HPTTL", key, "FIELDS", len(fields)}
for _, field := range fields {
args = append(args, field)
}
cmd := NewIntSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

View File

@ -2,6 +2,7 @@ package internal
import (
"context"
"net"
"strings"
"time"
@ -64,3 +65,19 @@ func ReplaceSpaces(s string) string {
return builder.String()
}
func GetAddr(addr string) string {
ind := strings.LastIndexByte(addr, ':')
if ind == -1 {
return ""
}
if strings.IndexByte(addr, '.') != -1 {
return addr
}
if addr[0] == '[' {
return addr
}
return net.JoinHostPort(addr[:ind], addr[ind+1:])
}

View File

@ -51,3 +51,24 @@ func TestIsLower(t *testing.T) {
Expect(isLower(str)).To(BeTrue())
})
}
func TestGetAddr(t *testing.T) {
It("getAddr", func() {
str := "127.0.0.1:1234"
Expect(GetAddr(str)).To(Equal(str))
str = "[::1]:1234"
Expect(GetAddr(str)).To(Equal(str))
str = "[fd01:abcd::7d03]:6379"
Expect(GetAddr(str)).To(Equal(str))
Expect(GetAddr("::1:1234")).To(Equal("[::1]:1234"))
Expect(GetAddr("fd01:abcd::7d03:6379")).To(Equal("[fd01:abcd::7d03]:6379"))
Expect(GetAddr("127.0.0.1")).To(Equal(""))
Expect(GetAddr("127")).To(Equal(""))
})
}

View File

@ -96,6 +96,22 @@ var _ = Describe("ScanIterator", func() {
Expect(vals).To(HaveLen(71 * 2))
Expect(vals).To(ContainElement("K01"))
Expect(vals).To(ContainElement("K71"))
Expect(vals).To(ContainElement("x"))
})
It("should hscan without values across multiple pages", Label("NonRedisEnterprise"), func() {
Expect(hashSeed(71)).NotTo(HaveOccurred())
var vals []string
iter := client.HScanNoValues(ctx, hashKey, 0, "", 10).Iterator()
for iter.Next(ctx) {
vals = append(vals, iter.Val())
}
Expect(iter.Err()).NotTo(HaveOccurred())
Expect(vals).To(HaveLen(71))
Expect(vals).To(ContainElement("K01"))
Expect(vals).To(ContainElement("K71"))
Expect(vals).NotTo(ContainElement("x"))
})
It("should scan to page borders", func() {

View File

@ -242,18 +242,18 @@ var _ = Describe("JSON Commands", Label("json"), func() {
Expect(cmd.Val()).To(Equal("OK"))
})
It("should JSONGet", Label("json.get", "json"), func() {
It("should JSONGet", Label("json.get", "json", "NonRedisEnterprise"), func() {
res, err := client.JSONSet(ctx, "get3", "$", `{"a": 1, "b": 2}`).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal("OK"))
res, err = client.JSONGetWithArgs(ctx, "get3", &redis.JSONGetArgs{Indent: "-"}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal(`[-{--"a":1,--"b":2-}]`))
Expect(res).To(Equal(`{-"a":1,-"b":2}`))
res, err = client.JSONGetWithArgs(ctx, "get3", &redis.JSONGetArgs{Indent: "-", Newline: `~`, Space: `!`}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal(`[~-{~--"a":!1,~--"b":!2~-}~]`))
Expect(res).To(Equal(`{~-"a":!1,~-"b":!2~}`))
})
It("should JSONMerge", Label("json.merge", "json"), func() {

View File

@ -61,6 +61,12 @@ type Options struct {
// before reconnecting. It should return the current username and password.
CredentialsProvider func() (username string, password string)
// CredentialsProviderContext is an enhanced parameter of CredentialsProvider,
// done to maintain API compatibility. In the future,
// there might be a merge between CredentialsProviderContext and CredentialsProvider.
// There will be a conflict between them; if CredentialsProviderContext exists, we will ignore CredentialsProvider.
CredentialsProviderContext func(ctx context.Context) (username string, password string, err error)
// Database to be selected after connecting to the server.
DB int
@ -250,12 +256,12 @@ func NewDialer(opt *Options) func(context.Context, string, string) (net.Conn, er
// - field names are mapped using snake-case conversion: to set MaxRetries, use max_retries
// - only scalar type fields are supported (bool, int, time.Duration)
// - for time.Duration fields, values must be a valid input for time.ParseDuration();
// additionally a plain integer as value (i.e. without unit) is intepreted as seconds
// additionally a plain integer as value (i.e. without unit) is interpreted as seconds
// - to disable a duration field, use value less than or equal to 0; to use the default
// value, leave the value blank or remove the parameter
// - only the last value is interpreted if a parameter is given multiple times
// - fields "network", "addr", "username" and "password" can only be set using other
// URL attributes (scheme, host, userinfo, resp.), query paremeters using these
// URL attributes (scheme, host, userinfo, resp.), query parameters using these
// names will be treated as unknown parameters
// - unknown parameter names will result in an error
//

View File

@ -62,10 +62,11 @@ type ClusterOptions struct {
OnConnect func(ctx context.Context, cn *Conn) error
Protocol int
Username string
Password string
CredentialsProvider func() (username string, password string)
Protocol int
Username string
Password string
CredentialsProvider func() (username string, password string)
CredentialsProviderContext func(ctx context.Context) (username string, password string, err error)
MaxRetries int
MinRetryBackoff time.Duration
@ -157,12 +158,12 @@ func (opt *ClusterOptions) init() {
// - field names are mapped using snake-case conversion: to set MaxRetries, use max_retries
// - only scalar type fields are supported (bool, int, time.Duration)
// - for time.Duration fields, values must be a valid input for time.ParseDuration();
// additionally a plain integer as value (i.e. without unit) is intepreted as seconds
// additionally a plain integer as value (i.e. without unit) is interpreted as seconds
// - to disable a duration field, use value less than or equal to 0; to use the default
// value, leave the value blank or remove the parameter
// - only the last value is interpreted if a parameter is given multiple times
// - fields "network", "addr", "username" and "password" can only be set using other
// URL attributes (scheme, host, userinfo, resp.), query paremeters using these
// URL attributes (scheme, host, userinfo, resp.), query parameters using these
// names will be treated as unknown parameters
// - unknown parameter names will result in an error
//
@ -272,10 +273,11 @@ func (opt *ClusterOptions) clientOptions() *Options {
Dialer: opt.Dialer,
OnConnect: opt.OnConnect,
Protocol: opt.Protocol,
Username: opt.Username,
Password: opt.Password,
CredentialsProvider: opt.CredentialsProvider,
Protocol: opt.Protocol,
Username: opt.Username,
Password: opt.Password,
CredentialsProvider: opt.CredentialsProvider,
CredentialsProviderContext: opt.CredentialsProviderContext,
MaxRetries: opt.MaxRetries,
MinRetryBackoff: opt.MinRetryBackoff,

View File

@ -491,7 +491,7 @@ func (c *PubSub) getContext() context.Context {
// Receive* APIs can not be used after channel is created.
//
// go-redis periodically sends ping messages to test connection health
// and re-subscribes if ping can not not received for 1 minute.
// and re-subscribes if ping can not received for 1 minute.
func (c *PubSub) Channel(opts ...ChannelOption) <-chan *Message {
c.chOnce.Do(func() {
c.msgCh = newChannel(c, opts...)

View File

@ -283,8 +283,13 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
}
cn.Inited = true
var err error
username, password := c.opt.Username, c.opt.Password
if c.opt.CredentialsProvider != nil {
if c.opt.CredentialsProviderContext != nil {
if username, password, err = c.opt.CredentialsProviderContext(ctx); err != nil {
return err
}
} else if c.opt.CredentialsProvider != nil {
username, password = c.opt.CredentialsProvider()
}
@ -300,7 +305,7 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
// for redis-server versions that do not support the HELLO command,
// RESP2 will continue to be used.
if err := conn.Hello(ctx, protocol, username, password, "").Err(); err == nil {
if err = conn.Hello(ctx, protocol, username, password, "").Err(); err == nil {
auth = true
} else if !isRedisError(err) {
// When the server responds with the RESP protocol and the result is not a normal
@ -313,7 +318,7 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
return err
}
_, err := conn.Pipelined(ctx, func(pipe Pipeliner) error {
_, err = conn.Pipelined(ctx, func(pipe Pipeliner) error {
if !auth && password != "" {
if username != "" {
pipe.AuthACL(ctx, username, password)

View File

@ -469,7 +469,7 @@ var _ = Describe("Client OnConnect", func() {
})
})
var _ = Describe("Client context cancelation", func() {
var _ = Describe("Client context cancellation", func() {
var opt *redis.Options
var client *redis.Client
@ -484,7 +484,7 @@ var _ = Describe("Client context cancelation", func() {
Expect(client.Close()).NotTo(HaveOccurred())
})
It("Blocking operation cancelation", func() {
It("Blocking operation cancellation", func() {
ctx, cancel := context.WithCancel(ctx)
cancel()

View File

@ -60,9 +60,6 @@ do
done
sed --in-place "s/\(return \)\"[^\"]*\"/\1\"${TAG#v}\"/" ./version.go
sed --in-place "s/\(\"version\": \)\"[^\"]*\"/\1\"${TAG#v}\"/" ./package.json
conventional-changelog -p angular -i CHANGELOG.md -s
git checkout -b release/${TAG} master
git add -u

View File

@ -137,10 +137,11 @@ type XReadArgs struct {
Streams []string // list of streams and ids, e.g. stream1 stream2 id1 id2
Count int64
Block time.Duration
ID string
}
func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd {
args := make([]interface{}, 0, 6+len(a.Streams))
args := make([]interface{}, 0, 2*len(a.Streams)+6)
args = append(args, "xread")
keyPos := int8(1)
@ -159,6 +160,11 @@ func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd {
for _, s := range a.Streams {
args = append(args, s)
}
if a.ID != "" {
for range a.Streams {
args = append(args, a.ID)
}
}
cmd := NewXStreamSliceCmd(ctx, args...)
if a.Block >= 0 {
@ -178,36 +184,42 @@ func (c cmdable) XReadStreams(ctx context.Context, streams ...string) *XStreamSl
func (c cmdable) XGroupCreate(ctx context.Context, stream, group, start string) *StatusCmd {
cmd := NewStatusCmd(ctx, "xgroup", "create", stream, group, start)
cmd.SetFirstKeyPos(2)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) XGroupCreateMkStream(ctx context.Context, stream, group, start string) *StatusCmd {
cmd := NewStatusCmd(ctx, "xgroup", "create", stream, group, start, "mkstream")
cmd.SetFirstKeyPos(2)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) XGroupSetID(ctx context.Context, stream, group, start string) *StatusCmd {
cmd := NewStatusCmd(ctx, "xgroup", "setid", stream, group, start)
cmd.SetFirstKeyPos(2)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) XGroupDestroy(ctx context.Context, stream, group string) *IntCmd {
cmd := NewIntCmd(ctx, "xgroup", "destroy", stream, group)
cmd.SetFirstKeyPos(2)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) XGroupCreateConsumer(ctx context.Context, stream, group, consumer string) *IntCmd {
cmd := NewIntCmd(ctx, "xgroup", "createconsumer", stream, group, consumer)
cmd.SetFirstKeyPos(2)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd {
cmd := NewIntCmd(ctx, "xgroup", "delconsumer", stream, group, consumer)
cmd.SetFirstKeyPos(2)
_ = c(ctx, cmd)
return cmd
}

View File

@ -2,5 +2,5 @@ package redis
// Version is the current release version.
func Version() string {
return "9.5.1"
return "9.5.3"
}