diff --git a/.github/workflows/spellcheck.yml b/.github/workflows/spellcheck.yml index 6ab2c467..81e73cd4 100644 --- a/.github/workflows/spellcheck.yml +++ b/.github/workflows/spellcheck.yml @@ -8,7 +8,7 @@ jobs: - name: Checkout uses: actions/checkout@v4 - name: Check Spelling - uses: rojopolis/spellcheck-github-actions@0.49.0 + uses: rojopolis/spellcheck-github-actions@0.51.0 with: config_path: .github/spellcheck-settings.yml task_name: Markdown diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index f6a4abb9..64754902 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -1,5 +1,43 @@ # Release Notes +# 9.11.0 (2025-06-24) + +## 🚀 Highlights + +Fixes TxPipeline to work correctly in cluster scenarios, allowing execution of commands +only in the same slot. + +# Changes + +## 🚀 New Features + +- Set cluster slot for `scan` commands, rather than random ([#2623](https://github.com/redis/go-redis/pull/2623)) +- Add CredentialsProvider field to UniversalOptions ([#2927](https://github.com/redis/go-redis/pull/2927)) +- feat(redisotel): add WithCallerEnabled option ([#3415](https://github.com/redis/go-redis/pull/3415)) + +## 🐛 Bug Fixes + +- fix(txpipeline): keyless commands should take the slot of the keyed ([#3411](https://github.com/redis/go-redis/pull/3411)) +- fix(loading): cache the loaded flag for slave nodes ([#3410](https://github.com/redis/go-redis/pull/3410)) +- fix(txpipeline): should return error on multi/exec on multiple slots ([#3408](https://github.com/redis/go-redis/pull/3408)) +- fix: check if the shard exists to avoid returning nil ([#3396](https://github.com/redis/go-redis/pull/3396)) + +## 🧰 Maintenance + +- feat: optimize connection pool waitTurn ([#3412](https://github.com/redis/go-redis/pull/3412)) +- chore(ci): update CI redis builds ([#3407](https://github.com/redis/go-redis/pull/3407)) +- chore: remove a redundant method from `Ring`, `Client` and `ClusterClient` ([#3401](https://github.com/redis/go-redis/pull/3401)) +- test: refactor TestBasicCredentials using table-driven tests ([#3406](https://github.com/redis/go-redis/pull/3406)) +- perf: reduce unnecessary memory allocation operations ([#3399](https://github.com/redis/go-redis/pull/3399)) +- fix: insert entry during iterating over a map ([#3398](https://github.com/redis/go-redis/pull/3398)) +- DOC-5229 probabilistic data type examples ([#3413](https://github.com/redis/go-redis/pull/3413)) +- chore(deps): bump rojopolis/spellcheck-github-actions from 0.49.0 to 0.51.0 ([#3414](https://github.com/redis/go-redis/pull/3414)) + +## Contributors +We'd like to thank all the contributors who worked on this release! + +[@andy-stark-redis](https://github.com/andy-stark-redis), [@boekkooi-impossiblecloud](https://github.com/boekkooi-impossiblecloud), [@cxljs](https://github.com/cxljs), [@dcherubini](https://github.com/dcherubini), [@dependabot[bot]](https://github.com/apps/dependabot), [@iamamirsalehi](https://github.com/iamamirsalehi), [@ndyakov](https://github.com/ndyakov), [@pete-woods](https://github.com/pete-woods), [@twz915](https://github.com/twz915) and [dependabot[bot]](https://github.com/apps/dependabot) + # 9.10.0 (2025-06-06) ## 🚀 Highlights diff --git a/command.go b/command.go index 9d9d2848..d3fb231b 100644 --- a/command.go +++ b/command.go @@ -17,6 +17,55 @@ import ( "github.com/redis/go-redis/v9/internal/util" ) +// keylessCommands contains Redis commands that have empty key specifications (9th slot empty) +// Only includes core Redis commands, excludes FT.*, ts.*, timeseries.*, search.* and subcommands +var keylessCommands = map[string]struct{}{ + "acl": {}, + "asking": {}, + "auth": {}, + "bgrewriteaof": {}, + "bgsave": {}, + "client": {}, + "cluster": {}, + "config": {}, + "debug": {}, + "discard": {}, + "echo": {}, + "exec": {}, + "failover": {}, + "function": {}, + "hello": {}, + "latency": {}, + "lolwut": {}, + "module": {}, + "monitor": {}, + "multi": {}, + "pfselftest": {}, + "ping": {}, + "psubscribe": {}, + "psync": {}, + "publish": {}, + "pubsub": {}, + "punsubscribe": {}, + "quit": {}, + "readonly": {}, + "readwrite": {}, + "replconf": {}, + "replicaof": {}, + "role": {}, + "save": {}, + "script": {}, + "select": {}, + "shutdown": {}, + "slaveof": {}, + "slowlog": {}, + "subscribe": {}, + "swapdb": {}, + "sync": {}, + "unsubscribe": {}, + "unwatch": {}, +} + type Cmder interface { // command name. // e.g. "set k v ex 10" -> "set", "cluster info" -> "cluster". @@ -75,12 +124,22 @@ func writeCmd(wr *proto.Writer, cmd Cmder) error { return wr.WriteArgs(cmd.Args()) } +// cmdFirstKeyPos returns the position of the first key in the command's arguments. +// If the command does not have a key, it returns 0. +// TODO: Use the data in CommandInfo to determine the first key position. func cmdFirstKeyPos(cmd Cmder) int { if pos := cmd.firstKeyPos(); pos != 0 { return int(pos) } - switch cmd.Name() { + name := cmd.Name() + + // first check if the command is keyless + if _, ok := keylessCommands[name]; ok { + return 0 + } + + switch name { case "eval", "evalsha", "eval_ro", "evalsha_ro": if cmd.stringArg(2) != "0" { return 3 diff --git a/doctests/home_prob_dts_test.go b/doctests/home_prob_dts_test.go new file mode 100644 index 00000000..dcbbfcfb --- /dev/null +++ b/doctests/home_prob_dts_test.go @@ -0,0 +1,412 @@ +// EXAMPLE: home_prob_dts +// HIDE_START +package example_commands_test + +import ( + "context" + "fmt" + + "github.com/redis/go-redis/v9" +) + +// HIDE_END + +func ExampleClient_probabilistic_datatypes() { + ctx := context.Background() + + rdb := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + Password: "", // no password set + DB: 0, // use default DB + }) + + // REMOVE_START + rdb.FlushDB(ctx) + rdb.Del(ctx, + "recorded_users", "other_users", + "group:1", "group:2", "both_groups", + "items_sold", + "male_heights", "female_heights", "all_heights", + "top_3_songs") + // REMOVE_END + + // STEP_START bloom + res1, err := rdb.BFMAdd( + ctx, + "recorded_users", + "andy", "cameron", "david", "michelle", + ).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res1) // >>> [true true true true] + + res2, err := rdb.BFExists(ctx, + "recorded_users", "cameron", + ).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res2) // >>> true + + res3, err := rdb.BFExists(ctx, "recorded_users", "kaitlyn").Result() + + if err != nil { + panic(err) + } + + fmt.Println(res3) // >>> false + // STEP_END + + // STEP_START cuckoo + res4, err := rdb.CFAdd(ctx, "other_users", "paolo").Result() + + if err != nil { + panic(err) + } + + fmt.Println(res4) // >>> true + + res5, err := rdb.CFAdd(ctx, "other_users", "kaitlyn").Result() + + if err != nil { + panic(err) + } + + fmt.Println(res5) // >>> true + + res6, err := rdb.CFAdd(ctx, "other_users", "rachel").Result() + + if err != nil { + panic(err) + } + + fmt.Println(res6) // >>> true + + res7, err := rdb.CFMExists(ctx, + "other_users", "paolo", "rachel", "andy", + ).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res7) // >>> [true true false] + + res8, err := rdb.CFDel(ctx, "other_users", "paolo").Result() + + if err != nil { + panic(err) + } + + fmt.Println(res8) // >>> true + + res9, err := rdb.CFExists(ctx, "other_users", "paolo").Result() + + if err != nil { + panic(err) + } + + fmt.Println(res9) // >>> false + // STEP_END + + // STEP_START hyperloglog + res10, err := rdb.PFAdd( + ctx, + "group:1", + "andy", "cameron", "david", + ).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res10) // >>> 1 + + res11, err := rdb.PFCount(ctx, "group:1").Result() + + if err != nil { + panic(err) + } + + fmt.Println(res11) // >>> 3 + + res12, err := rdb.PFAdd(ctx, + "group:2", + "kaitlyn", "michelle", "paolo", "rachel", + ).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res12) // >>> 1 + + res13, err := rdb.PFCount(ctx, "group:2").Result() + + if err != nil { + panic(err) + } + + fmt.Println(res13) // >>> 4 + + res14, err := rdb.PFMerge( + ctx, + "both_groups", + "group:1", "group:2", + ).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res14) // >>> OK + + res15, err := rdb.PFCount(ctx, "both_groups").Result() + + if err != nil { + panic(err) + } + + fmt.Println(res15) // >>> 7 + // STEP_END + + // STEP_START cms + // Specify that you want to keep the counts within 0.01 + // (0.1%) of the true value with a 0.005 (0.05%) chance + // of going outside this limit. + res16, err := rdb.CMSInitByProb(ctx, "items_sold", 0.01, 0.005).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res16) // >>> OK + + // The parameters for `CMSIncrBy()` are two lists. The count + // for each item in the first list is incremented by the + // value at the same index in the second list. + res17, err := rdb.CMSIncrBy(ctx, "items_sold", + "bread", 300, + "tea", 200, + "coffee", 200, + "beer", 100, + ).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res17) // >>> [300 200 200 100] + + res18, err := rdb.CMSIncrBy(ctx, "items_sold", + "bread", 100, + "coffee", 150, + ).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res18) // >>> [400 350] + + res19, err := rdb.CMSQuery(ctx, + "items_sold", + "bread", "tea", "coffee", "beer", + ).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res19) // >>> [400 200 350 100] + // STEP_END + + // STEP_START tdigest + res20, err := rdb.TDigestCreate(ctx, "male_heights").Result() + + if err != nil { + panic(err) + } + + fmt.Println(res20) // >>> OK + + res21, err := rdb.TDigestAdd(ctx, "male_heights", + 175.5, 181, 160.8, 152, 177, 196, 164, + ).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res21) // >>> OK + + res22, err := rdb.TDigestMin(ctx, "male_heights").Result() + if err != nil { + panic(err) + } + fmt.Println(res22) // >>> 152 + + res23, err := rdb.TDigestMax(ctx, "male_heights").Result() + + if err != nil { + panic(err) + } + + fmt.Println(res23) // >>> 196 + + res24, err := rdb.TDigestQuantile(ctx, "male_heights", 0.75).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res24) // >>> [181] + + // Note that the CDF value for 181 is not exactly + // 0.75. Both values are estimates. + res25, err := rdb.TDigestCDF(ctx, "male_heights", 181).Result() + + if err != nil { + panic(err) + } + + fmt.Printf("%.4f\n", res25[0]) // >>> 0.7857 + + res26, err := rdb.TDigestCreate(ctx, "female_heights").Result() + + if err != nil { + panic(err) + } + + fmt.Println(res26) // >>> OK + + res27, err := rdb.TDigestAdd(ctx, "female_heights", + 155.5, 161, 168.5, 170, 157.5, 163, 171, + ).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res27) // >>> OK + + res28, err := rdb.TDigestQuantile(ctx, "female_heights", 0.75).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res28) // >>> [170] + + res29, err := rdb.TDigestMerge(ctx, "all_heights", + nil, + "male_heights", "female_heights", + ).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res29) // >>> OK + + res30, err := rdb.TDigestQuantile(ctx, "all_heights", 0.75).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res30) // >>> [175.5] + // STEP_END + + // STEP_START topk + // Create a TopK filter that keeps track of the top 3 items + res31, err := rdb.TopKReserve(ctx, "top_3_songs", 3).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res31) // >>> OK + + // Add some items to the filter + res32, err := rdb.TopKIncrBy(ctx, + "top_3_songs", + "Starfish Trooper", 3000, + "Only one more time", 1850, + "Rock me, Handel", 1325, + "How will anyone know?", 3890, + "Average lover", 4098, + "Road to everywhere", 770, + ).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res32) + // >>> [ Rock me, Handel Only one more time ] + + res33, err := rdb.TopKList(ctx, "top_3_songs").Result() + + if err != nil { + panic(err) + } + + fmt.Println(res33) + // >>> [Average lover How will anyone know? Starfish Trooper] + + // Query the count for specific items + res34, err := rdb.TopKQuery( + ctx, + "top_3_songs", + "Starfish Trooper", "Road to everywhere", + ).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res34) // >>> [true false] + // STEP_END + + // Output: + // [true true true true] + // true + // false + // true + // true + // true + // [true true false] + // true + // false + // 1 + // 3 + // 1 + // 4 + // OK + // 7 + // OK + // [300 200 200 100] + // [400 350] + // [400 200 350 100] + // OK + // OK + // 152 + // 196 + // [181] + // 0.7857 + // OK + // OK + // [170] + // OK + // [175.5] + // OK + // [ Rock me, Handel Only one more time ] + // [Average lover How will anyone know? Starfish Trooper] + // [true false] +} diff --git a/example/del-keys-without-ttl/go.mod b/example/del-keys-without-ttl/go.mod index eac5651a..08144430 100644 --- a/example/del-keys-without-ttl/go.mod +++ b/example/del-keys-without-ttl/go.mod @@ -5,7 +5,7 @@ go 1.18 replace github.com/redis/go-redis/v9 => ../.. require ( - github.com/redis/go-redis/v9 v9.10.0 + github.com/redis/go-redis/v9 v9.11.0 go.uber.org/zap v1.24.0 ) diff --git a/example/hll/go.mod b/example/hll/go.mod index b0769c48..19611d46 100644 --- a/example/hll/go.mod +++ b/example/hll/go.mod @@ -4,7 +4,7 @@ go 1.18 replace github.com/redis/go-redis/v9 => ../.. -require github.com/redis/go-redis/v9 v9.10.0 +require github.com/redis/go-redis/v9 v9.11.0 require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect diff --git a/example/hset-struct/go.mod b/example/hset-struct/go.mod index ee6a219d..89293593 100644 --- a/example/hset-struct/go.mod +++ b/example/hset-struct/go.mod @@ -6,7 +6,7 @@ replace github.com/redis/go-redis/v9 => ../.. require ( github.com/davecgh/go-spew v1.1.1 - github.com/redis/go-redis/v9 v9.10.0 + github.com/redis/go-redis/v9 v9.11.0 ) require ( diff --git a/example/lua-scripting/go.mod b/example/lua-scripting/go.mod index e501e03e..1706c42e 100644 --- a/example/lua-scripting/go.mod +++ b/example/lua-scripting/go.mod @@ -4,7 +4,7 @@ go 1.18 replace github.com/redis/go-redis/v9 => ../.. -require github.com/redis/go-redis/v9 v9.10.0 +require github.com/redis/go-redis/v9 v9.11.0 require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect diff --git a/example/otel/go.mod b/example/otel/go.mod index 97fc824e..26653fbc 100644 --- a/example/otel/go.mod +++ b/example/otel/go.mod @@ -11,8 +11,8 @@ replace github.com/redis/go-redis/extra/redisotel/v9 => ../../extra/redisotel replace github.com/redis/go-redis/extra/rediscmd/v9 => ../../extra/rediscmd require ( - github.com/redis/go-redis/extra/redisotel/v9 v9.10.0 - github.com/redis/go-redis/v9 v9.10.0 + github.com/redis/go-redis/extra/redisotel/v9 v9.11.0 + github.com/redis/go-redis/v9 v9.11.0 github.com/uptrace/uptrace-go v1.21.0 go.opentelemetry.io/otel v1.22.0 ) @@ -25,7 +25,7 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect - github.com/redis/go-redis/extra/rediscmd/v9 v9.10.0 // indirect + github.com/redis/go-redis/extra/rediscmd/v9 v9.11.0 // indirect go.opentelemetry.io/contrib/instrumentation/runtime v0.46.1 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect diff --git a/example/redis-bloom/go.mod b/example/redis-bloom/go.mod index 48fdc4e4..6eb04204 100644 --- a/example/redis-bloom/go.mod +++ b/example/redis-bloom/go.mod @@ -4,7 +4,7 @@ go 1.18 replace github.com/redis/go-redis/v9 => ../.. -require github.com/redis/go-redis/v9 v9.10.0 +require github.com/redis/go-redis/v9 v9.11.0 require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect diff --git a/example/scan-struct/go.mod b/example/scan-struct/go.mod index ee6a219d..89293593 100644 --- a/example/scan-struct/go.mod +++ b/example/scan-struct/go.mod @@ -6,7 +6,7 @@ replace github.com/redis/go-redis/v9 => ../.. require ( github.com/davecgh/go-spew v1.1.1 - github.com/redis/go-redis/v9 v9.10.0 + github.com/redis/go-redis/v9 v9.11.0 ) require ( diff --git a/extra/rediscensus/go.mod b/extra/rediscensus/go.mod index c06d9808..5e01aba6 100644 --- a/extra/rediscensus/go.mod +++ b/extra/rediscensus/go.mod @@ -7,8 +7,8 @@ replace github.com/redis/go-redis/v9 => ../.. replace github.com/redis/go-redis/extra/rediscmd/v9 => ../rediscmd require ( - github.com/redis/go-redis/extra/rediscmd/v9 v9.10.0 - github.com/redis/go-redis/v9 v9.10.0 + github.com/redis/go-redis/extra/rediscmd/v9 v9.11.0 + github.com/redis/go-redis/v9 v9.11.0 go.opencensus.io v0.24.0 ) diff --git a/extra/rediscmd/go.mod b/extra/rediscmd/go.mod index b86582fc..c8e8f3c2 100644 --- a/extra/rediscmd/go.mod +++ b/extra/rediscmd/go.mod @@ -7,7 +7,7 @@ replace github.com/redis/go-redis/v9 => ../.. require ( github.com/bsm/ginkgo/v2 v2.12.0 github.com/bsm/gomega v1.27.10 - github.com/redis/go-redis/v9 v9.10.0 + github.com/redis/go-redis/v9 v9.11.0 ) require ( diff --git a/extra/redisotel/config.go b/extra/redisotel/config.go index c02ee0b3..6ebd4bd5 100644 --- a/extra/redisotel/config.go +++ b/extra/redisotel/config.go @@ -20,6 +20,7 @@ type config struct { tracer trace.Tracer dbStmtEnabled bool + callerEnabled bool // Metrics options. @@ -57,6 +58,7 @@ func newConfig(opts ...baseOption) *config { tp: otel.GetTracerProvider(), mp: otel.GetMeterProvider(), dbStmtEnabled: true, + callerEnabled: true, } for _, opt := range opts { @@ -106,13 +108,20 @@ func WithTracerProvider(provider trace.TracerProvider) TracingOption { }) } -// WithDBStatement tells the tracing hook not to log raw redis commands. +// WithDBStatement tells the tracing hook to log raw redis commands. func WithDBStatement(on bool) TracingOption { return tracingOption(func(conf *config) { conf.dbStmtEnabled = on }) } +// WithCallerEnabled tells the tracing hook to log the calling function, file and line. +func WithCallerEnabled(on bool) TracingOption { + return tracingOption(func(conf *config) { + conf.callerEnabled = on + }) +} + //------------------------------------------------------------------------------ type MetricsOption interface { diff --git a/extra/redisotel/go.mod b/extra/redisotel/go.mod index 1e415da6..b3c2db5f 100644 --- a/extra/redisotel/go.mod +++ b/extra/redisotel/go.mod @@ -7,8 +7,8 @@ replace github.com/redis/go-redis/v9 => ../.. replace github.com/redis/go-redis/extra/rediscmd/v9 => ../rediscmd require ( - github.com/redis/go-redis/extra/rediscmd/v9 v9.10.0 - github.com/redis/go-redis/v9 v9.10.0 + github.com/redis/go-redis/extra/rediscmd/v9 v9.11.0 + github.com/redis/go-redis/v9 v9.11.0 go.opentelemetry.io/otel v1.22.0 go.opentelemetry.io/otel/metric v1.22.0 go.opentelemetry.io/otel/sdk v1.22.0 diff --git a/extra/redisotel/tracing.go b/extra/redisotel/tracing.go index 33b7abac..40df5a20 100644 --- a/extra/redisotel/tracing.go +++ b/extra/redisotel/tracing.go @@ -101,14 +101,16 @@ func (th *tracingHook) DialHook(hook redis.DialHook) redis.DialHook { func (th *tracingHook) ProcessHook(hook redis.ProcessHook) redis.ProcessHook { return func(ctx context.Context, cmd redis.Cmder) error { - fn, file, line := funcFileLine("github.com/redis/go-redis") attrs := make([]attribute.KeyValue, 0, 8) - attrs = append(attrs, - semconv.CodeFunction(fn), - semconv.CodeFilepath(file), - semconv.CodeLineNumber(line), - ) + if th.conf.callerEnabled { + fn, file, line := funcFileLine("github.com/redis/go-redis") + attrs = append(attrs, + semconv.CodeFunction(fn), + semconv.CodeFilepath(file), + semconv.CodeLineNumber(line), + ) + } if th.conf.dbStmtEnabled { cmdString := rediscmd.CmdString(cmd) @@ -133,16 +135,20 @@ func (th *tracingHook) ProcessPipelineHook( hook redis.ProcessPipelineHook, ) redis.ProcessPipelineHook { return func(ctx context.Context, cmds []redis.Cmder) error { - fn, file, line := funcFileLine("github.com/redis/go-redis") - attrs := make([]attribute.KeyValue, 0, 8) attrs = append(attrs, - semconv.CodeFunction(fn), - semconv.CodeFilepath(file), - semconv.CodeLineNumber(line), attribute.Int("db.redis.num_cmd", len(cmds)), ) + if th.conf.callerEnabled { + fn, file, line := funcFileLine("github.com/redis/go-redis") + attrs = append(attrs, + semconv.CodeFunction(fn), + semconv.CodeFilepath(file), + semconv.CodeLineNumber(line), + ) + } + summary, cmdsString := rediscmd.CmdsString(cmds) if th.conf.dbStmtEnabled { attrs = append(attrs, semconv.DBStatement(cmdsString)) diff --git a/extra/redisotel/tracing_test.go b/extra/redisotel/tracing_test.go index e5ef86ed..a3e3ccc6 100644 --- a/extra/redisotel/tracing_test.go +++ b/extra/redisotel/tracing_test.go @@ -66,6 +66,35 @@ func TestWithDBStatement(t *testing.T) { } } +func TestWithoutCaller(t *testing.T) { + provider := sdktrace.NewTracerProvider() + hook := newTracingHook( + "", + WithTracerProvider(provider), + WithCallerEnabled(false), + ) + ctx, span := provider.Tracer("redis-test").Start(context.TODO(), "redis-test") + cmd := redis.NewCmd(ctx, "ping") + defer span.End() + + processHook := hook.ProcessHook(func(ctx context.Context, cmd redis.Cmder) error { + attrs := trace.SpanFromContext(ctx).(sdktrace.ReadOnlySpan).Attributes() + for _, attr := range attrs { + switch attr.Key { + case semconv.CodeFunctionKey, + semconv.CodeFilepathKey, + semconv.CodeLineNumberKey: + t.Fatalf("Attribute with %s statement should not exist", attr.Key) + } + } + return nil + }) + err := processHook(ctx, cmd) + if err != nil { + t.Fatal(err) + } +} + func TestTracingHook_DialHook(t *testing.T) { imsb := tracetest.NewInMemoryExporter() provider := sdktrace.NewTracerProvider(sdktrace.WithSyncer(imsb)) diff --git a/extra/redisprometheus/go.mod b/extra/redisprometheus/go.mod index e1b40f96..74613deb 100644 --- a/extra/redisprometheus/go.mod +++ b/extra/redisprometheus/go.mod @@ -6,7 +6,7 @@ replace github.com/redis/go-redis/v9 => ../.. require ( github.com/prometheus/client_golang v1.14.0 - github.com/redis/go-redis/v9 v9.10.0 + github.com/redis/go-redis/v9 v9.11.0 ) require ( diff --git a/generic_commands.go b/generic_commands.go index dc6c3fe0..c7100222 100644 --- a/generic_commands.go +++ b/generic_commands.go @@ -3,6 +3,8 @@ package redis import ( "context" "time" + + "github.com/redis/go-redis/v9/internal/hashtag" ) type GenericCmdable interface { @@ -363,6 +365,9 @@ func (c cmdable) Scan(ctx context.Context, cursor uint64, match string, count in args = append(args, "count", count) } cmd := NewScanCmd(ctx, c, args...) + if hashtag.Present(match) { + cmd.SetFirstKeyPos(3) + } _ = c(ctx, cmd) return cmd } @@ -379,6 +384,9 @@ func (c cmdable) ScanType(ctx context.Context, cursor uint64, match string, coun args = append(args, "type", keyType) } cmd := NewScanCmd(ctx, c, args...) + if hashtag.Present(match) { + cmd.SetFirstKeyPos(3) + } _ = c(ctx, cmd) return cmd } diff --git a/hash_commands.go b/hash_commands.go index 98a361b3..335cb950 100644 --- a/hash_commands.go +++ b/hash_commands.go @@ -3,6 +3,8 @@ package redis import ( "context" "time" + + "github.com/redis/go-redis/v9/internal/hashtag" ) type HashCmdable interface { @@ -192,6 +194,9 @@ func (c cmdable) HScan(ctx context.Context, key string, cursor uint64, match str args = append(args, "count", count) } cmd := NewScanCmd(ctx, c, args...) + if hashtag.Present(match) { + cmd.SetFirstKeyPos(4) + } _ = c(ctx, cmd) return cmd } @@ -211,6 +216,9 @@ func (c cmdable) HScanNoValues(ctx context.Context, key string, cursor uint64, m } args = append(args, "novalues") cmd := NewScanCmd(ctx, c, args...) + if hashtag.Present(match) { + cmd.SetFirstKeyPos(4) + } _ = c(ctx, cmd) return cmd } diff --git a/internal/hashtag/hashtag.go b/internal/hashtag/hashtag.go index f13ee816..ea56fd6c 100644 --- a/internal/hashtag/hashtag.go +++ b/internal/hashtag/hashtag.go @@ -56,6 +56,18 @@ func Key(key string) string { return key } +func Present(key string) bool { + if key == "" { + return false + } + if s := strings.IndexByte(key, '{'); s > -1 { + if e := strings.IndexByte(key[s+1:], '}'); e > 0 { + return true + } + } + return false +} + func RandomSlot() int { return rand.Intn(slotNumber) } diff --git a/internal/hashtag/hashtag_test.go b/internal/hashtag/hashtag_test.go index fe4865b7..983e3928 100644 --- a/internal/hashtag/hashtag_test.go +++ b/internal/hashtag/hashtag_test.go @@ -69,3 +69,28 @@ var _ = Describe("HashSlot", func() { } }) }) + +var _ = Describe("Present", func() { + It("should calculate hash slots", func() { + tests := []struct { + key string + present bool + }{ + {"123456789", false}, + {"{}foo", false}, + {"foo{}", false}, + {"foo{}{bar}", false}, + {"", false}, + {string([]byte{83, 153, 134, 118, 229, 214, 244, 75, 140, 37, 215, 215}), false}, + {"foo{bar}", true}, + {"{foo}bar", true}, + {"{user1000}.following", true}, + {"foo{{bar}}zap", true}, + {"foo{bar}{zap}", true}, + } + + for _, test := range tests { + Expect(Present(test.key)).To(Equal(test.present), "for %s", test.key) + } + }) +}) diff --git a/internal_test.go b/internal_test.go index 8f1f1f31..4a655cff 100644 --- a/internal_test.go +++ b/internal_test.go @@ -364,15 +364,22 @@ var _ = Describe("ClusterClient", func() { It("select slot from args for GETKEYSINSLOT command", func() { cmd := NewStringSliceCmd(ctx, "cluster", "getkeysinslot", 100, 200) - slot := client.cmdSlot(cmd) + slot := client.cmdSlot(cmd, -1) Expect(slot).To(Equal(100)) }) It("select slot from args for COUNTKEYSINSLOT command", func() { cmd := NewStringSliceCmd(ctx, "cluster", "countkeysinslot", 100) - slot := client.cmdSlot(cmd) + slot := client.cmdSlot(cmd, -1) Expect(slot).To(Equal(100)) }) + + It("follows preferred random slot", func() { + cmd := NewStatusCmd(ctx, "ping") + + slot := client.cmdSlot(cmd, 101) + Expect(slot).To(Equal(101)) + }) }) }) diff --git a/osscluster.go b/osscluster.go index 55017d8b..0526022b 100644 --- a/osscluster.go +++ b/osscluster.go @@ -998,7 +998,7 @@ func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error { } func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error { - slot := c.cmdSlot(cmd) + slot := c.cmdSlot(cmd, -1) var node *clusterNode var moved bool var ask bool @@ -1344,9 +1344,13 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd return err } + preferredRandomSlot := -1 if c.opt.ReadOnly && c.cmdsAreReadOnly(ctx, cmds) { for _, cmd := range cmds { - slot := c.cmdSlot(cmd) + slot := c.cmdSlot(cmd, preferredRandomSlot) + if preferredRandomSlot == -1 { + preferredRandomSlot = slot + } node, err := c.slotReadOnlyNode(state, slot) if err != nil { return err @@ -1357,7 +1361,10 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd } for _, cmd := range cmds { - slot := c.cmdSlot(cmd) + slot := c.cmdSlot(cmd, preferredRandomSlot) + if preferredRandomSlot == -1 { + preferredRandomSlot = slot + } node, err := state.slotMasterNode(slot) if err != nil { return err @@ -1519,58 +1526,78 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err return err } - cmdsMap := c.mapCmdsBySlot(cmds) - // TxPipeline does not support cross slot transaction. - if len(cmdsMap) > 1 { + keyedCmdsBySlot := c.slottedKeyedCommands(cmds) + slot := -1 + switch len(keyedCmdsBySlot) { + case 0: + slot = hashtag.RandomSlot() + case 1: + for sl := range keyedCmdsBySlot { + slot = sl + break + } + default: + // TxPipeline does not support cross slot transaction. setCmdsErr(cmds, ErrCrossSlot) return ErrCrossSlot } - for slot, cmds := range cmdsMap { - node, err := state.slotMasterNode(slot) - if err != nil { - setCmdsErr(cmds, err) - continue + node, err := state.slotMasterNode(slot) + if err != nil { + setCmdsErr(cmds, err) + return err + } + + cmdsMap := map[*clusterNode][]Cmder{node: cmds} + for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { + if attempt > 0 { + if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { + setCmdsErr(cmds, err) + return err + } } - cmdsMap := map[*clusterNode][]Cmder{node: cmds} - for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { - if attempt > 0 { - if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { - setCmdsErr(cmds, err) - return err - } - } + failedCmds := newCmdsMap() + var wg sync.WaitGroup - failedCmds := newCmdsMap() - var wg sync.WaitGroup - - for node, cmds := range cmdsMap { - wg.Add(1) - go func(node *clusterNode, cmds []Cmder) { - defer wg.Done() - c.processTxPipelineNode(ctx, node, cmds, failedCmds) - }(node, cmds) - } - - wg.Wait() - if len(failedCmds.m) == 0 { - break - } - cmdsMap = failedCmds.m + for node, cmds := range cmdsMap { + wg.Add(1) + go func(node *clusterNode, cmds []Cmder) { + defer wg.Done() + c.processTxPipelineNode(ctx, node, cmds, failedCmds) + }(node, cmds) } + + wg.Wait() + if len(failedCmds.m) == 0 { + break + } + cmdsMap = failedCmds.m } return cmdsFirstErr(cmds) } -func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { - cmdsMap := make(map[int][]Cmder) +// slottedKeyedCommands returns a map of slot to commands taking into account +// only commands that have keys. +func (c *ClusterClient) slottedKeyedCommands(cmds []Cmder) map[int][]Cmder { + cmdsSlots := map[int][]Cmder{} + + preferredRandomSlot := -1 for _, cmd := range cmds { - slot := c.cmdSlot(cmd) - cmdsMap[slot] = append(cmdsMap[slot], cmd) + if cmdFirstKeyPos(cmd) == 0 { + continue + } + + slot := c.cmdSlot(cmd, preferredRandomSlot) + if preferredRandomSlot == -1 { + preferredRandomSlot = slot + } + + cmdsSlots[slot] = append(cmdsSlots[slot], cmd) } - return cmdsMap + + return cmdsSlots } func (c *ClusterClient) processTxPipelineNode( @@ -1885,17 +1912,20 @@ func (c *ClusterClient) cmdInfo(ctx context.Context, name string) *CommandInfo { return info } -func (c *ClusterClient) cmdSlot(cmd Cmder) int { +func (c *ClusterClient) cmdSlot(cmd Cmder, preferredRandomSlot int) int { args := cmd.Args() if args[0] == "cluster" && (args[1] == "getkeysinslot" || args[1] == "countkeysinslot") { return args[2].(int) } - return cmdSlot(cmd, cmdFirstKeyPos(cmd)) + return cmdSlot(cmd, cmdFirstKeyPos(cmd), preferredRandomSlot) } -func cmdSlot(cmd Cmder, pos int) int { +func cmdSlot(cmd Cmder, pos int, preferredRandomSlot int) int { if pos == 0 { + if preferredRandomSlot != -1 { + return preferredRandomSlot + } return hashtag.RandomSlot() } firstKey := cmd.stringArg(pos) diff --git a/osscluster_test.go b/osscluster_test.go index 10023218..2c7f40a5 100644 --- a/osscluster_test.go +++ b/osscluster_test.go @@ -603,6 +603,15 @@ var _ = Describe("ClusterClient", func() { Expect(err).To(MatchError(redis.ErrCrossSlot)) }) + It("works normally with keyless commands and no CrossSlot error", func() { + pipe.Set(ctx, "A{s}", "A_value", 0) + pipe.Ping(ctx) + pipe.Set(ctx, "B{s}", "B_value", 0) + pipe.Ping(ctx) + _, err := pipe.Exec(ctx) + Expect(err).To(Not(HaveOccurred())) + }) + // doesn't fail when no commands are queued It("returns no error when there are no commands", func() { _, err := pipe.Exec(ctx) diff --git a/ring.go b/ring.go index 1f75913a..ba4f94ee 100644 --- a/ring.go +++ b/ring.go @@ -13,6 +13,7 @@ import ( "github.com/cespare/xxhash/v2" "github.com/dgryski/go-rendezvous" //nolint + "github.com/redis/go-redis/v9/auth" "github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal/hashtag" @@ -73,7 +74,24 @@ type RingOptions struct { Protocol int Username string Password string - DB int + // CredentialsProvider allows the username and password to be updated + // before reconnecting. It should return the current username and password. + CredentialsProvider func() (username string, password string) + + // CredentialsProviderContext is an enhanced parameter of CredentialsProvider, + // done to maintain API compatibility. In the future, + // there might be a merge between CredentialsProviderContext and CredentialsProvider. + // There will be a conflict between them; if CredentialsProviderContext exists, we will ignore CredentialsProvider. + CredentialsProviderContext func(ctx context.Context) (username string, password string, err error) + + // StreamingCredentialsProvider is used to retrieve the credentials + // for the connection from an external source. Those credentials may change + // during the connection lifetime. This is useful for managed identity + // scenarios where the credentials are retrieved from an external source. + // + // Currently, this is a placeholder for the future implementation. + StreamingCredentialsProvider auth.StreamingCredentialsProvider + DB int MaxRetries int MinRetryBackoff time.Duration @@ -154,10 +172,13 @@ func (opt *RingOptions) clientOptions() *Options { Dialer: opt.Dialer, OnConnect: opt.OnConnect, - Protocol: opt.Protocol, - Username: opt.Username, - Password: opt.Password, - DB: opt.DB, + Protocol: opt.Protocol, + Username: opt.Username, + Password: opt.Password, + CredentialsProvider: opt.CredentialsProvider, + CredentialsProviderContext: opt.CredentialsProviderContext, + StreamingCredentialsProvider: opt.StreamingCredentialsProvider, + DB: opt.DB, MaxRetries: -1, diff --git a/ring_test.go b/ring_test.go index ef95e980..5fd7d982 100644 --- a/ring_test.go +++ b/ring_test.go @@ -304,7 +304,7 @@ var _ = Describe("Redis Ring", func() { ring = redis.NewRing(opt) }) It("supports Process hook", func() { - err := ring.Ping(ctx).Err() + err := ring.Set(ctx, "key", "test", 0).Err() Expect(err).NotTo(HaveOccurred()) var stack []string @@ -312,12 +312,12 @@ var _ = Describe("Redis Ring", func() { ring.AddHook(&hook{ processHook: func(hook redis.ProcessHook) redis.ProcessHook { return func(ctx context.Context, cmd redis.Cmder) error { - Expect(cmd.String()).To(Equal("ping: ")) + Expect(cmd.String()).To(Equal("get key: ")) stack = append(stack, "ring.BeforeProcess") err := hook(ctx, cmd) - Expect(cmd.String()).To(Equal("ping: PONG")) + Expect(cmd.String()).To(Equal("get key: test")) stack = append(stack, "ring.AfterProcess") return err @@ -329,12 +329,12 @@ var _ = Describe("Redis Ring", func() { shard.AddHook(&hook{ processHook: func(hook redis.ProcessHook) redis.ProcessHook { return func(ctx context.Context, cmd redis.Cmder) error { - Expect(cmd.String()).To(Equal("ping: ")) + Expect(cmd.String()).To(Equal("get key: ")) stack = append(stack, "shard.BeforeProcess") err := hook(ctx, cmd) - Expect(cmd.String()).To(Equal("ping: PONG")) + Expect(cmd.String()).To(Equal("get key: test")) stack = append(stack, "shard.AfterProcess") return err @@ -344,7 +344,7 @@ var _ = Describe("Redis Ring", func() { return nil }) - err = ring.Ping(ctx).Err() + err = ring.Get(ctx, "key").Err() Expect(err).NotTo(HaveOccurred()) Expect(stack).To(Equal([]string{ "ring.BeforeProcess", diff --git a/sentinel.go b/sentinel.go index 43fbcd24..04c0f726 100644 --- a/sentinel.go +++ b/sentinel.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/redis/go-redis/v9/auth" "github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/rand" @@ -60,7 +61,24 @@ type FailoverOptions struct { Protocol int Username string Password string - DB int + // CredentialsProvider allows the username and password to be updated + // before reconnecting. It should return the current username and password. + CredentialsProvider func() (username string, password string) + + // CredentialsProviderContext is an enhanced parameter of CredentialsProvider, + // done to maintain API compatibility. In the future, + // there might be a merge between CredentialsProviderContext and CredentialsProvider. + // There will be a conflict between them; if CredentialsProviderContext exists, we will ignore CredentialsProvider. + CredentialsProviderContext func(ctx context.Context) (username string, password string, err error) + + // StreamingCredentialsProvider is used to retrieve the credentials + // for the connection from an external source. Those credentials may change + // during the connection lifetime. This is useful for managed identity + // scenarios where the credentials are retrieved from an external source. + // + // Currently, this is a placeholder for the future implementation. + StreamingCredentialsProvider auth.StreamingCredentialsProvider + DB int MaxRetries int MinRetryBackoff time.Duration @@ -107,10 +125,13 @@ func (opt *FailoverOptions) clientOptions() *Options { Dialer: opt.Dialer, OnConnect: opt.OnConnect, - DB: opt.DB, - Protocol: opt.Protocol, - Username: opt.Username, - Password: opt.Password, + DB: opt.DB, + Protocol: opt.Protocol, + Username: opt.Username, + Password: opt.Password, + CredentialsProvider: opt.CredentialsProvider, + CredentialsProviderContext: opt.CredentialsProviderContext, + StreamingCredentialsProvider: opt.StreamingCredentialsProvider, MaxRetries: opt.MaxRetries, MinRetryBackoff: opt.MinRetryBackoff, @@ -187,9 +208,12 @@ func (opt *FailoverOptions) clusterOptions() *ClusterOptions { Dialer: opt.Dialer, OnConnect: opt.OnConnect, - Protocol: opt.Protocol, - Username: opt.Username, - Password: opt.Password, + Protocol: opt.Protocol, + Username: opt.Username, + Password: opt.Password, + CredentialsProvider: opt.CredentialsProvider, + CredentialsProviderContext: opt.CredentialsProviderContext, + StreamingCredentialsProvider: opt.StreamingCredentialsProvider, MaxRedirects: opt.MaxRetries, diff --git a/set_commands.go b/set_commands.go index 355f514a..79efa6e4 100644 --- a/set_commands.go +++ b/set_commands.go @@ -1,6 +1,10 @@ package redis -import "context" +import ( + "context" + + "github.com/redis/go-redis/v9/internal/hashtag" +) type SetCmdable interface { SAdd(ctx context.Context, key string, members ...interface{}) *IntCmd @@ -211,6 +215,9 @@ func (c cmdable) SScan(ctx context.Context, key string, cursor uint64, match str args = append(args, "count", count) } cmd := NewScanCmd(ctx, c, args...) + if hashtag.Present(match) { + cmd.SetFirstKeyPos(4) + } _ = c(ctx, cmd) return cmd } diff --git a/sortedset_commands.go b/sortedset_commands.go index 14b35858..e48e7367 100644 --- a/sortedset_commands.go +++ b/sortedset_commands.go @@ -4,6 +4,8 @@ import ( "context" "strings" "time" + + "github.com/redis/go-redis/v9/internal/hashtag" ) type SortedSetCmdable interface { @@ -719,6 +721,9 @@ func (c cmdable) ZScan(ctx context.Context, key string, cursor uint64, match str args = append(args, "count", count) } cmd := NewScanCmd(ctx, c, args...) + if hashtag.Present(match) { + cmd.SetFirstKeyPos(4) + } _ = c(ctx, cmd) return cmd } diff --git a/universal.go b/universal.go index a1ce17ba..9d51b928 100644 --- a/universal.go +++ b/universal.go @@ -5,6 +5,8 @@ import ( "crypto/tls" "net" "time" + + "github.com/redis/go-redis/v9/auth" ) // UniversalOptions information is required by UniversalClient to establish @@ -26,9 +28,27 @@ type UniversalOptions struct { Dialer func(ctx context.Context, network, addr string) (net.Conn, error) OnConnect func(ctx context.Context, cn *Conn) error - Protocol int - Username string - Password string + Protocol int + Username string + Password string + // CredentialsProvider allows the username and password to be updated + // before reconnecting. It should return the current username and password. + CredentialsProvider func() (username string, password string) + + // CredentialsProviderContext is an enhanced parameter of CredentialsProvider, + // done to maintain API compatibility. In the future, + // there might be a merge between CredentialsProviderContext and CredentialsProvider. + // There will be a conflict between them; if CredentialsProviderContext exists, we will ignore CredentialsProvider. + CredentialsProviderContext func(ctx context.Context) (username string, password string, err error) + + // StreamingCredentialsProvider is used to retrieve the credentials + // for the connection from an external source. Those credentials may change + // during the connection lifetime. This is useful for managed identity + // scenarios where the credentials are retrieved from an external source. + // + // Currently, this is a placeholder for the future implementation. + StreamingCredentialsProvider auth.StreamingCredentialsProvider + SentinelUsername string SentinelPassword string @@ -96,9 +116,12 @@ func (o *UniversalOptions) Cluster() *ClusterOptions { Dialer: o.Dialer, OnConnect: o.OnConnect, - Protocol: o.Protocol, - Username: o.Username, - Password: o.Password, + Protocol: o.Protocol, + Username: o.Username, + Password: o.Password, + CredentialsProvider: o.CredentialsProvider, + CredentialsProviderContext: o.CredentialsProviderContext, + StreamingCredentialsProvider: o.StreamingCredentialsProvider, MaxRedirects: o.MaxRedirects, ReadOnly: o.ReadOnly, @@ -147,10 +170,14 @@ func (o *UniversalOptions) Failover() *FailoverOptions { Dialer: o.Dialer, OnConnect: o.OnConnect, - DB: o.DB, - Protocol: o.Protocol, - Username: o.Username, - Password: o.Password, + DB: o.DB, + Protocol: o.Protocol, + Username: o.Username, + Password: o.Password, + CredentialsProvider: o.CredentialsProvider, + CredentialsProviderContext: o.CredentialsProviderContext, + StreamingCredentialsProvider: o.StreamingCredentialsProvider, + SentinelUsername: o.SentinelUsername, SentinelPassword: o.SentinelPassword, @@ -199,10 +226,13 @@ func (o *UniversalOptions) Simple() *Options { Dialer: o.Dialer, OnConnect: o.OnConnect, - DB: o.DB, - Protocol: o.Protocol, - Username: o.Username, - Password: o.Password, + DB: o.DB, + Protocol: o.Protocol, + Username: o.Username, + Password: o.Password, + CredentialsProvider: o.CredentialsProvider, + CredentialsProviderContext: o.CredentialsProviderContext, + StreamingCredentialsProvider: o.StreamingCredentialsProvider, MaxRetries: o.MaxRetries, MinRetryBackoff: o.MinRetryBackoff, diff --git a/version.go b/version.go index cbed8bd8..e6dbfd14 100644 --- a/version.go +++ b/version.go @@ -2,5 +2,5 @@ package redis // Version is the current release version. func Version() string { - return "9.10.0" + return "9.11.0" }