1
0
mirror of https://github.com/redis/go-redis.git synced 2025-07-16 13:21:51 +03:00

Merge branch 'master' into bitop

This commit is contained in:
Nedyalko Dyakov
2025-07-02 17:14:08 +03:00
committed by GitHub
32 changed files with 848 additions and 109 deletions

View File

@ -8,7 +8,7 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
- name: Check Spelling
uses: rojopolis/spellcheck-github-actions@0.49.0
uses: rojopolis/spellcheck-github-actions@0.51.0
with:
config_path: .github/spellcheck-settings.yml
task_name: Markdown

View File

@ -1,5 +1,43 @@
# Release Notes
# 9.11.0 (2025-06-24)
## 🚀 Highlights
Fixes TxPipeline to work correctly in cluster scenarios, allowing execution of commands
only in the same slot.
# Changes
## 🚀 New Features
- Set cluster slot for `scan` commands, rather than random ([#2623](https://github.com/redis/go-redis/pull/2623))
- Add CredentialsProvider field to UniversalOptions ([#2927](https://github.com/redis/go-redis/pull/2927))
- feat(redisotel): add WithCallerEnabled option ([#3415](https://github.com/redis/go-redis/pull/3415))
## 🐛 Bug Fixes
- fix(txpipeline): keyless commands should take the slot of the keyed ([#3411](https://github.com/redis/go-redis/pull/3411))
- fix(loading): cache the loaded flag for slave nodes ([#3410](https://github.com/redis/go-redis/pull/3410))
- fix(txpipeline): should return error on multi/exec on multiple slots ([#3408](https://github.com/redis/go-redis/pull/3408))
- fix: check if the shard exists to avoid returning nil ([#3396](https://github.com/redis/go-redis/pull/3396))
## 🧰 Maintenance
- feat: optimize connection pool waitTurn ([#3412](https://github.com/redis/go-redis/pull/3412))
- chore(ci): update CI redis builds ([#3407](https://github.com/redis/go-redis/pull/3407))
- chore: remove a redundant method from `Ring`, `Client` and `ClusterClient` ([#3401](https://github.com/redis/go-redis/pull/3401))
- test: refactor TestBasicCredentials using table-driven tests ([#3406](https://github.com/redis/go-redis/pull/3406))
- perf: reduce unnecessary memory allocation operations ([#3399](https://github.com/redis/go-redis/pull/3399))
- fix: insert entry during iterating over a map ([#3398](https://github.com/redis/go-redis/pull/3398))
- DOC-5229 probabilistic data type examples ([#3413](https://github.com/redis/go-redis/pull/3413))
- chore(deps): bump rojopolis/spellcheck-github-actions from 0.49.0 to 0.51.0 ([#3414](https://github.com/redis/go-redis/pull/3414))
## Contributors
We'd like to thank all the contributors who worked on this release!
[@andy-stark-redis](https://github.com/andy-stark-redis), [@boekkooi-impossiblecloud](https://github.com/boekkooi-impossiblecloud), [@cxljs](https://github.com/cxljs), [@dcherubini](https://github.com/dcherubini), [@dependabot[bot]](https://github.com/apps/dependabot), [@iamamirsalehi](https://github.com/iamamirsalehi), [@ndyakov](https://github.com/ndyakov), [@pete-woods](https://github.com/pete-woods), [@twz915](https://github.com/twz915) and [dependabot[bot]](https://github.com/apps/dependabot)
# 9.10.0 (2025-06-06)
## 🚀 Highlights

View File

@ -17,6 +17,55 @@ import (
"github.com/redis/go-redis/v9/internal/util"
)
// keylessCommands contains Redis commands that have empty key specifications (9th slot empty)
// Only includes core Redis commands, excludes FT.*, ts.*, timeseries.*, search.* and subcommands
var keylessCommands = map[string]struct{}{
"acl": {},
"asking": {},
"auth": {},
"bgrewriteaof": {},
"bgsave": {},
"client": {},
"cluster": {},
"config": {},
"debug": {},
"discard": {},
"echo": {},
"exec": {},
"failover": {},
"function": {},
"hello": {},
"latency": {},
"lolwut": {},
"module": {},
"monitor": {},
"multi": {},
"pfselftest": {},
"ping": {},
"psubscribe": {},
"psync": {},
"publish": {},
"pubsub": {},
"punsubscribe": {},
"quit": {},
"readonly": {},
"readwrite": {},
"replconf": {},
"replicaof": {},
"role": {},
"save": {},
"script": {},
"select": {},
"shutdown": {},
"slaveof": {},
"slowlog": {},
"subscribe": {},
"swapdb": {},
"sync": {},
"unsubscribe": {},
"unwatch": {},
}
type Cmder interface {
// command name.
// e.g. "set k v ex 10" -> "set", "cluster info" -> "cluster".
@ -75,12 +124,22 @@ func writeCmd(wr *proto.Writer, cmd Cmder) error {
return wr.WriteArgs(cmd.Args())
}
// cmdFirstKeyPos returns the position of the first key in the command's arguments.
// If the command does not have a key, it returns 0.
// TODO: Use the data in CommandInfo to determine the first key position.
func cmdFirstKeyPos(cmd Cmder) int {
if pos := cmd.firstKeyPos(); pos != 0 {
return int(pos)
}
switch cmd.Name() {
name := cmd.Name()
// first check if the command is keyless
if _, ok := keylessCommands[name]; ok {
return 0
}
switch name {
case "eval", "evalsha", "eval_ro", "evalsha_ro":
if cmd.stringArg(2) != "0" {
return 3

View File

@ -0,0 +1,412 @@
// EXAMPLE: home_prob_dts
// HIDE_START
package example_commands_test
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
)
// HIDE_END
func ExampleClient_probabilistic_datatypes() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password set
DB: 0, // use default DB
})
// REMOVE_START
rdb.FlushDB(ctx)
rdb.Del(ctx,
"recorded_users", "other_users",
"group:1", "group:2", "both_groups",
"items_sold",
"male_heights", "female_heights", "all_heights",
"top_3_songs")
// REMOVE_END
// STEP_START bloom
res1, err := rdb.BFMAdd(
ctx,
"recorded_users",
"andy", "cameron", "david", "michelle",
).Result()
if err != nil {
panic(err)
}
fmt.Println(res1) // >>> [true true true true]
res2, err := rdb.BFExists(ctx,
"recorded_users", "cameron",
).Result()
if err != nil {
panic(err)
}
fmt.Println(res2) // >>> true
res3, err := rdb.BFExists(ctx, "recorded_users", "kaitlyn").Result()
if err != nil {
panic(err)
}
fmt.Println(res3) // >>> false
// STEP_END
// STEP_START cuckoo
res4, err := rdb.CFAdd(ctx, "other_users", "paolo").Result()
if err != nil {
panic(err)
}
fmt.Println(res4) // >>> true
res5, err := rdb.CFAdd(ctx, "other_users", "kaitlyn").Result()
if err != nil {
panic(err)
}
fmt.Println(res5) // >>> true
res6, err := rdb.CFAdd(ctx, "other_users", "rachel").Result()
if err != nil {
panic(err)
}
fmt.Println(res6) // >>> true
res7, err := rdb.CFMExists(ctx,
"other_users", "paolo", "rachel", "andy",
).Result()
if err != nil {
panic(err)
}
fmt.Println(res7) // >>> [true true false]
res8, err := rdb.CFDel(ctx, "other_users", "paolo").Result()
if err != nil {
panic(err)
}
fmt.Println(res8) // >>> true
res9, err := rdb.CFExists(ctx, "other_users", "paolo").Result()
if err != nil {
panic(err)
}
fmt.Println(res9) // >>> false
// STEP_END
// STEP_START hyperloglog
res10, err := rdb.PFAdd(
ctx,
"group:1",
"andy", "cameron", "david",
).Result()
if err != nil {
panic(err)
}
fmt.Println(res10) // >>> 1
res11, err := rdb.PFCount(ctx, "group:1").Result()
if err != nil {
panic(err)
}
fmt.Println(res11) // >>> 3
res12, err := rdb.PFAdd(ctx,
"group:2",
"kaitlyn", "michelle", "paolo", "rachel",
).Result()
if err != nil {
panic(err)
}
fmt.Println(res12) // >>> 1
res13, err := rdb.PFCount(ctx, "group:2").Result()
if err != nil {
panic(err)
}
fmt.Println(res13) // >>> 4
res14, err := rdb.PFMerge(
ctx,
"both_groups",
"group:1", "group:2",
).Result()
if err != nil {
panic(err)
}
fmt.Println(res14) // >>> OK
res15, err := rdb.PFCount(ctx, "both_groups").Result()
if err != nil {
panic(err)
}
fmt.Println(res15) // >>> 7
// STEP_END
// STEP_START cms
// Specify that you want to keep the counts within 0.01
// (0.1%) of the true value with a 0.005 (0.05%) chance
// of going outside this limit.
res16, err := rdb.CMSInitByProb(ctx, "items_sold", 0.01, 0.005).Result()
if err != nil {
panic(err)
}
fmt.Println(res16) // >>> OK
// The parameters for `CMSIncrBy()` are two lists. The count
// for each item in the first list is incremented by the
// value at the same index in the second list.
res17, err := rdb.CMSIncrBy(ctx, "items_sold",
"bread", 300,
"tea", 200,
"coffee", 200,
"beer", 100,
).Result()
if err != nil {
panic(err)
}
fmt.Println(res17) // >>> [300 200 200 100]
res18, err := rdb.CMSIncrBy(ctx, "items_sold",
"bread", 100,
"coffee", 150,
).Result()
if err != nil {
panic(err)
}
fmt.Println(res18) // >>> [400 350]
res19, err := rdb.CMSQuery(ctx,
"items_sold",
"bread", "tea", "coffee", "beer",
).Result()
if err != nil {
panic(err)
}
fmt.Println(res19) // >>> [400 200 350 100]
// STEP_END
// STEP_START tdigest
res20, err := rdb.TDigestCreate(ctx, "male_heights").Result()
if err != nil {
panic(err)
}
fmt.Println(res20) // >>> OK
res21, err := rdb.TDigestAdd(ctx, "male_heights",
175.5, 181, 160.8, 152, 177, 196, 164,
).Result()
if err != nil {
panic(err)
}
fmt.Println(res21) // >>> OK
res22, err := rdb.TDigestMin(ctx, "male_heights").Result()
if err != nil {
panic(err)
}
fmt.Println(res22) // >>> 152
res23, err := rdb.TDigestMax(ctx, "male_heights").Result()
if err != nil {
panic(err)
}
fmt.Println(res23) // >>> 196
res24, err := rdb.TDigestQuantile(ctx, "male_heights", 0.75).Result()
if err != nil {
panic(err)
}
fmt.Println(res24) // >>> [181]
// Note that the CDF value for 181 is not exactly
// 0.75. Both values are estimates.
res25, err := rdb.TDigestCDF(ctx, "male_heights", 181).Result()
if err != nil {
panic(err)
}
fmt.Printf("%.4f\n", res25[0]) // >>> 0.7857
res26, err := rdb.TDigestCreate(ctx, "female_heights").Result()
if err != nil {
panic(err)
}
fmt.Println(res26) // >>> OK
res27, err := rdb.TDigestAdd(ctx, "female_heights",
155.5, 161, 168.5, 170, 157.5, 163, 171,
).Result()
if err != nil {
panic(err)
}
fmt.Println(res27) // >>> OK
res28, err := rdb.TDigestQuantile(ctx, "female_heights", 0.75).Result()
if err != nil {
panic(err)
}
fmt.Println(res28) // >>> [170]
res29, err := rdb.TDigestMerge(ctx, "all_heights",
nil,
"male_heights", "female_heights",
).Result()
if err != nil {
panic(err)
}
fmt.Println(res29) // >>> OK
res30, err := rdb.TDigestQuantile(ctx, "all_heights", 0.75).Result()
if err != nil {
panic(err)
}
fmt.Println(res30) // >>> [175.5]
// STEP_END
// STEP_START topk
// Create a TopK filter that keeps track of the top 3 items
res31, err := rdb.TopKReserve(ctx, "top_3_songs", 3).Result()
if err != nil {
panic(err)
}
fmt.Println(res31) // >>> OK
// Add some items to the filter
res32, err := rdb.TopKIncrBy(ctx,
"top_3_songs",
"Starfish Trooper", 3000,
"Only one more time", 1850,
"Rock me, Handel", 1325,
"How will anyone know?", 3890,
"Average lover", 4098,
"Road to everywhere", 770,
).Result()
if err != nil {
panic(err)
}
fmt.Println(res32)
// >>> [ Rock me, Handel Only one more time ]
res33, err := rdb.TopKList(ctx, "top_3_songs").Result()
if err != nil {
panic(err)
}
fmt.Println(res33)
// >>> [Average lover How will anyone know? Starfish Trooper]
// Query the count for specific items
res34, err := rdb.TopKQuery(
ctx,
"top_3_songs",
"Starfish Trooper", "Road to everywhere",
).Result()
if err != nil {
panic(err)
}
fmt.Println(res34) // >>> [true false]
// STEP_END
// Output:
// [true true true true]
// true
// false
// true
// true
// true
// [true true false]
// true
// false
// 1
// 3
// 1
// 4
// OK
// 7
// OK
// [300 200 200 100]
// [400 350]
// [400 200 350 100]
// OK
// OK
// 152
// 196
// [181]
// 0.7857
// OK
// OK
// [170]
// OK
// [175.5]
// OK
// [ Rock me, Handel Only one more time ]
// [Average lover How will anyone know? Starfish Trooper]
// [true false]
}

View File

@ -5,7 +5,7 @@ go 1.18
replace github.com/redis/go-redis/v9 => ../..
require (
github.com/redis/go-redis/v9 v9.10.0
github.com/redis/go-redis/v9 v9.11.0
go.uber.org/zap v1.24.0
)

View File

@ -4,7 +4,7 @@ go 1.18
replace github.com/redis/go-redis/v9 => ../..
require github.com/redis/go-redis/v9 v9.10.0
require github.com/redis/go-redis/v9 v9.11.0
require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect

View File

@ -6,7 +6,7 @@ replace github.com/redis/go-redis/v9 => ../..
require (
github.com/davecgh/go-spew v1.1.1
github.com/redis/go-redis/v9 v9.10.0
github.com/redis/go-redis/v9 v9.11.0
)
require (

View File

@ -4,7 +4,7 @@ go 1.18
replace github.com/redis/go-redis/v9 => ../..
require github.com/redis/go-redis/v9 v9.10.0
require github.com/redis/go-redis/v9 v9.11.0
require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect

View File

@ -11,8 +11,8 @@ replace github.com/redis/go-redis/extra/redisotel/v9 => ../../extra/redisotel
replace github.com/redis/go-redis/extra/rediscmd/v9 => ../../extra/rediscmd
require (
github.com/redis/go-redis/extra/redisotel/v9 v9.10.0
github.com/redis/go-redis/v9 v9.10.0
github.com/redis/go-redis/extra/redisotel/v9 v9.11.0
github.com/redis/go-redis/v9 v9.11.0
github.com/uptrace/uptrace-go v1.21.0
go.opentelemetry.io/otel v1.22.0
)
@ -25,7 +25,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/redis/go-redis/extra/rediscmd/v9 v9.10.0 // indirect
github.com/redis/go-redis/extra/rediscmd/v9 v9.11.0 // indirect
go.opentelemetry.io/contrib/instrumentation/runtime v0.46.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect

View File

@ -4,7 +4,7 @@ go 1.18
replace github.com/redis/go-redis/v9 => ../..
require github.com/redis/go-redis/v9 v9.10.0
require github.com/redis/go-redis/v9 v9.11.0
require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect

View File

@ -6,7 +6,7 @@ replace github.com/redis/go-redis/v9 => ../..
require (
github.com/davecgh/go-spew v1.1.1
github.com/redis/go-redis/v9 v9.10.0
github.com/redis/go-redis/v9 v9.11.0
)
require (

View File

@ -7,8 +7,8 @@ replace github.com/redis/go-redis/v9 => ../..
replace github.com/redis/go-redis/extra/rediscmd/v9 => ../rediscmd
require (
github.com/redis/go-redis/extra/rediscmd/v9 v9.10.0
github.com/redis/go-redis/v9 v9.10.0
github.com/redis/go-redis/extra/rediscmd/v9 v9.11.0
github.com/redis/go-redis/v9 v9.11.0
go.opencensus.io v0.24.0
)

View File

@ -7,7 +7,7 @@ replace github.com/redis/go-redis/v9 => ../..
require (
github.com/bsm/ginkgo/v2 v2.12.0
github.com/bsm/gomega v1.27.10
github.com/redis/go-redis/v9 v9.10.0
github.com/redis/go-redis/v9 v9.11.0
)
require (

View File

@ -20,6 +20,7 @@ type config struct {
tracer trace.Tracer
dbStmtEnabled bool
callerEnabled bool
// Metrics options.
@ -57,6 +58,7 @@ func newConfig(opts ...baseOption) *config {
tp: otel.GetTracerProvider(),
mp: otel.GetMeterProvider(),
dbStmtEnabled: true,
callerEnabled: true,
}
for _, opt := range opts {
@ -106,13 +108,20 @@ func WithTracerProvider(provider trace.TracerProvider) TracingOption {
})
}
// WithDBStatement tells the tracing hook not to log raw redis commands.
// WithDBStatement tells the tracing hook to log raw redis commands.
func WithDBStatement(on bool) TracingOption {
return tracingOption(func(conf *config) {
conf.dbStmtEnabled = on
})
}
// WithCallerEnabled tells the tracing hook to log the calling function, file and line.
func WithCallerEnabled(on bool) TracingOption {
return tracingOption(func(conf *config) {
conf.callerEnabled = on
})
}
//------------------------------------------------------------------------------
type MetricsOption interface {

View File

@ -7,8 +7,8 @@ replace github.com/redis/go-redis/v9 => ../..
replace github.com/redis/go-redis/extra/rediscmd/v9 => ../rediscmd
require (
github.com/redis/go-redis/extra/rediscmd/v9 v9.10.0
github.com/redis/go-redis/v9 v9.10.0
github.com/redis/go-redis/extra/rediscmd/v9 v9.11.0
github.com/redis/go-redis/v9 v9.11.0
go.opentelemetry.io/otel v1.22.0
go.opentelemetry.io/otel/metric v1.22.0
go.opentelemetry.io/otel/sdk v1.22.0

View File

@ -101,14 +101,16 @@ func (th *tracingHook) DialHook(hook redis.DialHook) redis.DialHook {
func (th *tracingHook) ProcessHook(hook redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
fn, file, line := funcFileLine("github.com/redis/go-redis")
attrs := make([]attribute.KeyValue, 0, 8)
attrs = append(attrs,
semconv.CodeFunction(fn),
semconv.CodeFilepath(file),
semconv.CodeLineNumber(line),
)
if th.conf.callerEnabled {
fn, file, line := funcFileLine("github.com/redis/go-redis")
attrs = append(attrs,
semconv.CodeFunction(fn),
semconv.CodeFilepath(file),
semconv.CodeLineNumber(line),
)
}
if th.conf.dbStmtEnabled {
cmdString := rediscmd.CmdString(cmd)
@ -133,16 +135,20 @@ func (th *tracingHook) ProcessPipelineHook(
hook redis.ProcessPipelineHook,
) redis.ProcessPipelineHook {
return func(ctx context.Context, cmds []redis.Cmder) error {
fn, file, line := funcFileLine("github.com/redis/go-redis")
attrs := make([]attribute.KeyValue, 0, 8)
attrs = append(attrs,
semconv.CodeFunction(fn),
semconv.CodeFilepath(file),
semconv.CodeLineNumber(line),
attribute.Int("db.redis.num_cmd", len(cmds)),
)
if th.conf.callerEnabled {
fn, file, line := funcFileLine("github.com/redis/go-redis")
attrs = append(attrs,
semconv.CodeFunction(fn),
semconv.CodeFilepath(file),
semconv.CodeLineNumber(line),
)
}
summary, cmdsString := rediscmd.CmdsString(cmds)
if th.conf.dbStmtEnabled {
attrs = append(attrs, semconv.DBStatement(cmdsString))

View File

@ -66,6 +66,35 @@ func TestWithDBStatement(t *testing.T) {
}
}
func TestWithoutCaller(t *testing.T) {
provider := sdktrace.NewTracerProvider()
hook := newTracingHook(
"",
WithTracerProvider(provider),
WithCallerEnabled(false),
)
ctx, span := provider.Tracer("redis-test").Start(context.TODO(), "redis-test")
cmd := redis.NewCmd(ctx, "ping")
defer span.End()
processHook := hook.ProcessHook(func(ctx context.Context, cmd redis.Cmder) error {
attrs := trace.SpanFromContext(ctx).(sdktrace.ReadOnlySpan).Attributes()
for _, attr := range attrs {
switch attr.Key {
case semconv.CodeFunctionKey,
semconv.CodeFilepathKey,
semconv.CodeLineNumberKey:
t.Fatalf("Attribute with %s statement should not exist", attr.Key)
}
}
return nil
})
err := processHook(ctx, cmd)
if err != nil {
t.Fatal(err)
}
}
func TestTracingHook_DialHook(t *testing.T) {
imsb := tracetest.NewInMemoryExporter()
provider := sdktrace.NewTracerProvider(sdktrace.WithSyncer(imsb))

View File

@ -6,7 +6,7 @@ replace github.com/redis/go-redis/v9 => ../..
require (
github.com/prometheus/client_golang v1.14.0
github.com/redis/go-redis/v9 v9.10.0
github.com/redis/go-redis/v9 v9.11.0
)
require (

View File

@ -3,6 +3,8 @@ package redis
import (
"context"
"time"
"github.com/redis/go-redis/v9/internal/hashtag"
)
type GenericCmdable interface {
@ -363,6 +365,9 @@ func (c cmdable) Scan(ctx context.Context, cursor uint64, match string, count in
args = append(args, "count", count)
}
cmd := NewScanCmd(ctx, c, args...)
if hashtag.Present(match) {
cmd.SetFirstKeyPos(3)
}
_ = c(ctx, cmd)
return cmd
}
@ -379,6 +384,9 @@ func (c cmdable) ScanType(ctx context.Context, cursor uint64, match string, coun
args = append(args, "type", keyType)
}
cmd := NewScanCmd(ctx, c, args...)
if hashtag.Present(match) {
cmd.SetFirstKeyPos(3)
}
_ = c(ctx, cmd)
return cmd
}

View File

@ -3,6 +3,8 @@ package redis
import (
"context"
"time"
"github.com/redis/go-redis/v9/internal/hashtag"
)
type HashCmdable interface {
@ -192,6 +194,9 @@ func (c cmdable) HScan(ctx context.Context, key string, cursor uint64, match str
args = append(args, "count", count)
}
cmd := NewScanCmd(ctx, c, args...)
if hashtag.Present(match) {
cmd.SetFirstKeyPos(4)
}
_ = c(ctx, cmd)
return cmd
}
@ -211,6 +216,9 @@ func (c cmdable) HScanNoValues(ctx context.Context, key string, cursor uint64, m
}
args = append(args, "novalues")
cmd := NewScanCmd(ctx, c, args...)
if hashtag.Present(match) {
cmd.SetFirstKeyPos(4)
}
_ = c(ctx, cmd)
return cmd
}

View File

@ -56,6 +56,18 @@ func Key(key string) string {
return key
}
func Present(key string) bool {
if key == "" {
return false
}
if s := strings.IndexByte(key, '{'); s > -1 {
if e := strings.IndexByte(key[s+1:], '}'); e > 0 {
return true
}
}
return false
}
func RandomSlot() int {
return rand.Intn(slotNumber)
}

View File

@ -69,3 +69,28 @@ var _ = Describe("HashSlot", func() {
}
})
})
var _ = Describe("Present", func() {
It("should calculate hash slots", func() {
tests := []struct {
key string
present bool
}{
{"123456789", false},
{"{}foo", false},
{"foo{}", false},
{"foo{}{bar}", false},
{"", false},
{string([]byte{83, 153, 134, 118, 229, 214, 244, 75, 140, 37, 215, 215}), false},
{"foo{bar}", true},
{"{foo}bar", true},
{"{user1000}.following", true},
{"foo{{bar}}zap", true},
{"foo{bar}{zap}", true},
}
for _, test := range tests {
Expect(Present(test.key)).To(Equal(test.present), "for %s", test.key)
}
})
})

View File

@ -364,15 +364,22 @@ var _ = Describe("ClusterClient", func() {
It("select slot from args for GETKEYSINSLOT command", func() {
cmd := NewStringSliceCmd(ctx, "cluster", "getkeysinslot", 100, 200)
slot := client.cmdSlot(cmd)
slot := client.cmdSlot(cmd, -1)
Expect(slot).To(Equal(100))
})
It("select slot from args for COUNTKEYSINSLOT command", func() {
cmd := NewStringSliceCmd(ctx, "cluster", "countkeysinslot", 100)
slot := client.cmdSlot(cmd)
slot := client.cmdSlot(cmd, -1)
Expect(slot).To(Equal(100))
})
It("follows preferred random slot", func() {
cmd := NewStatusCmd(ctx, "ping")
slot := client.cmdSlot(cmd, 101)
Expect(slot).To(Equal(101))
})
})
})

View File

@ -998,7 +998,7 @@ func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
}
func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
slot := c.cmdSlot(cmd)
slot := c.cmdSlot(cmd, -1)
var node *clusterNode
var moved bool
var ask bool
@ -1344,9 +1344,13 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd
return err
}
preferredRandomSlot := -1
if c.opt.ReadOnly && c.cmdsAreReadOnly(ctx, cmds) {
for _, cmd := range cmds {
slot := c.cmdSlot(cmd)
slot := c.cmdSlot(cmd, preferredRandomSlot)
if preferredRandomSlot == -1 {
preferredRandomSlot = slot
}
node, err := c.slotReadOnlyNode(state, slot)
if err != nil {
return err
@ -1357,7 +1361,10 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd
}
for _, cmd := range cmds {
slot := c.cmdSlot(cmd)
slot := c.cmdSlot(cmd, preferredRandomSlot)
if preferredRandomSlot == -1 {
preferredRandomSlot = slot
}
node, err := state.slotMasterNode(slot)
if err != nil {
return err
@ -1519,58 +1526,78 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err
return err
}
cmdsMap := c.mapCmdsBySlot(cmds)
// TxPipeline does not support cross slot transaction.
if len(cmdsMap) > 1 {
keyedCmdsBySlot := c.slottedKeyedCommands(cmds)
slot := -1
switch len(keyedCmdsBySlot) {
case 0:
slot = hashtag.RandomSlot()
case 1:
for sl := range keyedCmdsBySlot {
slot = sl
break
}
default:
// TxPipeline does not support cross slot transaction.
setCmdsErr(cmds, ErrCrossSlot)
return ErrCrossSlot
}
for slot, cmds := range cmdsMap {
node, err := state.slotMasterNode(slot)
if err != nil {
setCmdsErr(cmds, err)
continue
node, err := state.slotMasterNode(slot)
if err != nil {
setCmdsErr(cmds, err)
return err
}
cmdsMap := map[*clusterNode][]Cmder{node: cmds}
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
if attempt > 0 {
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
setCmdsErr(cmds, err)
return err
}
}
cmdsMap := map[*clusterNode][]Cmder{node: cmds}
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
if attempt > 0 {
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
setCmdsErr(cmds, err)
return err
}
}
failedCmds := newCmdsMap()
var wg sync.WaitGroup
failedCmds := newCmdsMap()
var wg sync.WaitGroup
for node, cmds := range cmdsMap {
wg.Add(1)
go func(node *clusterNode, cmds []Cmder) {
defer wg.Done()
c.processTxPipelineNode(ctx, node, cmds, failedCmds)
}(node, cmds)
}
wg.Wait()
if len(failedCmds.m) == 0 {
break
}
cmdsMap = failedCmds.m
for node, cmds := range cmdsMap {
wg.Add(1)
go func(node *clusterNode, cmds []Cmder) {
defer wg.Done()
c.processTxPipelineNode(ctx, node, cmds, failedCmds)
}(node, cmds)
}
wg.Wait()
if len(failedCmds.m) == 0 {
break
}
cmdsMap = failedCmds.m
}
return cmdsFirstErr(cmds)
}
func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
cmdsMap := make(map[int][]Cmder)
// slottedKeyedCommands returns a map of slot to commands taking into account
// only commands that have keys.
func (c *ClusterClient) slottedKeyedCommands(cmds []Cmder) map[int][]Cmder {
cmdsSlots := map[int][]Cmder{}
preferredRandomSlot := -1
for _, cmd := range cmds {
slot := c.cmdSlot(cmd)
cmdsMap[slot] = append(cmdsMap[slot], cmd)
if cmdFirstKeyPos(cmd) == 0 {
continue
}
slot := c.cmdSlot(cmd, preferredRandomSlot)
if preferredRandomSlot == -1 {
preferredRandomSlot = slot
}
cmdsSlots[slot] = append(cmdsSlots[slot], cmd)
}
return cmdsMap
return cmdsSlots
}
func (c *ClusterClient) processTxPipelineNode(
@ -1885,17 +1912,20 @@ func (c *ClusterClient) cmdInfo(ctx context.Context, name string) *CommandInfo {
return info
}
func (c *ClusterClient) cmdSlot(cmd Cmder) int {
func (c *ClusterClient) cmdSlot(cmd Cmder, preferredRandomSlot int) int {
args := cmd.Args()
if args[0] == "cluster" && (args[1] == "getkeysinslot" || args[1] == "countkeysinslot") {
return args[2].(int)
}
return cmdSlot(cmd, cmdFirstKeyPos(cmd))
return cmdSlot(cmd, cmdFirstKeyPos(cmd), preferredRandomSlot)
}
func cmdSlot(cmd Cmder, pos int) int {
func cmdSlot(cmd Cmder, pos int, preferredRandomSlot int) int {
if pos == 0 {
if preferredRandomSlot != -1 {
return preferredRandomSlot
}
return hashtag.RandomSlot()
}
firstKey := cmd.stringArg(pos)

View File

@ -603,6 +603,15 @@ var _ = Describe("ClusterClient", func() {
Expect(err).To(MatchError(redis.ErrCrossSlot))
})
It("works normally with keyless commands and no CrossSlot error", func() {
pipe.Set(ctx, "A{s}", "A_value", 0)
pipe.Ping(ctx)
pipe.Set(ctx, "B{s}", "B_value", 0)
pipe.Ping(ctx)
_, err := pipe.Exec(ctx)
Expect(err).To(Not(HaveOccurred()))
})
// doesn't fail when no commands are queued
It("returns no error when there are no commands", func() {
_, err := pipe.Exec(ctx)

31
ring.go
View File

@ -13,6 +13,7 @@ import (
"github.com/cespare/xxhash/v2"
"github.com/dgryski/go-rendezvous" //nolint
"github.com/redis/go-redis/v9/auth"
"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/hashtag"
@ -73,7 +74,24 @@ type RingOptions struct {
Protocol int
Username string
Password string
DB int
// CredentialsProvider allows the username and password to be updated
// before reconnecting. It should return the current username and password.
CredentialsProvider func() (username string, password string)
// CredentialsProviderContext is an enhanced parameter of CredentialsProvider,
// done to maintain API compatibility. In the future,
// there might be a merge between CredentialsProviderContext and CredentialsProvider.
// There will be a conflict between them; if CredentialsProviderContext exists, we will ignore CredentialsProvider.
CredentialsProviderContext func(ctx context.Context) (username string, password string, err error)
// StreamingCredentialsProvider is used to retrieve the credentials
// for the connection from an external source. Those credentials may change
// during the connection lifetime. This is useful for managed identity
// scenarios where the credentials are retrieved from an external source.
//
// Currently, this is a placeholder for the future implementation.
StreamingCredentialsProvider auth.StreamingCredentialsProvider
DB int
MaxRetries int
MinRetryBackoff time.Duration
@ -154,10 +172,13 @@ func (opt *RingOptions) clientOptions() *Options {
Dialer: opt.Dialer,
OnConnect: opt.OnConnect,
Protocol: opt.Protocol,
Username: opt.Username,
Password: opt.Password,
DB: opt.DB,
Protocol: opt.Protocol,
Username: opt.Username,
Password: opt.Password,
CredentialsProvider: opt.CredentialsProvider,
CredentialsProviderContext: opt.CredentialsProviderContext,
StreamingCredentialsProvider: opt.StreamingCredentialsProvider,
DB: opt.DB,
MaxRetries: -1,

View File

@ -304,7 +304,7 @@ var _ = Describe("Redis Ring", func() {
ring = redis.NewRing(opt)
})
It("supports Process hook", func() {
err := ring.Ping(ctx).Err()
err := ring.Set(ctx, "key", "test", 0).Err()
Expect(err).NotTo(HaveOccurred())
var stack []string
@ -312,12 +312,12 @@ var _ = Describe("Redis Ring", func() {
ring.AddHook(&hook{
processHook: func(hook redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
Expect(cmd.String()).To(Equal("ping: "))
Expect(cmd.String()).To(Equal("get key: "))
stack = append(stack, "ring.BeforeProcess")
err := hook(ctx, cmd)
Expect(cmd.String()).To(Equal("ping: PONG"))
Expect(cmd.String()).To(Equal("get key: test"))
stack = append(stack, "ring.AfterProcess")
return err
@ -329,12 +329,12 @@ var _ = Describe("Redis Ring", func() {
shard.AddHook(&hook{
processHook: func(hook redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
Expect(cmd.String()).To(Equal("ping: "))
Expect(cmd.String()).To(Equal("get key: "))
stack = append(stack, "shard.BeforeProcess")
err := hook(ctx, cmd)
Expect(cmd.String()).To(Equal("ping: PONG"))
Expect(cmd.String()).To(Equal("get key: test"))
stack = append(stack, "shard.AfterProcess")
return err
@ -344,7 +344,7 @@ var _ = Describe("Redis Ring", func() {
return nil
})
err = ring.Ping(ctx).Err()
err = ring.Get(ctx, "key").Err()
Expect(err).NotTo(HaveOccurred())
Expect(stack).To(Equal([]string{
"ring.BeforeProcess",

View File

@ -12,6 +12,7 @@ import (
"sync"
"time"
"github.com/redis/go-redis/v9/auth"
"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/rand"
@ -60,7 +61,24 @@ type FailoverOptions struct {
Protocol int
Username string
Password string
DB int
// CredentialsProvider allows the username and password to be updated
// before reconnecting. It should return the current username and password.
CredentialsProvider func() (username string, password string)
// CredentialsProviderContext is an enhanced parameter of CredentialsProvider,
// done to maintain API compatibility. In the future,
// there might be a merge between CredentialsProviderContext and CredentialsProvider.
// There will be a conflict between them; if CredentialsProviderContext exists, we will ignore CredentialsProvider.
CredentialsProviderContext func(ctx context.Context) (username string, password string, err error)
// StreamingCredentialsProvider is used to retrieve the credentials
// for the connection from an external source. Those credentials may change
// during the connection lifetime. This is useful for managed identity
// scenarios where the credentials are retrieved from an external source.
//
// Currently, this is a placeholder for the future implementation.
StreamingCredentialsProvider auth.StreamingCredentialsProvider
DB int
MaxRetries int
MinRetryBackoff time.Duration
@ -107,10 +125,13 @@ func (opt *FailoverOptions) clientOptions() *Options {
Dialer: opt.Dialer,
OnConnect: opt.OnConnect,
DB: opt.DB,
Protocol: opt.Protocol,
Username: opt.Username,
Password: opt.Password,
DB: opt.DB,
Protocol: opt.Protocol,
Username: opt.Username,
Password: opt.Password,
CredentialsProvider: opt.CredentialsProvider,
CredentialsProviderContext: opt.CredentialsProviderContext,
StreamingCredentialsProvider: opt.StreamingCredentialsProvider,
MaxRetries: opt.MaxRetries,
MinRetryBackoff: opt.MinRetryBackoff,
@ -187,9 +208,12 @@ func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
Dialer: opt.Dialer,
OnConnect: opt.OnConnect,
Protocol: opt.Protocol,
Username: opt.Username,
Password: opt.Password,
Protocol: opt.Protocol,
Username: opt.Username,
Password: opt.Password,
CredentialsProvider: opt.CredentialsProvider,
CredentialsProviderContext: opt.CredentialsProviderContext,
StreamingCredentialsProvider: opt.StreamingCredentialsProvider,
MaxRedirects: opt.MaxRetries,

View File

@ -1,6 +1,10 @@
package redis
import "context"
import (
"context"
"github.com/redis/go-redis/v9/internal/hashtag"
)
type SetCmdable interface {
SAdd(ctx context.Context, key string, members ...interface{}) *IntCmd
@ -211,6 +215,9 @@ func (c cmdable) SScan(ctx context.Context, key string, cursor uint64, match str
args = append(args, "count", count)
}
cmd := NewScanCmd(ctx, c, args...)
if hashtag.Present(match) {
cmd.SetFirstKeyPos(4)
}
_ = c(ctx, cmd)
return cmd
}

View File

@ -4,6 +4,8 @@ import (
"context"
"strings"
"time"
"github.com/redis/go-redis/v9/internal/hashtag"
)
type SortedSetCmdable interface {
@ -719,6 +721,9 @@ func (c cmdable) ZScan(ctx context.Context, key string, cursor uint64, match str
args = append(args, "count", count)
}
cmd := NewScanCmd(ctx, c, args...)
if hashtag.Present(match) {
cmd.SetFirstKeyPos(4)
}
_ = c(ctx, cmd)
return cmd
}

View File

@ -5,6 +5,8 @@ import (
"crypto/tls"
"net"
"time"
"github.com/redis/go-redis/v9/auth"
)
// UniversalOptions information is required by UniversalClient to establish
@ -26,9 +28,27 @@ type UniversalOptions struct {
Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
OnConnect func(ctx context.Context, cn *Conn) error
Protocol int
Username string
Password string
Protocol int
Username string
Password string
// CredentialsProvider allows the username and password to be updated
// before reconnecting. It should return the current username and password.
CredentialsProvider func() (username string, password string)
// CredentialsProviderContext is an enhanced parameter of CredentialsProvider,
// done to maintain API compatibility. In the future,
// there might be a merge between CredentialsProviderContext and CredentialsProvider.
// There will be a conflict between them; if CredentialsProviderContext exists, we will ignore CredentialsProvider.
CredentialsProviderContext func(ctx context.Context) (username string, password string, err error)
// StreamingCredentialsProvider is used to retrieve the credentials
// for the connection from an external source. Those credentials may change
// during the connection lifetime. This is useful for managed identity
// scenarios where the credentials are retrieved from an external source.
//
// Currently, this is a placeholder for the future implementation.
StreamingCredentialsProvider auth.StreamingCredentialsProvider
SentinelUsername string
SentinelPassword string
@ -96,9 +116,12 @@ func (o *UniversalOptions) Cluster() *ClusterOptions {
Dialer: o.Dialer,
OnConnect: o.OnConnect,
Protocol: o.Protocol,
Username: o.Username,
Password: o.Password,
Protocol: o.Protocol,
Username: o.Username,
Password: o.Password,
CredentialsProvider: o.CredentialsProvider,
CredentialsProviderContext: o.CredentialsProviderContext,
StreamingCredentialsProvider: o.StreamingCredentialsProvider,
MaxRedirects: o.MaxRedirects,
ReadOnly: o.ReadOnly,
@ -147,10 +170,14 @@ func (o *UniversalOptions) Failover() *FailoverOptions {
Dialer: o.Dialer,
OnConnect: o.OnConnect,
DB: o.DB,
Protocol: o.Protocol,
Username: o.Username,
Password: o.Password,
DB: o.DB,
Protocol: o.Protocol,
Username: o.Username,
Password: o.Password,
CredentialsProvider: o.CredentialsProvider,
CredentialsProviderContext: o.CredentialsProviderContext,
StreamingCredentialsProvider: o.StreamingCredentialsProvider,
SentinelUsername: o.SentinelUsername,
SentinelPassword: o.SentinelPassword,
@ -199,10 +226,13 @@ func (o *UniversalOptions) Simple() *Options {
Dialer: o.Dialer,
OnConnect: o.OnConnect,
DB: o.DB,
Protocol: o.Protocol,
Username: o.Username,
Password: o.Password,
DB: o.DB,
Protocol: o.Protocol,
Username: o.Username,
Password: o.Password,
CredentialsProvider: o.CredentialsProvider,
CredentialsProviderContext: o.CredentialsProviderContext,
StreamingCredentialsProvider: o.StreamingCredentialsProvider,
MaxRetries: o.MaxRetries,
MinRetryBackoff: o.MinRetryBackoff,

View File

@ -2,5 +2,5 @@ package redis
// Version is the current release version.
func Version() string {
return "9.10.0"
return "9.11.0"
}