1
0
mirror of https://github.com/redis/go-redis.git synced 2025-05-31 21:01:13 +03:00

Merge branch 'master' into os-add-csc-redis-commands

This commit is contained in:
ofekshenawa 2025-04-24 00:36:18 +03:00 committed by GitHub
commit 477e671711
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
56 changed files with 2208 additions and 712 deletions

View File

@ -25,7 +25,7 @@ runs:
# Mapping of redis version to redis testing containers
declare -A redis_version_mapping=(
["8.0-M03"]="8.0-M04-pre"
["8.0-RC1"]="8.0-RC1-pre"
["7.4.2"]="rs-7.4.0-v2"
["7.2.7"]="rs-7.2.0-v14"
)

View File

@ -29,6 +29,7 @@ Lua
MSSQL
namespace
NoSQL
OpenTelemetry
ORM
Packagist
PhpRedis

View File

@ -18,7 +18,7 @@ jobs:
fail-fast: false
matrix:
redis-version:
- "8.0-M03" # 8.0 milestone 4
- "8.0-RC1" # 8.0 RC1
- "7.4.2" # should use redis stack 7.4
go-version:
- "1.23.x"
@ -43,7 +43,7 @@ jobs:
# Mapping of redis version to redis testing containers
declare -A redis_version_mapping=(
["8.0-M03"]="8.0-M04-pre"
["8.0-RC1"]="8.0-RC1-pre"
["7.4.2"]="rs-7.4.0-v2"
)
if [[ -v redis_version_mapping[$REDIS_VERSION] ]]; then
@ -72,7 +72,7 @@ jobs:
fail-fast: false
matrix:
redis-version:
- "8.0-M03" # 8.0 milestone 4
- "8.0-RC1" # 8.0 RC1
- "7.4.2" # should use redis stack 7.4
- "7.2.7" # should redis stack 7.2
go-version:

View File

@ -21,4 +21,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: golangci-lint
uses: golangci/golangci-lint-action@v6.5.0
uses: golangci/golangci-lint-action@v6.5.2
with:
verify: false # disable verifying the configuration since golangci is currently introducing breaking changes in the configuration

View File

@ -8,7 +8,7 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
- name: Check Spelling
uses: rojopolis/spellcheck-github-actions@0.47.0
uses: rojopolis/spellcheck-github-actions@0.48.0
with:
config_path: .github/spellcheck-settings.yml
task_name: Markdown

View File

@ -54,5 +54,4 @@ jobs:
--ginkgo.skip-file="sentinel_test.go" \
--ginkgo.skip-file="osscluster_test.go" \
--ginkgo.skip-file="pubsub_test.go" \
--ginkgo.skip-file="gears_commands_test.go" \
--ginkgo.label-filter='!NonRedisEnterprise'

View File

@ -112,3 +112,7 @@ The core team regularly looks at pull requests. We will provide
feedback as soon as possible. After receiving our feedback, please respond
within two weeks. After that time, we may close your PR if it isn't
showing any activity.
## Support
Maintainers can provide limited support to contributors on discord: https://discord.gg/W4txy5AeKM

View File

@ -17,7 +17,7 @@ test.ci:
(cd "$${dir}" && \
go mod tidy -compat=1.18 && \
go vet && \
go test -coverprofile=coverage.txt -covermode=atomic ./... -race); \
go test -v -coverprofile=coverage.txt -covermode=atomic ./... -race); \
done
cd internal/customvet && go build .
go vet -vettool ./internal/customvet/customvet

View File

@ -3,8 +3,14 @@
[![build workflow](https://github.com/redis/go-redis/actions/workflows/build.yml/badge.svg)](https://github.com/redis/go-redis/actions)
[![PkgGoDev](https://pkg.go.dev/badge/github.com/redis/go-redis/v9)](https://pkg.go.dev/github.com/redis/go-redis/v9?tab=doc)
[![Documentation](https://img.shields.io/badge/redis-documentation-informational)](https://redis.uptrace.dev/)
[![Go Report Card](https://goreportcard.com/badge/github.com/redis/go-redis/v9)](https://goreportcard.com/report/github.com/redis/go-redis/v9)
[![codecov](https://codecov.io/github/redis/go-redis/graph/badge.svg?token=tsrCZKuSSw)](https://codecov.io/github/redis/go-redis)
[![Chat](https://discordapp.com/api/guilds/752070105847955518/widget.png)](https://discord.gg/rWtp5Aj)
[![Discord](https://img.shields.io/discord/697882427875393627.svg?style=social&logo=discord)](https://discord.gg/W4txy5AeKM)
[![Twitch](https://img.shields.io/twitch/status/redisinc?style=social)](https://www.twitch.tv/redisinc)
[![YouTube](https://img.shields.io/youtube/channel/views/UCD78lHSwYqMlyetR0_P4Vig?style=social)](https://www.youtube.com/redisinc)
[![Twitter](https://img.shields.io/twitter/follow/redisinc?style=social)](https://twitter.com/redisinc)
[![Stack Exchange questions](https://img.shields.io/stackexchange/stackoverflow/t/go-redis?style=social&logo=stackoverflow&label=Stackoverflow)](https://stackoverflow.com/questions/tagged/go-redis)
> go-redis is the official Redis client library for the Go programming language. It offers a straightforward interface for interacting with Redis servers.
@ -44,7 +50,7 @@ in the `go.mod` to `go 1.24` in one of the next releases.
## Resources
- [Discussions](https://github.com/redis/go-redis/discussions)
- [Chat](https://discord.gg/rWtp5Aj)
- [Chat](https://discord.gg/W4txy5AeKM)
- [Reference](https://pkg.go.dev/github.com/redis/go-redis/v9)
- [Examples](https://pkg.go.dev/github.com/redis/go-redis/v9#pkg-examples)
@ -167,6 +173,24 @@ func ExampleClient() *redis.Client {
```
### Instrument with OpenTelemetry
```go
import (
"github.com/redis/go-redis/v9"
"github.com/redis/go-redis/extra/redisotel/v9"
"errors"
)
func main() {
...
rdb := redis.NewClient(&redis.Options{...})
if err := errors.Join(redisotel.InstrumentTracing(rdb), redisotel.InstrumentMetrics(rdb)); err != nil {
log.Fatal(err)
}
```
### Advanced Configuration
@ -178,16 +202,18 @@ By default, go-redis automatically sends the client library name and version dur
#### Disabling Identity Verification
When connection identity verification is not required or needs to be explicitly disabled, a `DisableIndentity` configuration option exists. In V10 of this library, `DisableIndentity` will become `DisableIdentity` in order to fix the associated typo.
When connection identity verification is not required or needs to be explicitly disabled, a `DisableIdentity` configuration option exists.
Initially there was a typo and the option was named `DisableIndentity` instead of `DisableIdentity`. The misspelled option is marked as Deprecated and will be removed in V10 of this library.
Although both options will work at the moment, the correct option is `DisableIdentity`. The deprecated option will be removed in V10 of this library, so please use the correct option name to avoid any issues.
To disable verification, set the `DisableIndentity` option to `true` in the Redis client options:
To disable verification, set the `DisableIdentity` option to `true` in the Redis client options:
```go
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
DisableIndentity: true, // Disable set-info on connect
DisableIdentity: true, // Disable set-info on connect
})
```
@ -209,9 +235,30 @@ res1, err := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptio
val1 := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptions{}).RawVal()
```
## Contributing
#### Redis-Search Default Dialect
Please see [out contributing guidelines](CONTRIBUTING.md) to help us improve this library!
In the Redis-Search module, **the default dialect is 2**. If needed, you can explicitly specify a different dialect using the appropriate configuration in your queries.
**Important**: Be aware that the query dialect may impact the results returned. If needed, you can revert to a different dialect version by passing the desired dialect in the arguments of the command you want to execute.
For example:
```
res2, err := rdb.FTSearchWithArgs(ctx,
"idx:bicycle",
"@pickup_zone:[CONTAINS $bike]",
&redis.FTSearchOptions{
Params: map[string]interface{}{
"bike": "POINT(-0.1278 51.5074)",
},
DialectVersion: 3,
},
).Result()
```
You can find further details in the [query dialect documentation](https://redis.io/docs/latest/develop/interact/search-and-query/advanced-concepts/dialects/).
## Contributing
We welcome contributions to the go-redis library! If you have a bug fix, feature request, or improvement, please open an issue or pull request on GitHub.
We appreciate your help in making go-redis better for everyone.
If you are interested in contributing to the go-redis library, please check out our [contributing guidelines](CONTRIBUTING.md) for more information on how to get started.
## Look and feel

View File

@ -30,7 +30,7 @@ func NewClientStub(resp []byte) *ClientStub {
Dialer: func(ctx context.Context, network, addr string) (net.Conn, error) {
return stub.stubConn(initHello), nil
},
DisableIndentity: true,
DisableIdentity: true,
})
return stub
}
@ -46,7 +46,7 @@ func NewClusterClientStub(resp []byte) *ClientStub {
Dialer: func(ctx context.Context, network, addr string) (net.Conn, error) {
return stub.stubConn(initHello), nil
},
DisableIndentity: true,
DisableIdentity: true,
ClusterSlots: func(_ context.Context) ([]ClusterSlot, error) {
return []ClusterSlot{

View File

@ -3831,7 +3831,8 @@ func (cmd *MapStringStringSliceCmd) readReply(rd *proto.Reader) error {
}
// -----------------------------------------------------------------------
// MapStringInterfaceCmd represents a command that returns a map of strings to interface{}.
// MapMapStringInterfaceCmd represents a command that returns a map of strings to interface{}.
type MapMapStringInterfaceCmd struct {
baseCmd
val map[string]interface{}

View File

@ -81,6 +81,8 @@ func appendArg(dst []interface{}, arg interface{}) []interface{} {
return dst
case time.Time, time.Duration, encoding.BinaryMarshaler, net.IP:
return append(dst, arg)
case nil:
return dst
default:
// scan struct field
v := reflect.ValueOf(arg)
@ -211,7 +213,6 @@ type Cmdable interface {
ACLCmdable
BitMapCmdable
ClusterCmdable
GearsCmdable
GenericCmdable
GeoCmdable
HashCmdable
@ -422,7 +423,7 @@ func (info LibraryInfo) Validate() error {
return nil
}
// Hello Set the resp protocol used.
// Hello sets the resp protocol used.
func (c statefulCmdable) Hello(ctx context.Context,
ver int, username, password, clientName string,
) *MapStringInterfaceCmd {
@ -514,6 +515,12 @@ func (c cmdable) Ping(ctx context.Context) *StatusCmd {
return cmd
}
func (c cmdable) Do(ctx context.Context, args ...interface{}) *Cmd {
cmd := NewCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) Quit(_ context.Context) *StatusCmd {
panic("not implemented")
}

View File

@ -84,6 +84,12 @@ var _ = Describe("Commands", func() {
Expect(ping.Val()).To(Equal("PONG"))
})
It("should Ping with Do method", func() {
result := client.Conn().Do(ctx, "PING")
Expect(result.Err()).NotTo(HaveOccurred())
Expect(result.Val()).To(Equal("PONG"))
})
It("should Wait", func() {
const wait = 3 * time.Second
@ -2665,7 +2671,6 @@ var _ = Describe("Commands", func() {
Expect(res).To(Equal([]int64{1, 1, -2}))
})
It("should HPExpire", Label("hash-expiration", "NonRedisEnterprise"), func() {
SkipBeforeRedisVersion(7.4, "doesn't work with older redis stack images")
res, err := client.HPExpire(ctx, "no_such_key", 10*time.Second, "field1", "field2", "field3").Result()
@ -2818,6 +2823,148 @@ var _ = Describe("Commands", func() {
Expect(err).NotTo(HaveOccurred())
Expect(res[0]).To(BeNumerically("~", 10*time.Second.Milliseconds(), 1))
})
It("should HGETDEL", Label("hash", "HGETDEL"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
err := client.HSet(ctx, "myhash", "f1", "val1", "f2", "val2", "f3", "val3").Err()
Expect(err).NotTo(HaveOccurred())
// Execute HGETDEL on fields f1 and f2.
res, err := client.HGetDel(ctx, "myhash", "f1", "f2").Result()
Expect(err).NotTo(HaveOccurred())
// Expect the returned values for f1 and f2.
Expect(res).To(Equal([]string{"val1", "val2"}))
// Verify that f1 and f2 have been deleted, while f3 remains.
remaining, err := client.HMGet(ctx, "myhash", "f1", "f2", "f3").Result()
Expect(err).NotTo(HaveOccurred())
Expect(remaining[0]).To(BeNil())
Expect(remaining[1]).To(BeNil())
Expect(remaining[2]).To(Equal("val3"))
})
It("should return nil responses for HGETDEL on non-existent key", Label("hash", "HGETDEL"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
// HGETDEL on a key that does not exist.
res, err := client.HGetDel(ctx, "nonexistent", "f1", "f2").Result()
Expect(err).To(BeNil())
Expect(res).To(Equal([]string{"", ""}))
})
// -----------------------------
// HGETEX with various TTL options
// -----------------------------
It("should HGETEX with EX option", Label("hash", "HGETEX"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
err := client.HSet(ctx, "myhash", "f1", "val1", "f2", "val2").Err()
Expect(err).NotTo(HaveOccurred())
// Call HGETEX with EX option and 60 seconds TTL.
opt := redis.HGetEXOptions{
ExpirationType: redis.HGetEXExpirationEX,
ExpirationVal: 60,
}
res, err := client.HGetEXWithArgs(ctx, "myhash", &opt, "f1", "f2").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]string{"val1", "val2"}))
})
It("should HGETEX with PERSIST option", Label("hash", "HGETEX"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
err := client.HSet(ctx, "myhash", "f1", "val1", "f2", "val2").Err()
Expect(err).NotTo(HaveOccurred())
// Call HGETEX with PERSIST (no TTL value needed).
opt := redis.HGetEXOptions{ExpirationType: redis.HGetEXExpirationPERSIST}
res, err := client.HGetEXWithArgs(ctx, "myhash", &opt, "f1", "f2").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]string{"val1", "val2"}))
})
It("should HGETEX with EXAT option", Label("hash", "HGETEX"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
err := client.HSet(ctx, "myhash", "f1", "val1", "f2", "val2").Err()
Expect(err).NotTo(HaveOccurred())
// Set expiration at a specific Unix timestamp (60 seconds from now).
expireAt := time.Now().Add(60 * time.Second).Unix()
opt := redis.HGetEXOptions{
ExpirationType: redis.HGetEXExpirationEXAT,
ExpirationVal: expireAt,
}
res, err := client.HGetEXWithArgs(ctx, "myhash", &opt, "f1", "f2").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]string{"val1", "val2"}))
})
// -----------------------------
// HSETEX with FNX/FXX options
// -----------------------------
It("should HSETEX with FNX condition", Label("hash", "HSETEX"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
opt := redis.HSetEXOptions{
Condition: redis.HSetEXFNX,
ExpirationType: redis.HSetEXExpirationEX,
ExpirationVal: 60,
}
res, err := client.HSetEXWithArgs(ctx, "myhash", &opt, "f1", "val1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal(int64(1)))
opt = redis.HSetEXOptions{
Condition: redis.HSetEXFNX,
ExpirationType: redis.HSetEXExpirationEX,
ExpirationVal: 60,
}
res, err = client.HSetEXWithArgs(ctx, "myhash", &opt, "f1", "val2").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal(int64(0)))
})
It("should HSETEX with FXX condition", Label("hash", "HSETEX"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
err := client.HSet(ctx, "myhash", "f2", "val1").Err()
Expect(err).NotTo(HaveOccurred())
opt := redis.HSetEXOptions{
Condition: redis.HSetEXFXX,
ExpirationType: redis.HSetEXExpirationEX,
ExpirationVal: 60,
}
res, err := client.HSetEXWithArgs(ctx, "myhash", &opt, "f2", "val2").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal(int64(1)))
opt = redis.HSetEXOptions{
Condition: redis.HSetEXFXX,
ExpirationType: redis.HSetEXExpirationEX,
ExpirationVal: 60,
}
res, err = client.HSetEXWithArgs(ctx, "myhash", &opt, "f3", "val3").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal(int64(0)))
})
It("should HSETEX with multiple field operations", Label("hash", "HSETEX"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
opt := redis.HSetEXOptions{
ExpirationType: redis.HSetEXExpirationEX,
ExpirationVal: 60,
}
res, err := client.HSetEXWithArgs(ctx, "myhash", &opt, "f1", "val1", "f2", "val2").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal(int64(1)))
values, err := client.HMGet(ctx, "myhash", "f1", "f2").Result()
Expect(err).NotTo(HaveOccurred())
Expect(values).To(Equal([]interface{}{"val1", "val2"}))
})
})
Describe("hyperloglog", func() {
@ -7068,6 +7215,17 @@ var _ = Describe("Commands", func() {
Expect(err).NotTo(HaveOccurred())
Expect(vals).To(Equal([]interface{}{int64(12), proto.RedisError("error"), "abc"}))
})
It("returns empty values when args are nil", func() {
vals, err := client.Eval(
ctx,
"return {ARGV[1]}",
[]string{},
nil,
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(vals).To(BeEmpty())
})
})
Describe("EvalRO", func() {
@ -7091,6 +7249,17 @@ var _ = Describe("Commands", func() {
Expect(err).NotTo(HaveOccurred())
Expect(vals).To(Equal([]interface{}{int64(12), proto.RedisError("error"), "abc"}))
})
It("returns empty values when args are nil", func() {
vals, err := client.EvalRO(
ctx,
"return {ARGV[1]}",
[]string{},
nil,
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(vals).To(BeEmpty())
})
})
Describe("Functions", func() {

View File

@ -3,6 +3,7 @@
services:
redis:
image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:rs-7.4.0-v2}
platform: linux/amd64
container_name: redis-standalone
environment:
- TLS_ENABLED=yes
@ -23,6 +24,7 @@ services:
osscluster:
image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:rs-7.4.0-v2}
platform: linux/amd64
container_name: redis-osscluster
environment:
- NODES=6
@ -39,6 +41,7 @@ services:
sentinel-cluster:
image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:rs-7.4.0-v2}
platform: linux/amd64
container_name: redis-sentinel-cluster
network_mode: "host"
environment:
@ -58,6 +61,7 @@ services:
sentinel:
image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:rs-7.4.0-v2}
platform: linux/amd64
container_name: redis-sentinel
depends_on:
- sentinel-cluster
@ -81,6 +85,7 @@ services:
ring-cluster:
image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:rs-7.4.0-v2}
platform: linux/amd64
container_name: redis-ring-cluster
environment:
- NODES=3

323
doctests/cmds_list_test.go Normal file
View File

@ -0,0 +1,323 @@
// EXAMPLE: cmds_list
// HIDE_START
package example_commands_test
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
)
// HIDE_END
func ExampleClient_cmd_llen() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password docs
DB: 0, // use default DB
})
// REMOVE_START
rdb.Del(ctx, "mylist")
// REMOVE_END
// STEP_START llen
lPushResult1, err := rdb.LPush(ctx, "mylist", "World").Result()
if err != nil {
panic(err)
}
fmt.Println(lPushResult1) // >>> 1
lPushResult2, err := rdb.LPush(ctx, "mylist", "Hello").Result()
if err != nil {
panic(err)
}
fmt.Println(lPushResult2) // >>> 2
lLenResult, err := rdb.LLen(ctx, "mylist").Result()
if err != nil {
panic(err)
}
fmt.Println(lLenResult) // >>> 2
// STEP_END
// Output:
// 1
// 2
// 2
}
func ExampleClient_cmd_lpop() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password docs
DB: 0, // use default DB
})
// REMOVE_START
rdb.Del(ctx, "mylist")
// REMOVE_END
// STEP_START lpop
RPushResult, err := rdb.RPush(ctx,
"mylist", "one", "two", "three", "four", "five",
).Result()
if err != nil {
panic(err)
}
fmt.Println(RPushResult) // >>> 5
lPopResult, err := rdb.LPop(ctx, "mylist").Result()
if err != nil {
panic(err)
}
fmt.Println(lPopResult) // >>> one
lPopCountResult, err := rdb.LPopCount(ctx, "mylist", 2).Result()
if err != nil {
panic(err)
}
fmt.Println(lPopCountResult) // >>> [two three]
lRangeResult, err := rdb.LRange(ctx, "mylist", 0, -1).Result()
if err != nil {
panic(err)
}
fmt.Println(lRangeResult) // >>> [four five]
// STEP_END
// Output:
// 5
// one
// [two three]
// [four five]
}
func ExampleClient_cmd_lpush() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password docs
DB: 0, // use default DB
})
// REMOVE_START
rdb.Del(ctx, "mylist")
// REMOVE_END
// STEP_START lpush
lPushResult1, err := rdb.LPush(ctx, "mylist", "World").Result()
if err != nil {
panic(err)
}
fmt.Println(lPushResult1) // >>> 1
lPushResult2, err := rdb.LPush(ctx, "mylist", "Hello").Result()
if err != nil {
panic(err)
}
fmt.Println(lPushResult2) // >>> 2
lRangeResult, err := rdb.LRange(ctx, "mylist", 0, -1).Result()
if err != nil {
panic(err)
}
fmt.Println(lRangeResult) // >>> [Hello World]
// STEP_END
// Output:
// 1
// 2
// [Hello World]
}
func ExampleClient_cmd_lrange() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password docs
DB: 0, // use default DB
})
// REMOVE_START
rdb.Del(ctx, "mylist")
// REMOVE_END
// STEP_START lrange
RPushResult, err := rdb.RPush(ctx, "mylist",
"one", "two", "three",
).Result()
if err != nil {
panic(err)
}
fmt.Println(RPushResult) // >>> 3
lRangeResult1, err := rdb.LRange(ctx, "mylist", 0, 0).Result()
if err != nil {
panic(err)
}
fmt.Println(lRangeResult1) // >>> [one]
lRangeResult2, err := rdb.LRange(ctx, "mylist", -3, 2).Result()
if err != nil {
panic(err)
}
fmt.Println(lRangeResult2) // >>> [one two three]
lRangeResult3, err := rdb.LRange(ctx, "mylist", -100, 100).Result()
if err != nil {
panic(err)
}
fmt.Println(lRangeResult3) // >>> [one two three]
lRangeResult4, err := rdb.LRange(ctx, "mylist", 5, 10).Result()
if err != nil {
panic(err)
}
fmt.Println(lRangeResult4) // >>> []
// STEP_END
// Output:
// 3
// [one]
// [one two three]
// [one two three]
// []
}
func ExampleClient_cmd_rpop() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password docs
DB: 0, // use default DB
})
// REMOVE_START
rdb.Del(ctx, "mylist")
// REMOVE_END
// STEP_START rpop
rPushResult, err := rdb.RPush(ctx, "mylist",
"one", "two", "three", "four", "five",
).Result()
if err != nil {
panic(err)
}
fmt.Println(rPushResult) // >>> 5
rPopResult, err := rdb.RPop(ctx, "mylist").Result()
if err != nil {
panic(err)
}
fmt.Println(rPopResult) // >>> five
rPopCountResult, err := rdb.RPopCount(ctx, "mylist", 2).Result()
if err != nil {
panic(err)
}
fmt.Println(rPopCountResult) // >>> [four three]
lRangeResult, err := rdb.LRange(ctx, "mylist", 0, -1).Result()
if err != nil {
panic(err)
}
fmt.Println(lRangeResult) // >>> [one two]
// STEP_END
// Output:
// 5
// five
// [four three]
// [one two]
}
func ExampleClient_cmd_rpush() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password docs
DB: 0, // use default DB
})
// REMOVE_START
rdb.Del(ctx, "mylist")
// REMOVE_END
// STEP_START rpush
rPushResult1, err := rdb.RPush(ctx, "mylist", "Hello").Result()
if err != nil {
panic(err)
}
fmt.Println(rPushResult1) // >>> 1
rPushResult2, err := rdb.RPush(ctx, "mylist", "World").Result()
if err != nil {
panic(err)
}
fmt.Println(rPushResult2) // >>> 2
lRangeResult, err := rdb.LRange(ctx, "mylist", 0, -1).Result()
if err != nil {
panic(err)
}
fmt.Println(lRangeResult) // >>> [Hello World]
// STEP_END
// Output:
// 1
// 2
// [Hello World]
}

102
doctests/cmds_set_test.go Normal file
View File

@ -0,0 +1,102 @@
// EXAMPLE: cmds_set
// HIDE_START
package example_commands_test
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
)
// HIDE_END
func ExampleClient_sadd_cmd() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password docs
DB: 0, // use default DB
})
// REMOVE_START
rdb.Del(ctx, "myset")
// REMOVE_END
// STEP_START sadd
sAddResult1, err := rdb.SAdd(ctx, "myset", "Hello").Result()
if err != nil {
panic(err)
}
fmt.Println(sAddResult1) // >>> 1
sAddResult2, err := rdb.SAdd(ctx, "myset", "World").Result()
if err != nil {
panic(err)
}
fmt.Println(sAddResult2) // >>> 1
sAddResult3, err := rdb.SAdd(ctx, "myset", "World").Result()
if err != nil {
panic(err)
}
fmt.Println(sAddResult3) // >>> 0
sMembersResult, err := rdb.SMembers(ctx, "myset").Result()
if err != nil {
panic(err)
}
fmt.Println(sMembersResult) // >>> [Hello World]
// STEP_END
// Output:
// 1
// 1
// 0
// [Hello World]
}
func ExampleClient_smembers_cmd() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password docs
DB: 0, // use default DB
})
// REMOVE_START
rdb.Del(ctx, "myset")
// REMOVE_END
// STEP_START smembers
sAddResult, err := rdb.SAdd(ctx, "myset", "Hello", "World").Result()
if err != nil {
panic(err)
}
fmt.Println(sAddResult) // >>> 2
sMembersResult, err := rdb.SMembers(ctx, "myset").Result()
if err != nil {
panic(err)
}
fmt.Println(sMembersResult) // >>> [Hello World]
// STEP_END
// Output:
// 2
// [Hello World]
}

View File

@ -152,6 +152,32 @@ func ExampleClient_search_json() {
// >>> Tel Aviv
// STEP_END
// STEP_START query2count_only
citiesResult2, err := rdb.FTSearchWithArgs(
ctx,
"idx:users",
"Paul",
&redis.FTSearchOptions{
Return: []redis.FTSearchReturn{
{
FieldName: "$.city",
As: "city",
},
},
CountOnly: true,
},
).Result()
if err != nil {
panic(err)
}
// The `Total` field has the correct number of docs found
// by the query but the `Docs` slice is empty.
fmt.Println(len(citiesResult2.Docs)) // >>> 0
fmt.Println(citiesResult2.Total) // >>> 2
// STEP_END
// STEP_START query3
aggOptions := redis.FTAggregateOptions{
GroupBy: []redis.FTAggregateGroupBy{
@ -196,6 +222,112 @@ func ExampleClient_search_json() {
// {1 [{user:3 <nil> <nil> <nil> map[$:{"age":35,"city":"Tel Aviv","email":"paul.zamir@example.com","name":"Paul Zamir"}]}]}
// London
// Tel Aviv
// 0
// 2
// London - 1
// Tel Aviv - 2
}
func ExampleClient_search_hash() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password docs
DB: 0, // use default DB
Protocol: 2,
})
// REMOVE_START
rdb.Del(ctx, "huser:1", "huser:2", "huser:3")
rdb.FTDropIndex(ctx, "hash-idx:users")
// REMOVE_END
// STEP_START make_hash_index
_, err := rdb.FTCreate(
ctx,
"hash-idx:users",
// Options:
&redis.FTCreateOptions{
OnHash: true,
Prefix: []interface{}{"huser:"},
},
// Index schema fields:
&redis.FieldSchema{
FieldName: "name",
FieldType: redis.SearchFieldTypeText,
},
&redis.FieldSchema{
FieldName: "city",
FieldType: redis.SearchFieldTypeTag,
},
&redis.FieldSchema{
FieldName: "age",
FieldType: redis.SearchFieldTypeNumeric,
},
).Result()
if err != nil {
panic(err)
}
// STEP_END
user1 := map[string]interface{}{
"name": "Paul John",
"email": "paul.john@example.com",
"age": 42,
"city": "London",
}
user2 := map[string]interface{}{
"name": "Eden Zamir",
"email": "eden.zamir@example.com",
"age": 29,
"city": "Tel Aviv",
}
user3 := map[string]interface{}{
"name": "Paul Zamir",
"email": "paul.zamir@example.com",
"age": 35,
"city": "Tel Aviv",
}
// STEP_START add_hash_data
_, err = rdb.HSet(ctx, "huser:1", user1).Result()
if err != nil {
panic(err)
}
_, err = rdb.HSet(ctx, "huser:2", user2).Result()
if err != nil {
panic(err)
}
_, err = rdb.HSet(ctx, "huser:3", user3).Result()
if err != nil {
panic(err)
}
// STEP_END
// STEP_START query1_hash
findPaulHashResult, err := rdb.FTSearch(
ctx,
"hash-idx:users",
"Paul @age:[30 40]",
).Result()
if err != nil {
panic(err)
}
fmt.Println(findPaulHashResult)
// >>> {1 [{huser:3 <nil> <nil> <nil> map[age:35 city:Tel Aviv...
// STEP_END
// Output:
// {1 [{huser:3 <nil> <nil> <nil> map[age:35 city:Tel Aviv email:paul.zamir@example.com name:Paul Zamir]}]}
}

View File

@ -53,6 +53,9 @@ func shouldRetry(err error, retryTimeout bool) bool {
return true
case nil, context.Canceled, context.DeadlineExceeded:
return false
case pool.ErrPoolTimeout:
// connection pool timeout, increase retries. #3289
return true
}
if v, ok := err.(timeoutError); ok {
@ -72,6 +75,9 @@ func shouldRetry(err error, retryTimeout bool) bool {
if strings.HasPrefix(s, "READONLY ") {
return true
}
if strings.HasPrefix(s, "MASTERDOWN ") {
return true
}
if strings.HasPrefix(s, "CLUSTERDOWN ") {
return true
}

65
error_test.go Normal file
View File

@ -0,0 +1,65 @@
package redis_test
import (
"context"
"errors"
"io"
. "github.com/bsm/ginkgo/v2"
. "github.com/bsm/gomega"
"github.com/redis/go-redis/v9"
)
type testTimeout struct {
timeout bool
}
func (t testTimeout) Timeout() bool {
return t.timeout
}
func (t testTimeout) Error() string {
return "test timeout"
}
var _ = Describe("error", func() {
BeforeEach(func() {
})
AfterEach(func() {
})
It("should retry", func() {
data := map[error]bool{
io.EOF: true,
io.ErrUnexpectedEOF: true,
nil: false,
context.Canceled: false,
context.DeadlineExceeded: false,
redis.ErrPoolTimeout: true,
errors.New("ERR max number of clients reached"): true,
errors.New("LOADING Redis is loading the dataset in memory"): true,
errors.New("READONLY You can't write against a read only replica"): true,
errors.New("CLUSTERDOWN The cluster is down"): true,
errors.New("TRYAGAIN Command cannot be processed, please try again"): true,
errors.New("other"): false,
}
for err, expected := range data {
Expect(redis.ShouldRetry(err, false)).To(Equal(expected))
Expect(redis.ShouldRetry(err, true)).To(Equal(expected))
}
})
It("should retry timeout", func() {
t1 := testTimeout{timeout: true}
Expect(redis.ShouldRetry(t1, true)).To(Equal(true))
Expect(redis.ShouldRetry(t1, false)).To(Equal(false))
t2 := testTimeout{timeout: false}
Expect(redis.ShouldRetry(t2, true)).To(Equal(true))
Expect(redis.ShouldRetry(t2, false)).To(Equal(true))
})
})

View File

@ -5,7 +5,7 @@ go 1.18
replace github.com/redis/go-redis/v9 => ../..
require (
github.com/redis/go-redis/v9 v9.7.1
github.com/redis/go-redis/v9 v9.8.0-beta.1
go.uber.org/zap v1.24.0
)

View File

@ -4,7 +4,7 @@ go 1.18
replace github.com/redis/go-redis/v9 => ../..
require github.com/redis/go-redis/v9 v9.7.1
require github.com/redis/go-redis/v9 v9.8.0-beta.1
require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect

View File

@ -6,7 +6,7 @@ replace github.com/redis/go-redis/v9 => ../..
require (
github.com/davecgh/go-spew v1.1.1
github.com/redis/go-redis/v9 v9.6.2
github.com/redis/go-redis/v9 v9.8.0-beta.1
)
require (

View File

@ -1,7 +1,5 @@
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

View File

@ -4,7 +4,7 @@ go 1.18
replace github.com/redis/go-redis/v9 => ../..
require github.com/redis/go-redis/v9 v9.7.1
require github.com/redis/go-redis/v9 v9.8.0-beta.1
require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect

View File

@ -1,6 +1,8 @@
module github.com/redis/go-redis/example/otel
go 1.19
go 1.23.0
toolchain go1.24.1
replace github.com/redis/go-redis/v9 => ../..
@ -9,8 +11,8 @@ replace github.com/redis/go-redis/extra/redisotel/v9 => ../../extra/redisotel
replace github.com/redis/go-redis/extra/rediscmd/v9 => ../../extra/rediscmd
require (
github.com/redis/go-redis/extra/redisotel/v9 v9.7.1
github.com/redis/go-redis/v9 v9.7.1
github.com/redis/go-redis/extra/redisotel/v9 v9.8.0-beta.1
github.com/redis/go-redis/v9 v9.8.0-beta.1
github.com/uptrace/uptrace-go v1.21.0
go.opentelemetry.io/otel v1.22.0
)
@ -23,7 +25,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/redis/go-redis/extra/rediscmd/v9 v9.7.1 // indirect
github.com/redis/go-redis/extra/rediscmd/v9 v9.8.0-beta.1 // indirect
go.opentelemetry.io/contrib/instrumentation/runtime v0.46.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect
@ -34,9 +36,9 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.22.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/net v0.36.0 // indirect
golang.org/x/sys v0.30.0 // indirect
golang.org/x/text v0.22.0 // indirect
google.golang.org/genproto v0.0.0-20240108191215-35c7eff3a6b1 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240108191215-35c7eff3a6b1 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240108191215-35c7eff3a6b1 // indirect

View File

@ -1,10 +1,13 @@
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
@ -17,10 +20,13 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 h1:Wqo399gCIufwto+VfwCSvsnfGpF/w5E9CNxSwbpD6No=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0/go.mod h1:qmOFXW2epJhM0qSnUUYpldc7gVz2KMQwJ/QYCDIa7XU=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/uptrace/uptrace-go v1.21.0 h1:oJoUjhiVT7aiuoG6B3ClVHtJozLn3cK9hQt8U5dQO1M=
github.com/uptrace/uptrace-go v1.21.0/go.mod h1:/aXAFGKOqeAFBqWa1xtzLnGX2xJm1GScqz9NJ0TJjLM=
go.opentelemetry.io/contrib/instrumentation/runtime v0.46.1 h1:m9ReioVPIffxjJlGNRd0d5poy+9oTro3D+YbiEzUDOc=
@ -46,12 +52,13 @@ go.opentelemetry.io/otel/trace v1.22.0/go.mod h1:RbbHXVqKES9QhzZq/fE5UnOSILqRt40
go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I=
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/net v0.36.0 h1:vWF2fRbw4qslQsQzgFqZff+BItCvGFQqKzKIzx1rmoA=
golang.org/x/net v0.36.0/go.mod h1:bFmbeoIPfrw4sMHNhb4J9f6+tPziuGjq7Jk/38fxi1I=
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto v0.0.0-20240108191215-35c7eff3a6b1 h1:/IWabOtPziuXTEtI1KYCpM6Ss7vaAkeMxk+uXV/xvZs=
google.golang.org/genproto v0.0.0-20240108191215-35c7eff3a6b1/go.mod h1:+Rvu7ElI+aLzyDQhpHMFMMltsD6m7nqpuWDd2CwJw3k=
@ -66,3 +73,4 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -4,7 +4,7 @@ go 1.18
replace github.com/redis/go-redis/v9 => ../..
require github.com/redis/go-redis/v9 v9.7.1
require github.com/redis/go-redis/v9 v9.8.0-beta.1
require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect

View File

@ -6,7 +6,7 @@ replace github.com/redis/go-redis/v9 => ../..
require (
github.com/davecgh/go-spew v1.1.1
github.com/redis/go-redis/v9 v9.7.1
github.com/redis/go-redis/v9 v9.8.0-beta.1
)
require (

View File

@ -11,6 +11,8 @@ import (
"github.com/redis/go-redis/v9/internal/pool"
)
var ErrPoolTimeout = pool.ErrPoolTimeout
func (c *baseClient) Pool() pool.Pooler {
return c.connPool
}
@ -102,3 +104,7 @@ func (c *Ring) ShardByName(name string) *ringShard {
func (c *ModuleLoadexConfig) ToArgs() []interface{} {
return c.toArgs()
}
func ShouldRetry(err error, retryTimeout bool) bool {
return shouldRetry(err, retryTimeout)
}

View File

@ -7,8 +7,8 @@ replace github.com/redis/go-redis/v9 => ../..
replace github.com/redis/go-redis/extra/rediscmd/v9 => ../rediscmd
require (
github.com/redis/go-redis/extra/rediscmd/v9 v9.7.1
github.com/redis/go-redis/v9 v9.7.1
github.com/redis/go-redis/extra/rediscmd/v9 v9.8.0-beta.1
github.com/redis/go-redis/v9 v9.8.0-beta.1
go.opencensus.io v0.24.0
)
@ -18,4 +18,7 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
)
retract v9.5.3 // This version was accidentally released.
retract (
v9.5.3 // This version was accidentally released.
v9.7.2 // This version was accidentally released.
)

View File

@ -7,7 +7,7 @@ replace github.com/redis/go-redis/v9 => ../..
require (
github.com/bsm/ginkgo/v2 v2.12.0
github.com/bsm/gomega v1.27.10
github.com/redis/go-redis/v9 v9.7.1
github.com/redis/go-redis/v9 v9.8.0-beta.1
)
require (
@ -15,4 +15,7 @@ require (
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
)
retract v9.5.3 // This version was accidentally released.
retract (
v9.5.3 // This version was accidentally released.
v9.7.2 // This version was accidentally released.
)

View File

@ -17,7 +17,6 @@ func CmdString(cmd redis.Cmder) string {
}
func CmdsString(cmds []redis.Cmder) (string, string) {
const numCmdLimit = 100
const numNameLimit = 10
seen := make(map[string]struct{}, numNameLimit)
@ -26,10 +25,6 @@ func CmdsString(cmds []redis.Cmder) (string, string) {
b := make([]byte, 0, 32*len(cmds))
for i, cmd := range cmds {
if i > numCmdLimit {
break
}
if i > 0 {
b = append(b, '\n')
}
@ -51,12 +46,7 @@ func CmdsString(cmds []redis.Cmder) (string, string) {
}
func AppendCmd(b []byte, cmd redis.Cmder) []byte {
const numArgLimit = 32
for i, arg := range cmd.Args() {
if i > numArgLimit {
break
}
if i > 0 {
b = append(b, ' ')
}
@ -72,20 +62,12 @@ func AppendCmd(b []byte, cmd redis.Cmder) []byte {
}
func appendArg(b []byte, v interface{}) []byte {
const argLenLimit = 64
switch v := v.(type) {
case nil:
return append(b, "<nil>"...)
case string:
if len(v) > argLenLimit {
v = v[:argLenLimit]
}
return appendUTF8String(b, Bytes(v))
case []byte:
if len(v) > argLenLimit {
v = v[:argLenLimit]
}
return appendUTF8String(b, v)
case int:
return strconv.AppendInt(b, int64(v), 10)

View File

@ -7,8 +7,8 @@ replace github.com/redis/go-redis/v9 => ../..
replace github.com/redis/go-redis/extra/rediscmd/v9 => ../rediscmd
require (
github.com/redis/go-redis/extra/rediscmd/v9 v9.7.1
github.com/redis/go-redis/v9 v9.7.1
github.com/redis/go-redis/extra/rediscmd/v9 v9.8.0-beta.1
github.com/redis/go-redis/v9 v9.8.0-beta.1
go.opentelemetry.io/otel v1.22.0
go.opentelemetry.io/otel/metric v1.22.0
go.opentelemetry.io/otel/sdk v1.22.0
@ -23,4 +23,7 @@ require (
golang.org/x/sys v0.16.0 // indirect
)
retract v9.5.3 // This version was accidentally released.
retract (
v9.5.3 // This version was accidentally released.
v9.7.2 // This version was accidentally released.
)

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"strings"
"testing"
"go.opentelemetry.io/otel/attribute"
@ -222,6 +223,169 @@ func TestTracingHook_ProcessPipelineHook(t *testing.T) {
}
}
func TestTracingHook_ProcessHook_LongCommand(t *testing.T) {
imsb := tracetest.NewInMemoryExporter()
provider := sdktrace.NewTracerProvider(sdktrace.WithSyncer(imsb))
hook := newTracingHook(
"redis://localhost:6379",
WithTracerProvider(provider),
)
longValue := strings.Repeat("a", 102400)
tests := []struct {
name string
cmd redis.Cmder
expected string
}{
{
name: "short command",
cmd: redis.NewCmd(context.Background(), "SET", "key", "value"),
expected: "SET key value",
},
{
name: "set command with long key",
cmd: redis.NewCmd(context.Background(), "SET", longValue, "value"),
expected: "SET " + longValue + " value",
},
{
name: "set command with long value",
cmd: redis.NewCmd(context.Background(), "SET", "key", longValue),
expected: "SET key " + longValue,
},
{
name: "set command with long key and value",
cmd: redis.NewCmd(context.Background(), "SET", longValue, longValue),
expected: "SET " + longValue + " " + longValue,
},
{
name: "short command with many arguments",
cmd: redis.NewCmd(context.Background(), "MSET", "key1", "value1", "key2", "value2", "key3", "value3", "key4", "value4", "key5", "value5"),
expected: "MSET key1 value1 key2 value2 key3 value3 key4 value4 key5 value5",
},
{
name: "long command",
cmd: redis.NewCmd(context.Background(), longValue, "key", "value"),
expected: longValue + " key value",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
defer imsb.Reset()
processHook := hook.ProcessHook(func(ctx context.Context, cmd redis.Cmder) error {
return nil
})
if err := processHook(context.Background(), tt.cmd); err != nil {
t.Fatal(err)
}
assertEqual(t, 1, len(imsb.GetSpans()))
spanData := imsb.GetSpans()[0]
var dbStatement string
for _, attr := range spanData.Attributes {
if attr.Key == semconv.DBStatementKey {
dbStatement = attr.Value.AsString()
break
}
}
if dbStatement != tt.expected {
t.Errorf("Expected DB statement: %q\nGot: %q", tt.expected, dbStatement)
}
})
}
}
func TestTracingHook_ProcessPipelineHook_LongCommands(t *testing.T) {
imsb := tracetest.NewInMemoryExporter()
provider := sdktrace.NewTracerProvider(sdktrace.WithSyncer(imsb))
hook := newTracingHook(
"redis://localhost:6379",
WithTracerProvider(provider),
)
tests := []struct {
name string
cmds []redis.Cmder
expected string
}{
{
name: "multiple short commands",
cmds: []redis.Cmder{
redis.NewCmd(context.Background(), "SET", "key1", "value1"),
redis.NewCmd(context.Background(), "SET", "key2", "value2"),
},
expected: "SET key1 value1\nSET key2 value2",
},
{
name: "multiple short commands with long key",
cmds: []redis.Cmder{
redis.NewCmd(context.Background(), "SET", strings.Repeat("a", 102400), "value1"),
redis.NewCmd(context.Background(), "SET", strings.Repeat("b", 102400), "value2"),
},
expected: "SET " + strings.Repeat("a", 102400) + " value1\nSET " + strings.Repeat("b", 102400) + " value2",
},
{
name: "multiple short commands with long value",
cmds: []redis.Cmder{
redis.NewCmd(context.Background(), "SET", "key1", strings.Repeat("a", 102400)),
redis.NewCmd(context.Background(), "SET", "key2", strings.Repeat("b", 102400)),
},
expected: "SET key1 " + strings.Repeat("a", 102400) + "\nSET key2 " + strings.Repeat("b", 102400),
},
{
name: "multiple short commands with long key and value",
cmds: []redis.Cmder{
redis.NewCmd(context.Background(), "SET", strings.Repeat("a", 102400), strings.Repeat("b", 102400)),
redis.NewCmd(context.Background(), "SET", strings.Repeat("c", 102400), strings.Repeat("d", 102400)),
},
expected: "SET " + strings.Repeat("a", 102400) + " " + strings.Repeat("b", 102400) + "\nSET " + strings.Repeat("c", 102400) + " " + strings.Repeat("d", 102400),
},
{
name: "multiple long commands",
cmds: []redis.Cmder{
redis.NewCmd(context.Background(), strings.Repeat("a", 102400), "key1", "value1"),
redis.NewCmd(context.Background(), strings.Repeat("a", 102400), "key2", "value2"),
},
expected: strings.Repeat("a", 102400) + " key1 value1\n" + strings.Repeat("a", 102400) + " key2 value2",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
defer imsb.Reset()
processHook := hook.ProcessPipelineHook(func(ctx context.Context, cmds []redis.Cmder) error {
return nil
})
if err := processHook(context.Background(), tt.cmds); err != nil {
t.Fatal(err)
}
assertEqual(t, 1, len(imsb.GetSpans()))
spanData := imsb.GetSpans()[0]
var dbStatement string
for _, attr := range spanData.Attributes {
if attr.Key == semconv.DBStatementKey {
dbStatement = attr.Value.AsString()
break
}
}
if dbStatement != tt.expected {
t.Errorf("Expected DB statement:\n%q\nGot:\n%q", tt.expected, dbStatement)
}
})
}
}
func assertEqual(t *testing.T, expected, actual interface{}) {
t.Helper()
if expected != actual {

View File

@ -6,7 +6,7 @@ replace github.com/redis/go-redis/v9 => ../..
require (
github.com/prometheus/client_golang v1.14.0
github.com/redis/go-redis/v9 v9.7.1
github.com/redis/go-redis/v9 v9.8.0-beta.1
)
require (
@ -22,4 +22,7 @@ require (
google.golang.org/protobuf v1.33.0 // indirect
)
retract v9.5.3 // This version was accidentally released.
retract (
v9.5.3 // This version was accidentally released.
v9.7.2 // This version was accidentally released.
)

View File

@ -1,149 +0,0 @@
package redis
import (
"context"
"fmt"
"strings"
)
type GearsCmdable interface {
TFunctionLoad(ctx context.Context, lib string) *StatusCmd
TFunctionLoadArgs(ctx context.Context, lib string, options *TFunctionLoadOptions) *StatusCmd
TFunctionDelete(ctx context.Context, libName string) *StatusCmd
TFunctionList(ctx context.Context) *MapStringInterfaceSliceCmd
TFunctionListArgs(ctx context.Context, options *TFunctionListOptions) *MapStringInterfaceSliceCmd
TFCall(ctx context.Context, libName string, funcName string, numKeys int) *Cmd
TFCallArgs(ctx context.Context, libName string, funcName string, numKeys int, options *TFCallOptions) *Cmd
TFCallASYNC(ctx context.Context, libName string, funcName string, numKeys int) *Cmd
TFCallASYNCArgs(ctx context.Context, libName string, funcName string, numKeys int, options *TFCallOptions) *Cmd
}
type TFunctionLoadOptions struct {
Replace bool
Config string
}
type TFunctionListOptions struct {
Withcode bool
Verbose int
Library string
}
type TFCallOptions struct {
Keys []string
Arguments []string
}
// TFunctionLoad - load a new JavaScript library into Redis.
// For more information - https://redis.io/commands/tfunction-load/
func (c cmdable) TFunctionLoad(ctx context.Context, lib string) *StatusCmd {
args := []interface{}{"TFUNCTION", "LOAD", lib}
cmd := NewStatusCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) TFunctionLoadArgs(ctx context.Context, lib string, options *TFunctionLoadOptions) *StatusCmd {
args := []interface{}{"TFUNCTION", "LOAD"}
if options != nil {
if options.Replace {
args = append(args, "REPLACE")
}
if options.Config != "" {
args = append(args, "CONFIG", options.Config)
}
}
args = append(args, lib)
cmd := NewStatusCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
// TFunctionDelete - delete a JavaScript library from Redis.
// For more information - https://redis.io/commands/tfunction-delete/
func (c cmdable) TFunctionDelete(ctx context.Context, libName string) *StatusCmd {
args := []interface{}{"TFUNCTION", "DELETE", libName}
cmd := NewStatusCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
// TFunctionList - list the functions with additional information about each function.
// For more information - https://redis.io/commands/tfunction-list/
func (c cmdable) TFunctionList(ctx context.Context) *MapStringInterfaceSliceCmd {
args := []interface{}{"TFUNCTION", "LIST"}
cmd := NewMapStringInterfaceSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) TFunctionListArgs(ctx context.Context, options *TFunctionListOptions) *MapStringInterfaceSliceCmd {
args := []interface{}{"TFUNCTION", "LIST"}
if options != nil {
if options.Withcode {
args = append(args, "WITHCODE")
}
if options.Verbose != 0 {
v := strings.Repeat("v", options.Verbose)
args = append(args, v)
}
if options.Library != "" {
args = append(args, "LIBRARY", options.Library)
}
}
cmd := NewMapStringInterfaceSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
// TFCall - invoke a function.
// For more information - https://redis.io/commands/tfcall/
func (c cmdable) TFCall(ctx context.Context, libName string, funcName string, numKeys int) *Cmd {
lf := libName + "." + funcName
args := []interface{}{"TFCALL", lf, numKeys}
cmd := NewCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) TFCallArgs(ctx context.Context, libName string, funcName string, numKeys int, options *TFCallOptions) *Cmd {
lf := libName + "." + funcName
args := []interface{}{"TFCALL", lf, numKeys}
if options != nil {
for _, key := range options.Keys {
args = append(args, key)
}
for _, key := range options.Arguments {
args = append(args, key)
}
}
cmd := NewCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
// TFCallASYNC - invoke an asynchronous JavaScript function (coroutine).
// For more information - https://redis.io/commands/TFCallASYNC/
func (c cmdable) TFCallASYNC(ctx context.Context, libName string, funcName string, numKeys int) *Cmd {
lf := fmt.Sprintf("%s.%s", libName, funcName)
args := []interface{}{"TFCALLASYNC", lf, numKeys}
cmd := NewCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) TFCallASYNCArgs(ctx context.Context, libName string, funcName string, numKeys int, options *TFCallOptions) *Cmd {
lf := fmt.Sprintf("%s.%s", libName, funcName)
args := []interface{}{"TFCALLASYNC", lf, numKeys}
if options != nil {
for _, key := range options.Keys {
args = append(args, key)
}
for _, key := range options.Arguments {
args = append(args, key)
}
}
cmd := NewCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

View File

@ -1,121 +0,0 @@
package redis_test
import (
"context"
"fmt"
. "github.com/bsm/ginkgo/v2"
. "github.com/bsm/gomega"
"github.com/redis/go-redis/v9"
)
func libCode(libName string) string {
return fmt.Sprintf("#!js api_version=1.0 name=%s\n redis.registerFunction('foo', ()=>{{return 'bar'}})", libName)
}
func libCodeWithConfig(libName string) string {
lib := `#!js api_version=1.0 name=%s
var last_update_field_name = "__last_update__"
if (redis.config.last_update_field_name !== undefined) {
if (typeof redis.config.last_update_field_name != 'string') {
throw "last_update_field_name must be a string";
}
last_update_field_name = redis.config.last_update_field_name
}
redis.registerFunction("hset", function(client, key, field, val){
// get the current time in ms
var curr_time = client.call("time")[0];
return client.call('hset', key, field, val, last_update_field_name, curr_time);
});`
return fmt.Sprintf(lib, libName)
}
// TODO: Drop Gears
var _ = Describe("RedisGears commands", Label("gears"), func() {
ctx := context.TODO()
var client *redis.Client
BeforeEach(func() {
client = redis.NewClient(&redis.Options{Addr: ":6379"})
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
client.TFunctionDelete(ctx, "lib1")
})
AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
})
It("should TFunctionLoad, TFunctionLoadArgs and TFunctionDelete ", Label("gears", "tfunctionload"), func() {
SkipAfterRedisVersion(7.4, "gears are not working in later versions")
resultAdd, err := client.TFunctionLoad(ctx, libCode("lib1")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
opt := &redis.TFunctionLoadOptions{Replace: true, Config: `{"last_update_field_name":"last_update"}`}
resultAdd, err = client.TFunctionLoadArgs(ctx, libCodeWithConfig("lib1"), opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
})
It("should TFunctionList", Label("gears", "tfunctionlist"), func() {
SkipAfterRedisVersion(7.4, "gears are not working in later versions")
resultAdd, err := client.TFunctionLoad(ctx, libCode("lib1")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
resultList, err := client.TFunctionList(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultList[0]["engine"]).To(BeEquivalentTo("js"))
opt := &redis.TFunctionListOptions{Withcode: true, Verbose: 2}
resultListArgs, err := client.TFunctionListArgs(ctx, opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultListArgs[0]["code"]).NotTo(BeEquivalentTo(""))
})
It("should TFCall", Label("gears", "tfcall"), func() {
SkipAfterRedisVersion(7.4, "gears are not working in later versions")
var resultAdd interface{}
resultAdd, err := client.TFunctionLoad(ctx, libCode("lib1")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
resultAdd, err = client.TFCall(ctx, "lib1", "foo", 0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("bar"))
})
It("should TFCallArgs", Label("gears", "tfcallargs"), func() {
SkipAfterRedisVersion(7.4, "gears are not working in later versions")
var resultAdd interface{}
resultAdd, err := client.TFunctionLoad(ctx, libCode("lib1")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
opt := &redis.TFCallOptions{Arguments: []string{"foo", "bar"}}
resultAdd, err = client.TFCallArgs(ctx, "lib1", "foo", 0, opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("bar"))
})
It("should TFCallASYNC", Label("gears", "TFCallASYNC"), func() {
SkipAfterRedisVersion(7.4, "gears are not working in later versions")
var resultAdd interface{}
resultAdd, err := client.TFunctionLoad(ctx, libCode("lib1")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
resultAdd, err = client.TFCallASYNC(ctx, "lib1", "foo", 0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("bar"))
})
It("should TFCallASYNCArgs", Label("gears", "TFCallASYNCargs"), func() {
SkipAfterRedisVersion(7.4, "gears are not working in later versions")
var resultAdd interface{}
resultAdd, err := client.TFunctionLoad(ctx, libCode("lib1")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
opt := &redis.TFCallOptions{Arguments: []string{"foo", "bar"}}
resultAdd, err = client.TFCallASYNCArgs(ctx, "lib1", "foo", 0, opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("bar"))
})
})

1
go.mod
View File

@ -10,6 +10,7 @@ require (
)
retract (
v9.7.2 // This version was accidentally released. Please use version 9.7.3 instead.
v9.5.4 // This version was accidentally released. Please use version 9.6.0 instead.
v9.5.3 // This version was accidentally released. Please use version 9.6.0 instead.
)

View File

@ -10,6 +10,9 @@ type HashCmdable interface {
HExists(ctx context.Context, key, field string) *BoolCmd
HGet(ctx context.Context, key, field string) *StringCmd
HGetAll(ctx context.Context, key string) *MapStringStringCmd
HGetDel(ctx context.Context, key string, fields ...string) *StringSliceCmd
HGetEX(ctx context.Context, key string, fields ...string) *StringSliceCmd
HGetEXWithArgs(ctx context.Context, key string, options *HGetEXOptions, fields ...string) *StringSliceCmd
HIncrBy(ctx context.Context, key, field string, incr int64) *IntCmd
HIncrByFloat(ctx context.Context, key, field string, incr float64) *FloatCmd
HKeys(ctx context.Context, key string) *StringSliceCmd
@ -17,6 +20,8 @@ type HashCmdable interface {
HMGet(ctx context.Context, key string, fields ...string) *SliceCmd
HSet(ctx context.Context, key string, values ...interface{}) *IntCmd
HMSet(ctx context.Context, key string, values ...interface{}) *BoolCmd
HSetEX(ctx context.Context, key string, fieldsAndValues ...string) *IntCmd
HSetEXWithArgs(ctx context.Context, key string, options *HSetEXOptions, fieldsAndValues ...string) *IntCmd
HSetNX(ctx context.Context, key, field string, value interface{}) *BoolCmd
HScan(ctx context.Context, key string, cursor uint64, match string, count int64) *ScanCmd
HScanNoValues(ctx context.Context, key string, cursor uint64, match string, count int64) *ScanCmd
@ -219,7 +224,10 @@ type HExpireArgs struct {
// HExpire - Sets the expiration time for specified fields in a hash in seconds.
// The command constructs an argument list starting with "HEXPIRE", followed by the key, duration, any conditional flags, and the specified fields.
// For more information - https://redis.io/commands/hexpire/
// Available since Redis 7.4 CE.
// For more information refer to [HEXPIRE Documentation].
//
// [HEXPIRE Documentation]: https://redis.io/commands/hexpire/
func (c cmdable) HExpire(ctx context.Context, key string, expiration time.Duration, fields ...string) *IntSliceCmd {
args := []interface{}{"HEXPIRE", key, formatSec(ctx, expiration), "FIELDS", len(fields)}
@ -234,7 +242,10 @@ func (c cmdable) HExpire(ctx context.Context, key string, expiration time.Durati
// HExpireWithArgs - Sets the expiration time for specified fields in a hash in seconds.
// It requires a key, an expiration duration, a struct with boolean flags for conditional expiration settings (NX, XX, GT, LT), and a list of fields.
// The command constructs an argument list starting with "HEXPIRE", followed by the key, duration, any conditional flags, and the specified fields.
// For more information - https://redis.io/commands/hexpire/
// Available since Redis 7.4 CE.
// For more information refer to [HEXPIRE Documentation].
//
// [HEXPIRE Documentation]: https://redis.io/commands/hexpire/
func (c cmdable) HExpireWithArgs(ctx context.Context, key string, expiration time.Duration, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd {
args := []interface{}{"HEXPIRE", key, formatSec(ctx, expiration)}
@ -263,7 +274,10 @@ func (c cmdable) HExpireWithArgs(ctx context.Context, key string, expiration tim
// HPExpire - Sets the expiration time for specified fields in a hash in milliseconds.
// Similar to HExpire, it accepts a key, an expiration duration in milliseconds, a struct with expiration condition flags, and a list of fields.
// The command modifies the standard time.Duration to milliseconds for the Redis command.
// For more information - https://redis.io/commands/hpexpire/
// Available since Redis 7.4 CE.
// For more information refer to [HPEXPIRE Documentation].
//
// [HPEXPIRE Documentation]: https://redis.io/commands/hpexpire/
func (c cmdable) HPExpire(ctx context.Context, key string, expiration time.Duration, fields ...string) *IntSliceCmd {
args := []interface{}{"HPEXPIRE", key, formatMs(ctx, expiration), "FIELDS", len(fields)}
@ -275,6 +289,13 @@ func (c cmdable) HPExpire(ctx context.Context, key string, expiration time.Durat
return cmd
}
// HPExpireWithArgs - Sets the expiration time for specified fields in a hash in milliseconds.
// It requires a key, an expiration duration, a struct with boolean flags for conditional expiration settings (NX, XX, GT, LT), and a list of fields.
// The command constructs an argument list starting with "HPEXPIRE", followed by the key, duration, any conditional flags, and the specified fields.
// Available since Redis 7.4 CE.
// For more information refer to [HPEXPIRE Documentation].
//
// [HPEXPIRE Documentation]: https://redis.io/commands/hpexpire/
func (c cmdable) HPExpireWithArgs(ctx context.Context, key string, expiration time.Duration, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd {
args := []interface{}{"HPEXPIRE", key, formatMs(ctx, expiration)}
@ -303,7 +324,10 @@ func (c cmdable) HPExpireWithArgs(ctx context.Context, key string, expiration ti
// HExpireAt - Sets the expiration time for specified fields in a hash to a UNIX timestamp in seconds.
// Takes a key, a UNIX timestamp, a struct of conditional flags, and a list of fields.
// The command sets absolute expiration times based on the UNIX timestamp provided.
// For more information - https://redis.io/commands/hexpireat/
// Available since Redis 7.4 CE.
// For more information refer to [HExpireAt Documentation].
//
// [HExpireAt Documentation]: https://redis.io/commands/hexpireat/
func (c cmdable) HExpireAt(ctx context.Context, key string, tm time.Time, fields ...string) *IntSliceCmd {
args := []interface{}{"HEXPIREAT", key, tm.Unix(), "FIELDS", len(fields)}
@ -343,7 +367,10 @@ func (c cmdable) HExpireAtWithArgs(ctx context.Context, key string, tm time.Time
// HPExpireAt - Sets the expiration time for specified fields in a hash to a UNIX timestamp in milliseconds.
// Similar to HExpireAt but for timestamps in milliseconds. It accepts the same parameters and adjusts the UNIX time to milliseconds.
// For more information - https://redis.io/commands/hpexpireat/
// Available since Redis 7.4 CE.
// For more information refer to [HExpireAt Documentation].
//
// [HExpireAt Documentation]: https://redis.io/commands/hexpireat/
func (c cmdable) HPExpireAt(ctx context.Context, key string, tm time.Time, fields ...string) *IntSliceCmd {
args := []interface{}{"HPEXPIREAT", key, tm.UnixNano() / int64(time.Millisecond), "FIELDS", len(fields)}
@ -383,7 +410,10 @@ func (c cmdable) HPExpireAtWithArgs(ctx context.Context, key string, tm time.Tim
// HPersist - Removes the expiration time from specified fields in a hash.
// Accepts a key and the fields themselves.
// This command ensures that each field specified will have its expiration removed if present.
// For more information - https://redis.io/commands/hpersist/
// Available since Redis 7.4 CE.
// For more information refer to [HPersist Documentation].
//
// [HPersist Documentation]: https://redis.io/commands/hpersist/
func (c cmdable) HPersist(ctx context.Context, key string, fields ...string) *IntSliceCmd {
args := []interface{}{"HPERSIST", key, "FIELDS", len(fields)}
@ -398,6 +428,10 @@ func (c cmdable) HPersist(ctx context.Context, key string, fields ...string) *In
// HExpireTime - Retrieves the expiration time for specified fields in a hash as a UNIX timestamp in seconds.
// Requires a key and the fields themselves to fetch their expiration timestamps.
// This command returns the expiration times for each field or error/status codes for each field as specified.
// Available since Redis 7.4 CE.
// For more information refer to [HExpireTime Documentation].
//
// [HExpireTime Documentation]: https://redis.io/commands/hexpiretime/
// For more information - https://redis.io/commands/hexpiretime/
func (c cmdable) HExpireTime(ctx context.Context, key string, fields ...string) *IntSliceCmd {
args := []interface{}{"HEXPIRETIME", key, "FIELDS", len(fields)}
@ -413,6 +447,10 @@ func (c cmdable) HExpireTime(ctx context.Context, key string, fields ...string)
// HPExpireTime - Retrieves the expiration time for specified fields in a hash as a UNIX timestamp in milliseconds.
// Similar to HExpireTime, adjusted for timestamps in milliseconds. It requires the same parameters.
// Provides the expiration timestamp for each field in milliseconds.
// Available since Redis 7.4 CE.
// For more information refer to [HExpireTime Documentation].
//
// [HExpireTime Documentation]: https://redis.io/commands/hexpiretime/
// For more information - https://redis.io/commands/hexpiretime/
func (c cmdable) HPExpireTime(ctx context.Context, key string, fields ...string) *IntSliceCmd {
args := []interface{}{"HPEXPIRETIME", key, "FIELDS", len(fields)}
@ -428,7 +466,10 @@ func (c cmdable) HPExpireTime(ctx context.Context, key string, fields ...string)
// HTTL - Retrieves the remaining time to live for specified fields in a hash in seconds.
// Requires a key and the fields themselves. It returns the TTL for each specified field.
// This command fetches the TTL in seconds for each field or returns error/status codes as appropriate.
// For more information - https://redis.io/commands/httl/
// Available since Redis 7.4 CE.
// For more information refer to [HTTL Documentation].
//
// [HTTL Documentation]: https://redis.io/commands/httl/
func (c cmdable) HTTL(ctx context.Context, key string, fields ...string) *IntSliceCmd {
args := []interface{}{"HTTL", key, "FIELDS", len(fields)}
@ -443,6 +484,10 @@ func (c cmdable) HTTL(ctx context.Context, key string, fields ...string) *IntSli
// HPTTL - Retrieves the remaining time to live for specified fields in a hash in milliseconds.
// Similar to HTTL, but returns the TTL in milliseconds. It requires a key and the specified fields.
// This command provides the TTL in milliseconds for each field or returns error/status codes as needed.
// Available since Redis 7.4 CE.
// For more information refer to [HPTTL Documentation].
//
// [HPTTL Documentation]: https://redis.io/commands/hpttl/
// For more information - https://redis.io/commands/hpttl/
func (c cmdable) HPTTL(ctx context.Context, key string, fields ...string) *IntSliceCmd {
args := []interface{}{"HPTTL", key, "FIELDS", len(fields)}
@ -454,3 +499,113 @@ func (c cmdable) HPTTL(ctx context.Context, key string, fields ...string) *IntSl
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) HGetDel(ctx context.Context, key string, fields ...string) *StringSliceCmd {
args := []interface{}{"HGETDEL", key, "FIELDS", len(fields)}
for _, field := range fields {
args = append(args, field)
}
cmd := NewStringSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) HGetEX(ctx context.Context, key string, fields ...string) *StringSliceCmd {
args := []interface{}{"HGETEX", key, "FIELDS", len(fields)}
for _, field := range fields {
args = append(args, field)
}
cmd := NewStringSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
// HGetEXExpirationType represents an expiration option for the HGETEX command.
type HGetEXExpirationType string
const (
HGetEXExpirationEX HGetEXExpirationType = "EX"
HGetEXExpirationPX HGetEXExpirationType = "PX"
HGetEXExpirationEXAT HGetEXExpirationType = "EXAT"
HGetEXExpirationPXAT HGetEXExpirationType = "PXAT"
HGetEXExpirationPERSIST HGetEXExpirationType = "PERSIST"
)
type HGetEXOptions struct {
ExpirationType HGetEXExpirationType
ExpirationVal int64
}
func (c cmdable) HGetEXWithArgs(ctx context.Context, key string, options *HGetEXOptions, fields ...string) *StringSliceCmd {
args := []interface{}{"HGETEX", key}
if options.ExpirationType != "" {
args = append(args, string(options.ExpirationType))
if options.ExpirationType != HGetEXExpirationPERSIST {
args = append(args, options.ExpirationVal)
}
}
args = append(args, "FIELDS", len(fields))
for _, field := range fields {
args = append(args, field)
}
cmd := NewStringSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
type HSetEXCondition string
const (
HSetEXFNX HSetEXCondition = "FNX" // Only set the fields if none of them already exist.
HSetEXFXX HSetEXCondition = "FXX" // Only set the fields if all already exist.
)
type HSetEXExpirationType string
const (
HSetEXExpirationEX HSetEXExpirationType = "EX"
HSetEXExpirationPX HSetEXExpirationType = "PX"
HSetEXExpirationEXAT HSetEXExpirationType = "EXAT"
HSetEXExpirationPXAT HSetEXExpirationType = "PXAT"
HSetEXExpirationKEEPTTL HSetEXExpirationType = "KEEPTTL"
)
type HSetEXOptions struct {
Condition HSetEXCondition
ExpirationType HSetEXExpirationType
ExpirationVal int64
}
func (c cmdable) HSetEX(ctx context.Context, key string, fieldsAndValues ...string) *IntCmd {
args := []interface{}{"HSETEX", key, "FIELDS", len(fieldsAndValues) / 2}
for _, field := range fieldsAndValues {
args = append(args, field)
}
cmd := NewIntCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) HSetEXWithArgs(ctx context.Context, key string, options *HSetEXOptions, fieldsAndValues ...string) *IntCmd {
args := []interface{}{"HSETEX", key}
if options.Condition != "" {
args = append(args, string(options.Condition))
}
if options.ExpirationType != "" {
args = append(args, string(options.ExpirationType))
if options.ExpirationType != HSetEXExpirationKEEPTTL {
args = append(args, options.ExpirationVal)
}
}
args = append(args, "FIELDS", len(fieldsAndValues)/2)
for _, field := range fieldsAndValues {
args = append(args, field)
}
cmd := NewIntCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

View File

@ -33,6 +33,7 @@ func BenchmarkPoolGetPut(b *testing.B) {
Dialer: dummyDialer,
PoolSize: bm.poolSize,
PoolTimeout: time.Second,
DialTimeout: 1 * time.Second,
ConnMaxIdleTime: time.Hour,
})
@ -76,6 +77,7 @@ func BenchmarkPoolGetRemove(b *testing.B) {
Dialer: dummyDialer,
PoolSize: bm.poolSize,
PoolTimeout: time.Second,
DialTimeout: 1 * time.Second,
ConnMaxIdleTime: time.Hour,
})

View File

@ -62,6 +62,7 @@ type Options struct {
PoolFIFO bool
PoolSize int
DialTimeout time.Duration
PoolTimeout time.Duration
MinIdleConns int
MaxIdleConns int
@ -140,7 +141,10 @@ func (p *ConnPool) checkMinIdleConns() {
}
func (p *ConnPool) addIdleConn() error {
cn, err := p.dialConn(context.TODO(), true)
ctx, cancel := context.WithTimeout(context.Background(), p.cfg.DialTimeout)
defer cancel()
cn, err := p.dialConn(ctx, true)
if err != nil {
return err
}
@ -230,15 +234,19 @@ func (p *ConnPool) tryDial() {
return
}
conn, err := p.cfg.Dialer(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), p.cfg.DialTimeout)
conn, err := p.cfg.Dialer(ctx)
if err != nil {
p.setLastDialError(err)
time.Sleep(time.Second)
cancel()
continue
}
atomic.StoreUint32(&p.dialErrorsNum, 0)
_ = conn.Close()
cancel()
return
}
}

View File

@ -22,6 +22,7 @@ var _ = Describe("ConnPool", func() {
Dialer: dummyDialer,
PoolSize: 10,
PoolTimeout: time.Hour,
DialTimeout: 1 * time.Second,
ConnMaxIdleTime: time.Millisecond,
})
})
@ -46,6 +47,7 @@ var _ = Describe("ConnPool", func() {
},
PoolSize: 10,
PoolTimeout: time.Hour,
DialTimeout: 1 * time.Second,
ConnMaxIdleTime: time.Millisecond,
MinIdleConns: minIdleConns,
})
@ -129,6 +131,7 @@ var _ = Describe("MinIdleConns", func() {
PoolSize: poolSize,
MinIdleConns: minIdleConns,
PoolTimeout: 100 * time.Millisecond,
DialTimeout: 1 * time.Second,
ConnMaxIdleTime: -1,
})
Eventually(func() int {
@ -306,6 +309,7 @@ var _ = Describe("race", func() {
Dialer: dummyDialer,
PoolSize: 10,
PoolTimeout: time.Minute,
DialTimeout: 1 * time.Second,
ConnMaxIdleTime: time.Millisecond,
})
@ -336,6 +340,7 @@ var _ = Describe("race", func() {
PoolSize: 1000,
MinIdleConns: 50,
PoolTimeout: 3 * time.Second,
DialTimeout: 1 * time.Second,
}
p := pool.NewConnPool(opt)

View File

@ -352,3 +352,27 @@ var _ = Describe("withConn", func() {
Expect(client.connPool.Len()).To(Equal(1))
})
})
var _ = Describe("ClusterClient", func() {
var client *ClusterClient
BeforeEach(func() {
client = &ClusterClient{}
})
Describe("cmdSlot", func() {
It("select slot from args for GETKEYSINSLOT command", func() {
cmd := NewStringSliceCmd(ctx, "cluster", "getkeysinslot", 100, 200)
slot := client.cmdSlot(context.Background(), cmd)
Expect(slot).To(Equal(100))
})
It("select slot from args for COUNTKEYSINSLOT command", func() {
cmd := NewStringSliceCmd(ctx, "cluster", "countkeysinslot", 100)
slot := client.cmdSlot(context.Background(), cmd)
Expect(slot).To(Equal(100))
})
})
})

View File

@ -148,9 +148,18 @@ type Options struct {
// Enables read only queries on slave/follower nodes.
readOnly bool
// Disable set-lib on connect. Default is false.
// DisableIndentity - Disable set-lib on connect.
//
// default: false
//
// Deprecated: Use DisableIdentity instead.
DisableIndentity bool
// DisableIdentity is used to disable CLIENT SETINFO command on connect.
//
// default: false
DisableIdentity bool
// Add suffix to client name. Default is empty.
IdentitySuffix string
@ -522,6 +531,7 @@ func newConnPool(
PoolFIFO: opt.PoolFIFO,
PoolSize: opt.PoolSize,
PoolTimeout: opt.PoolTimeout,
DialTimeout: opt.DialTimeout,
MinIdleConns: opt.MinIdleConns,
MaxIdleConns: opt.MaxIdleConns,
MaxActiveConns: opt.MaxActiveConns,

View File

@ -90,8 +90,19 @@ type ClusterOptions struct {
ConnMaxIdleTime time.Duration
ConnMaxLifetime time.Duration
TLSConfig *tls.Config
DisableIndentity bool // Disable set-lib on connect. Default is false.
TLSConfig *tls.Config
// DisableIndentity - Disable set-lib on connect.
//
// default: false
//
// Deprecated: Use DisableIdentity instead.
DisableIndentity bool
// DisableIdentity is used to disable CLIENT SETINFO command on connect.
//
// default: false
DisableIdentity bool
IdentitySuffix string // Add suffix to client name. Default is empty.
@ -303,7 +314,8 @@ func (opt *ClusterOptions) clientOptions() *Options {
MaxActiveConns: opt.MaxActiveConns,
ConnMaxIdleTime: opt.ConnMaxIdleTime,
ConnMaxLifetime: opt.ConnMaxLifetime,
DisableIndentity: opt.DisableIndentity,
DisableIdentity: opt.DisableIdentity,
DisableIndentity: opt.DisableIdentity,
IdentitySuffix: opt.IdentitySuffix,
TLSConfig: opt.TLSConfig,
// If ClusterSlots is populated, then we probably have an artificial
@ -1844,7 +1856,7 @@ func (c *ClusterClient) cmdInfo(ctx context.Context, name string) *CommandInfo {
func (c *ClusterClient) cmdSlot(ctx context.Context, cmd Cmder) int {
args := cmd.Args()
if args[0] == "cluster" && args[1] == "getkeysinslot" {
if args[0] == "cluster" && (args[1] == "getkeysinslot" || args[1] == "countkeysinslot") {
return args[2].(int)
}

View File

@ -432,7 +432,7 @@ func (c *PubSub) ReceiveTimeout(ctx context.Context, timeout time.Duration) (int
return nil, err
}
err = cn.WithReader(context.Background(), timeout, func(rd *proto.Reader) error {
err = cn.WithReader(ctx, timeout, func(rd *proto.Reader) error {
return c.cmd.readReply(rd)
})

View File

@ -310,7 +310,7 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
// for redis-server versions that do not support the HELLO command,
// RESP2 will continue to be used.
if err = conn.Hello(ctx, protocol, username, password, "").Err(); err == nil {
if err = conn.Hello(ctx, protocol, username, password, c.opt.ClientName).Err(); err == nil {
auth = true
} else if !isRedisError(err) {
// When the server responds with the RESP protocol and the result is not a normal
@ -350,7 +350,7 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
return err
}
if !c.opt.DisableIndentity {
if !c.opt.DisableIdentity && !c.opt.DisableIndentity {
libName := ""
libVer := Version()
if c.opt.IdentitySuffix != "" {
@ -359,7 +359,11 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
p := conn.Pipeline()
p.ClientSetInfo(ctx, WithLibraryName(libName))
p.ClientSetInfo(ctx, WithLibraryVersion(libVer))
_, _ = p.Exec(ctx)
// Handle network errors (e.g. timeouts) in CLIENT SETINFO to avoid
// out of order responses later on.
if _, err = p.Exec(ctx); err != nil && !isRedisError(err) {
return err
}
}
if c.opt.OnConnect != nil {

View File

@ -186,6 +186,32 @@ var _ = Describe("Client", func() {
Expect(val).Should(ContainSubstring("name=hi"))
})
It("should attempt to set client name in HELLO", func() {
opt := redisOptions()
opt.ClientName = "hi"
db := redis.NewClient(opt)
defer func() {
Expect(db.Close()).NotTo(HaveOccurred())
}()
// Client name should be already set on any successfully initialized connection
name, err := db.ClientGetName(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(name).Should(Equal("hi"))
// HELLO should be able to explicitly overwrite the client name
conn := db.Conn()
hello, err := conn.Hello(ctx, 3, "", "", "hi2").Result()
Expect(err).NotTo(HaveOccurred())
Expect(hello["proto"]).Should(Equal(int64(3)))
name, err = conn.ClientGetName(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(name).Should(Equal("hi2"))
err = conn.Close()
Expect(err).NotTo(HaveOccurred())
})
It("should client PROTO 2", func() {
opt := redisOptions()
opt.Protocol = 2
@ -370,6 +396,13 @@ var _ = Describe("Client timeout", func() {
})
testTimeout := func() {
It("SETINFO timeouts", func() {
conn := client.Conn()
err := conn.Ping(ctx).Err()
Expect(err).To(HaveOccurred())
Expect(err.(net.Error).Timeout()).To(BeTrue())
})
It("Ping timeouts", func() {
err := client.Ping(ctx).Err()
Expect(err).To(HaveOccurred())

20
ring.go
View File

@ -98,9 +98,19 @@ type RingOptions struct {
TLSConfig *tls.Config
Limiter Limiter
// DisableIndentity - Disable set-lib on connect.
//
// default: false
//
// Deprecated: Use DisableIdentity instead.
DisableIndentity bool
IdentitySuffix string
UnstableResp3 bool
// DisableIdentity is used to disable CLIENT SETINFO command on connect.
//
// default: false
DisableIdentity bool
IdentitySuffix string
UnstableResp3 bool
}
func (opt *RingOptions) init() {
@ -167,9 +177,11 @@ func (opt *RingOptions) clientOptions() *Options {
TLSConfig: opt.TLSConfig,
Limiter: opt.Limiter,
DisableIdentity: opt.DisableIdentity,
DisableIndentity: opt.DisableIndentity,
IdentitySuffix: opt.IdentitySuffix,
UnstableResp3: opt.UnstableResp3,
IdentitySuffix: opt.IdentitySuffix,
UnstableResp3: opt.UnstableResp3,
}
}

View File

@ -114,6 +114,7 @@ type SpellCheckTerms struct {
}
type FTExplainOptions struct {
// Dialect 1,3 and 4 are deprecated since redis 8.0
Dialect string
}
@ -261,7 +262,8 @@ type FTAggregateOptions struct {
WithCursor bool
WithCursorOptions *FTAggregateWithCursor
Params map[string]interface{}
DialectVersion int
// Dialect 1,3 and 4 are deprecated since redis 8.0
DialectVersion int
}
type FTSearchFilter struct {
@ -320,8 +322,12 @@ type FTSearchOptions struct {
SortByWithCount bool
LimitOffset int
Limit int
Params map[string]interface{}
DialectVersion int
// CountOnly sets LIMIT 0 0 to get the count - number of documents in the result set without actually returning the result set.
// When using this option, the Limit and LimitOffset options are ignored.
CountOnly bool
Params map[string]interface{}
// Dialect 1,3 and 4 are deprecated since redis 8.0
DialectVersion int
}
type FTSynDumpResult struct {
@ -437,7 +443,8 @@ type IndexDefinition struct {
type FTSpellCheckOptions struct {
Distance int
Terms *FTSpellCheckTerms
Dialect int
// Dialect 1,3 and 4 are deprecated since redis 8.0
Dialect int
}
type FTSpellCheckTerms struct {
@ -604,6 +611,8 @@ func FTAggregateQuery(query string, options *FTAggregateOptions) AggregateQuery
if options.DialectVersion > 0 {
queryArgs = append(queryArgs, "DIALECT", options.DialectVersion)
} else {
queryArgs = append(queryArgs, "DIALECT", 2)
}
}
return queryArgs
@ -801,6 +810,8 @@ func (c cmdable) FTAggregateWithArgs(ctx context.Context, index string, query st
}
if options.DialectVersion > 0 {
args = append(args, "DIALECT", options.DialectVersion)
} else {
args = append(args, "DIALECT", 2)
}
}
@ -1174,6 +1185,8 @@ func (c cmdable) FTExplainWithArgs(ctx context.Context, index string, query stri
args := []interface{}{"FT.EXPLAIN", index, query}
if options.Dialect != "" {
args = append(args, "DIALECT", options.Dialect)
} else {
args = append(args, "DIALECT", 2)
}
cmd := NewStringCmd(ctx, args...)
_ = c(ctx, cmd)
@ -1471,6 +1484,8 @@ func (c cmdable) FTSpellCheckWithArgs(ctx context.Context, index string, query s
}
if options.Dialect > 0 {
args = append(args, "DIALECT", options.Dialect)
} else {
args = append(args, "DIALECT", 2)
}
}
cmd := newFTSpellCheckCmd(ctx, args...)
@ -1840,6 +1855,8 @@ func FTSearchQuery(query string, options *FTSearchOptions) SearchQuery {
}
if options.DialectVersion > 0 {
queryArgs = append(queryArgs, "DIALECT", options.DialectVersion)
} else {
queryArgs = append(queryArgs, "DIALECT", 2)
}
}
return queryArgs
@ -1944,8 +1961,12 @@ func (c cmdable) FTSearchWithArgs(ctx context.Context, index string, query strin
args = append(args, "WITHCOUNT")
}
}
if options.LimitOffset >= 0 && options.Limit > 0 {
args = append(args, "LIMIT", options.LimitOffset, options.Limit)
if options.CountOnly {
args = append(args, "LIMIT", 0, 0)
} else {
if options.LimitOffset >= 0 && options.Limit > 0 || options.LimitOffset > 0 && options.Limit == 0 {
args = append(args, "LIMIT", options.LimitOffset, options.Limit)
}
}
if options.Params != nil {
args = append(args, "PARAMS", len(options.Params)*2)
@ -1955,6 +1976,8 @@ func (c cmdable) FTSearchWithArgs(ctx context.Context, index string, query strin
}
if options.DialectVersion > 0 {
args = append(args, "DIALECT", options.DialectVersion)
} else {
args = append(args, "DIALECT", 2)
}
}
cmd := newFTSearchCmd(ctx, options, args...)
@ -2078,216 +2101,3 @@ func (c cmdable) FTTagVals(ctx context.Context, index string, field string) *Str
_ = c(ctx, cmd)
return cmd
}
// TODO: remove FTProfile
// type FTProfileResult struct {
// Results []interface{}
// Profile ProfileDetails
// }
// type ProfileDetails struct {
// TotalProfileTime string
// ParsingTime string
// PipelineCreationTime string
// Warning string
// IteratorsProfile []IteratorProfile
// ResultProcessorsProfile []ResultProcessorProfile
// }
// type IteratorProfile struct {
// Type string
// QueryType string
// Time interface{}
// Counter int
// Term string
// Size int
// ChildIterators []IteratorProfile
// }
// type ResultProcessorProfile struct {
// Type string
// Time interface{}
// Counter int
// }
// func parseFTProfileResult(data []interface{}) (FTProfileResult, error) {
// var result FTProfileResult
// if len(data) < 2 {
// return result, fmt.Errorf("unexpected data length")
// }
// // Parse results
// result.Results = data[0].([]interface{})
// // Parse profile details
// profileData := data[1].([]interface{})
// profileDetails := ProfileDetails{}
// for i := 0; i < len(profileData); i += 2 {
// switch profileData[i].(string) {
// case "Total profile time":
// profileDetails.TotalProfileTime = profileData[i+1].(string)
// case "Parsing time":
// profileDetails.ParsingTime = profileData[i+1].(string)
// case "Pipeline creation time":
// profileDetails.PipelineCreationTime = profileData[i+1].(string)
// case "Warning":
// profileDetails.Warning = profileData[i+1].(string)
// case "Iterators profile":
// profileDetails.IteratorsProfile = parseIteratorsProfile(profileData[i+1].([]interface{}))
// case "Result processors profile":
// profileDetails.ResultProcessorsProfile = parseResultProcessorsProfile(profileData[i+1].([]interface{}))
// }
// }
// result.Profile = profileDetails
// return result, nil
// }
// func parseIteratorsProfile(data []interface{}) []IteratorProfile {
// var iterators []IteratorProfile
// for _, item := range data {
// profile := item.([]interface{})
// iterator := IteratorProfile{}
// for i := 0; i < len(profile); i += 2 {
// switch profile[i].(string) {
// case "Type":
// iterator.Type = profile[i+1].(string)
// case "Query type":
// iterator.QueryType = profile[i+1].(string)
// case "Time":
// iterator.Time = profile[i+1]
// case "Counter":
// iterator.Counter = int(profile[i+1].(int64))
// case "Term":
// iterator.Term = profile[i+1].(string)
// case "Size":
// iterator.Size = int(profile[i+1].(int64))
// case "Child iterators":
// iterator.ChildIterators = parseChildIteratorsProfile(profile[i+1].([]interface{}))
// }
// }
// iterators = append(iterators, iterator)
// }
// return iterators
// }
// func parseChildIteratorsProfile(data []interface{}) []IteratorProfile {
// var iterators []IteratorProfile
// for _, item := range data {
// profile := item.([]interface{})
// iterator := IteratorProfile{}
// for i := 0; i < len(profile); i += 2 {
// switch profile[i].(string) {
// case "Type":
// iterator.Type = profile[i+1].(string)
// case "Query type":
// iterator.QueryType = profile[i+1].(string)
// case "Time":
// iterator.Time = profile[i+1]
// case "Counter":
// iterator.Counter = int(profile[i+1].(int64))
// case "Term":
// iterator.Term = profile[i+1].(string)
// case "Size":
// iterator.Size = int(profile[i+1].(int64))
// }
// }
// iterators = append(iterators, iterator)
// }
// return iterators
// }
// func parseResultProcessorsProfile(data []interface{}) []ResultProcessorProfile {
// var processors []ResultProcessorProfile
// for _, item := range data {
// profile := item.([]interface{})
// processor := ResultProcessorProfile{}
// for i := 0; i < len(profile); i += 2 {
// switch profile[i].(string) {
// case "Type":
// processor.Type = profile[i+1].(string)
// case "Time":
// processor.Time = profile[i+1]
// case "Counter":
// processor.Counter = int(profile[i+1].(int64))
// }
// }
// processors = append(processors, processor)
// }
// return processors
// }
// func NewFTProfileCmd(ctx context.Context, args ...interface{}) *FTProfileCmd {
// return &FTProfileCmd{
// baseCmd: baseCmd{
// ctx: ctx,
// args: args,
// },
// }
// }
// type FTProfileCmd struct {
// baseCmd
// val FTProfileResult
// }
// func (cmd *FTProfileCmd) String() string {
// return cmdString(cmd, cmd.val)
// }
// func (cmd *FTProfileCmd) SetVal(val FTProfileResult) {
// cmd.val = val
// }
// func (cmd *FTProfileCmd) Result() (FTProfileResult, error) {
// return cmd.val, cmd.err
// }
// func (cmd *FTProfileCmd) Val() FTProfileResult {
// return cmd.val
// }
// func (cmd *FTProfileCmd) readReply(rd *proto.Reader) (err error) {
// data, err := rd.ReadSlice()
// if err != nil {
// return err
// }
// cmd.val, err = parseFTProfileResult(data)
// if err != nil {
// cmd.err = err
// }
// return nil
// }
// // FTProfile - Executes a search query and returns a profile of how the query was processed.
// // The 'index' parameter specifies the index to search, the 'limited' parameter specifies whether to limit the results,
// // and the 'query' parameter specifies the search / aggreagte query. Please notice that you must either pass a SearchQuery or an AggregateQuery.
// // For more information, please refer to the Redis documentation:
// // [FT.PROFILE]: (https://redis.io/commands/ft.profile/)
// func (c cmdable) FTProfile(ctx context.Context, index string, limited bool, query interface{}) *FTProfileCmd {
// queryType := ""
// var argsQuery []interface{}
// switch v := query.(type) {
// case AggregateQuery:
// queryType = "AGGREGATE"
// argsQuery = v
// case SearchQuery:
// queryType = "SEARCH"
// argsQuery = v
// default:
// panic("FT.PROFILE: query must be either AggregateQuery or SearchQuery")
// }
// args := []interface{}{"FT.PROFILE", index, queryType}
// if limited {
// args = append(args, "LIMITED")
// }
// args = append(args, "QUERY")
// args = append(args, argsQuery...)
// cmd := NewFTProfileCmd(ctx, args...)
// _ = c(ctx, cmd)
// return cmd
// }

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strconv"
"strings"
"time"
. "github.com/bsm/ginkgo/v2"
@ -381,7 +382,7 @@ var _ = Describe("RediSearch commands Resp 2", Label("search"), func() {
// up until redis 8 the default scorer was TFIDF, in redis 8 it is BM25
// this test expect redis major version >= 8
It("should FTSearch WithScores", Label("search", "ftsearch"), func() {
SkipBeforeRedisVersion(7.9, "default scorer is not BM25")
SkipBeforeRedisVersion(7.9, "default scorer is not BM25STD")
text1 := &redis.FieldSchema{FieldName: "description", FieldType: redis.SearchFieldTypeText}
val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{}, text1).Result()
@ -1143,6 +1144,55 @@ var _ = Describe("RediSearch commands Resp 2", Label("search"), func() {
Expect(res.Docs[0].Fields["__v_score"]).To(BeEquivalentTo("0"))
})
It("should FTCreate VECTOR with dialect 1 ", Label("search", "ftcreate"), func() {
hnswOptions := &redis.FTHNSWOptions{Type: "FLOAT32", Dim: 2, DistanceMetric: "L2"}
val, err := client.FTCreate(ctx, "idx1",
&redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "v", FieldType: redis.SearchFieldTypeVector, VectorArgs: &redis.FTVectorArgs{HNSWOptions: hnswOptions}}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "idx1")
client.HSet(ctx, "a", "v", "aaaaaaaa")
client.HSet(ctx, "b", "v", "aaaabaaa")
client.HSet(ctx, "c", "v", "aaaaabaa")
searchOptions := &redis.FTSearchOptions{
Return: []redis.FTSearchReturn{{FieldName: "v"}},
SortBy: []redis.FTSearchSortBy{{FieldName: "v", Asc: true}},
Limit: 10,
DialectVersion: 1,
}
res, err := client.FTSearchWithArgs(ctx, "idx1", "*", searchOptions).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Docs[0].ID).To(BeEquivalentTo("a"))
Expect(res.Docs[0].Fields["v"]).To(BeEquivalentTo("aaaaaaaa"))
})
It("should FTCreate VECTOR with default dialect", Label("search", "ftcreate"), func() {
hnswOptions := &redis.FTHNSWOptions{Type: "FLOAT32", Dim: 2, DistanceMetric: "L2"}
val, err := client.FTCreate(ctx, "idx1",
&redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "v", FieldType: redis.SearchFieldTypeVector, VectorArgs: &redis.FTVectorArgs{HNSWOptions: hnswOptions}}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "idx1")
client.HSet(ctx, "a", "v", "aaaaaaaa")
client.HSet(ctx, "b", "v", "aaaabaaa")
client.HSet(ctx, "c", "v", "aaaaabaa")
searchOptions := &redis.FTSearchOptions{
Return: []redis.FTSearchReturn{{FieldName: "__v_score"}},
SortBy: []redis.FTSearchSortBy{{FieldName: "__v_score", Asc: true}},
Params: map[string]interface{}{"vec": "aaaaaaaa"},
}
res, err := client.FTSearchWithArgs(ctx, "idx1", "*=>[KNN 2 @v $vec]", searchOptions).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Docs[0].ID).To(BeEquivalentTo("a"))
Expect(res.Docs[0].Fields["__v_score"]).To(BeEquivalentTo("0"))
})
It("should FTCreate and FTSearch text params", Label("search", "ftcreate", "ftsearch"), func() {
val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{}, &redis.FieldSchema{FieldName: "name", FieldType: redis.SearchFieldTypeText}).Result()
Expect(err).NotTo(HaveOccurred())
@ -1577,6 +1627,577 @@ var _ = Describe("RediSearch commands Resp 2", Label("search"), func() {
Expect(res.Docs[0].ID).To(BeEquivalentTo("property:1"))
Expect(res.Docs[1].ID).To(BeEquivalentTo("property:2"))
})
It("should FTCreate VECTOR with int8 and uint8 types", Label("search", "ftcreate"), func() {
SkipBeforeRedisVersion(7.9, "doesn't work with older redis")
// Define INT8 vector field
hnswOptionsInt8 := &redis.FTHNSWOptions{
Type: "INT8",
Dim: 2,
DistanceMetric: "L2",
}
// Define UINT8 vector field
hnswOptionsUint8 := &redis.FTHNSWOptions{
Type: "UINT8",
Dim: 2,
DistanceMetric: "L2",
}
// Create index with INT8 and UINT8 vector fields
val, err := client.FTCreate(ctx, "idx1",
&redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "int8_vector", FieldType: redis.SearchFieldTypeVector, VectorArgs: &redis.FTVectorArgs{HNSWOptions: hnswOptionsInt8}},
&redis.FieldSchema{FieldName: "uint8_vector", FieldType: redis.SearchFieldTypeVector, VectorArgs: &redis.FTVectorArgs{HNSWOptions: hnswOptionsUint8}},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "idx1")
// Insert vectors in int8 and uint8 format
client.HSet(ctx, "doc1", "int8_vector", "\x01\x02", "uint8_vector", "\x01\x02")
client.HSet(ctx, "doc2", "int8_vector", "\x03\x04", "uint8_vector", "\x03\x04")
// Perform KNN search on INT8 vector
searchOptionsInt8 := &redis.FTSearchOptions{
Return: []redis.FTSearchReturn{{FieldName: "int8_vector"}},
SortBy: []redis.FTSearchSortBy{{FieldName: "int8_vector", Asc: true}},
DialectVersion: 2,
Params: map[string]interface{}{"vec": "\x01\x02"},
}
resInt8, err := client.FTSearchWithArgs(ctx, "idx1", "*=>[KNN 1 @int8_vector $vec]", searchOptionsInt8).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resInt8.Docs[0].ID).To(BeEquivalentTo("doc1"))
// Perform KNN search on UINT8 vector
searchOptionsUint8 := &redis.FTSearchOptions{
Return: []redis.FTSearchReturn{{FieldName: "uint8_vector"}},
SortBy: []redis.FTSearchSortBy{{FieldName: "uint8_vector", Asc: true}},
DialectVersion: 2,
Params: map[string]interface{}{"vec": "\x01\x02"},
}
resUint8, err := client.FTSearchWithArgs(ctx, "idx1", "*=>[KNN 1 @uint8_vector $vec]", searchOptionsUint8).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resUint8.Docs[0].ID).To(BeEquivalentTo("doc1"))
})
It("should fail when using a non-zero offset with a zero limit", Label("search", "ftsearch"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "testIdx", &redis.FTCreateOptions{}, &redis.FieldSchema{
FieldName: "txt",
FieldType: redis.SearchFieldTypeText,
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "testIdx")
client.HSet(ctx, "doc1", "txt", "hello world")
// Attempt to search with a non-zero offset and zero limit.
_, err = client.FTSearchWithArgs(ctx, "testIdx", "hello", &redis.FTSearchOptions{
LimitOffset: 5,
Limit: 0,
}).Result()
Expect(err).To(HaveOccurred())
})
It("should evaluate exponentiation precedence in APPLY expressions correctly", Label("search", "ftaggregate"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "txns", &redis.FTCreateOptions{}, &redis.FieldSchema{
FieldName: "dummy",
FieldType: redis.SearchFieldTypeText,
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "txns")
client.HSet(ctx, "doc1", "dummy", "dummy")
correctOptions := &redis.FTAggregateOptions{
Apply: []redis.FTAggregateApply{
{Field: "(2*3^2)", As: "Value"},
},
Limit: 1,
LimitOffset: 0,
}
correctRes, err := client.FTAggregateWithArgs(ctx, "txns", "*", correctOptions).Result()
Expect(err).NotTo(HaveOccurred())
Expect(correctRes.Rows[0].Fields["Value"]).To(BeEquivalentTo("18"))
})
It("should return a syntax error when empty strings are used for numeric parameters", Label("search", "ftsearch"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "idx", &redis.FTCreateOptions{}, &redis.FieldSchema{
FieldName: "n",
FieldType: redis.SearchFieldTypeNumeric,
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "idx")
client.HSet(ctx, "doc1", "n", 0)
_, err = client.FTSearchWithArgs(ctx, "idx", "*", &redis.FTSearchOptions{
Filters: []redis.FTSearchFilter{{
FieldName: "n",
Min: "",
Max: "",
}},
DialectVersion: 2,
}).Result()
Expect(err).To(HaveOccurred())
})
It("should return NaN as default for AVG reducer when no numeric values are present", Label("search", "ftaggregate"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "aggTestAvg", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "grp", FieldType: redis.SearchFieldTypeText},
&redis.FieldSchema{FieldName: "n", FieldType: redis.SearchFieldTypeNumeric},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "aggTestAvg")
client.HSet(ctx, "doc1", "grp", "g1")
reducers := []redis.FTAggregateReducer{
{Reducer: redis.SearchAvg, Args: []interface{}{"@n"}, As: "avg"},
}
groupBy := []redis.FTAggregateGroupBy{
{Fields: []interface{}{"@grp"}, Reduce: reducers},
}
options := &redis.FTAggregateOptions{GroupBy: groupBy}
res, err := client.FTAggregateWithArgs(ctx, "aggTestAvg", "*", options).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Rows).ToNot(BeEmpty())
Expect(res.Rows[0].Fields["avg"]).To(SatisfyAny(Equal("nan"), Equal("NaN")))
})
It("should return 1 as default for COUNT reducer when no numeric values are present", Label("search", "ftaggregate"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "aggTestCount", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "grp", FieldType: redis.SearchFieldTypeText},
&redis.FieldSchema{FieldName: "n", FieldType: redis.SearchFieldTypeNumeric},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "aggTestCount")
client.HSet(ctx, "doc1", "grp", "g1")
reducers := []redis.FTAggregateReducer{
{Reducer: redis.SearchCount, As: "cnt"},
}
groupBy := []redis.FTAggregateGroupBy{
{Fields: []interface{}{"@grp"}, Reduce: reducers},
}
options := &redis.FTAggregateOptions{GroupBy: groupBy}
res, err := client.FTAggregateWithArgs(ctx, "aggTestCount", "*", options).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Rows).ToNot(BeEmpty())
Expect(res.Rows[0].Fields["cnt"]).To(BeEquivalentTo("1"))
})
It("should return NaN as default for SUM reducer when no numeric values are present", Label("search", "ftaggregate"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "aggTestSum", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "grp", FieldType: redis.SearchFieldTypeText},
&redis.FieldSchema{FieldName: "n", FieldType: redis.SearchFieldTypeNumeric},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "aggTestSum")
client.HSet(ctx, "doc1", "grp", "g1")
reducers := []redis.FTAggregateReducer{
{Reducer: redis.SearchSum, Args: []interface{}{"@n"}, As: "sum"},
}
groupBy := []redis.FTAggregateGroupBy{
{Fields: []interface{}{"@grp"}, Reduce: reducers},
}
options := &redis.FTAggregateOptions{GroupBy: groupBy}
res, err := client.FTAggregateWithArgs(ctx, "aggTestSum", "*", options).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Rows).ToNot(BeEmpty())
Expect(res.Rows[0].Fields["sum"]).To(SatisfyAny(Equal("nan"), Equal("NaN")))
})
It("should return the full requested number of results by re-running the query when some results expire", Label("search", "ftsearch"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "aggExpired", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "order", FieldType: redis.SearchFieldTypeNumeric, Sortable: true},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "aggExpired")
for i := 1; i <= 15; i++ {
key := fmt.Sprintf("doc%d", i)
_, err := client.HSet(ctx, key, "order", i).Result()
Expect(err).NotTo(HaveOccurred())
}
_, err = client.Del(ctx, "doc3", "doc7").Result()
Expect(err).NotTo(HaveOccurred())
options := &redis.FTSearchOptions{
SortBy: []redis.FTSearchSortBy{{FieldName: "order", Asc: true}},
LimitOffset: 0,
Limit: 10,
}
res, err := client.FTSearchWithArgs(ctx, "aggExpired", "*", options).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(res.Docs)).To(BeEquivalentTo(10))
for _, doc := range res.Docs {
Expect(doc.ID).ToNot(Or(Equal("doc3"), Equal("doc7")))
}
})
It("should stop processing and return an error when a timeout occurs", Label("search", "ftaggregate"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "aggTimeoutHeavy", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "n", FieldType: redis.SearchFieldTypeNumeric, Sortable: true},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "aggTimeoutHeavy")
const totalDocs = 10000
for i := 0; i < totalDocs; i++ {
key := fmt.Sprintf("doc%d", i)
_, err := client.HSet(ctx, key, "n", i).Result()
Expect(err).NotTo(HaveOccurred())
}
options := &redis.FTAggregateOptions{
SortBy: []redis.FTAggregateSortBy{{FieldName: "@n", Desc: true}},
LimitOffset: 0,
Limit: 100,
Timeout: 1, // 1 ms timeout, expected to trigger a timeout error.
}
_, err = client.FTAggregateWithArgs(ctx, "aggTimeoutHeavy", "*", options).Result()
Expect(err).To(HaveOccurred())
Expect(strings.ToLower(err.Error())).To(ContainSubstring("timeout"))
})
It("should return 0 as default for COUNT_DISTINCT reducer when no values are present", Label("search", "ftaggregate"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "aggTestCountDistinct", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "grp", FieldType: redis.SearchFieldTypeText},
&redis.FieldSchema{FieldName: "x", FieldType: redis.SearchFieldTypeText},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "aggTestCountDistinct")
client.HSet(ctx, "doc1", "grp", "g1")
reducers := []redis.FTAggregateReducer{
{Reducer: redis.SearchCountDistinct, Args: []interface{}{"@x"}, As: "distinct_count"},
}
groupBy := []redis.FTAggregateGroupBy{
{Fields: []interface{}{"@grp"}, Reduce: reducers},
}
options := &redis.FTAggregateOptions{GroupBy: groupBy}
res, err := client.FTAggregateWithArgs(ctx, "aggTestCountDistinct", "*", options).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Rows).ToNot(BeEmpty())
Expect(res.Rows[0].Fields["distinct_count"]).To(BeEquivalentTo("0"))
})
It("should return 0 as default for COUNT_DISTINCTISH reducer when no values are present", Label("search", "ftaggregate"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "aggTestCountDistinctIsh", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "grp", FieldType: redis.SearchFieldTypeText},
&redis.FieldSchema{FieldName: "y", FieldType: redis.SearchFieldTypeText},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "aggTestCountDistinctIsh")
_, err = client.HSet(ctx, "doc1", "grp", "g1").Result()
Expect(err).NotTo(HaveOccurred())
reducers := []redis.FTAggregateReducer{
{Reducer: redis.SearchCountDistinctish, Args: []interface{}{"@y"}, As: "distinctish_count"},
}
groupBy := []redis.FTAggregateGroupBy{
{Fields: []interface{}{"@grp"}, Reduce: reducers},
}
options := &redis.FTAggregateOptions{GroupBy: groupBy}
res, err := client.FTAggregateWithArgs(ctx, "aggTestCountDistinctIsh", "*", options).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Rows).ToNot(BeEmpty())
Expect(res.Rows[0].Fields["distinctish_count"]).To(BeEquivalentTo("0"))
})
It("should use BM25 as the default scorer", Label("search", "ftsearch"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "scoringTest", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "description", FieldType: redis.SearchFieldTypeText},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "scoringTest")
_, err = client.HSet(ctx, "doc1", "description", "red apple").Result()
Expect(err).NotTo(HaveOccurred())
_, err = client.HSet(ctx, "doc2", "description", "green apple").Result()
Expect(err).NotTo(HaveOccurred())
resDefault, err := client.FTSearchWithArgs(ctx, "scoringTest", "apple", &redis.FTSearchOptions{WithScores: true}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resDefault.Total).To(BeNumerically(">", 0))
resBM25, err := client.FTSearchWithArgs(ctx, "scoringTest", "apple", &redis.FTSearchOptions{WithScores: true, Scorer: "BM25"}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resBM25.Total).To(BeNumerically(">", 0))
Expect(resDefault.Total).To(BeEquivalentTo(resBM25.Total))
Expect(resDefault.Docs[0].ID).To(BeElementOf("doc1", "doc2"))
Expect(resDefault.Docs[1].ID).To(BeElementOf("doc1", "doc2"))
})
It("should return 0 as default for STDDEV reducer when no numeric values are present", Label("search", "ftaggregate"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "aggTestStddev", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "grp", FieldType: redis.SearchFieldTypeText},
&redis.FieldSchema{FieldName: "n", FieldType: redis.SearchFieldTypeNumeric},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "aggTestStddev")
_, err = client.HSet(ctx, "doc1", "grp", "g1").Result()
Expect(err).NotTo(HaveOccurred())
reducers := []redis.FTAggregateReducer{
{Reducer: redis.SearchStdDev, Args: []interface{}{"@n"}, As: "stddev"},
}
groupBy := []redis.FTAggregateGroupBy{
{Fields: []interface{}{"@grp"}, Reduce: reducers},
}
options := &redis.FTAggregateOptions{GroupBy: groupBy}
res, err := client.FTAggregateWithArgs(ctx, "aggTestStddev", "*", options).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Rows).ToNot(BeEmpty())
Expect(res.Rows[0].Fields["stddev"]).To(BeEquivalentTo("0"))
})
It("should return NaN as default for QUANTILE reducer when no numeric values are present", Label("search", "ftaggregate"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "aggTestQuantile", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "grp", FieldType: redis.SearchFieldTypeText},
&redis.FieldSchema{FieldName: "n", FieldType: redis.SearchFieldTypeNumeric},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "aggTestQuantile")
_, err = client.HSet(ctx, "doc1", "grp", "g1").Result()
Expect(err).NotTo(HaveOccurred())
reducers := []redis.FTAggregateReducer{
{Reducer: redis.SearchQuantile, Args: []interface{}{"@n", 0.5}, As: "quantile"},
}
groupBy := []redis.FTAggregateGroupBy{
{Fields: []interface{}{"@grp"}, Reduce: reducers},
}
options := &redis.FTAggregateOptions{GroupBy: groupBy}
res, err := client.FTAggregateWithArgs(ctx, "aggTestQuantile", "*", options).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Rows).ToNot(BeEmpty())
Expect(res.Rows[0].Fields["quantile"]).To(SatisfyAny(Equal("nan"), Equal("NaN")))
})
It("should return nil as default for FIRST_VALUE reducer when no values are present", Label("search", "ftaggregate"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "aggTestFirstValue", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "grp", FieldType: redis.SearchFieldTypeText},
&redis.FieldSchema{FieldName: "t", FieldType: redis.SearchFieldTypeText},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "aggTestFirstValue")
_, err = client.HSet(ctx, "doc1", "grp", "g1").Result()
Expect(err).NotTo(HaveOccurred())
reducers := []redis.FTAggregateReducer{
{Reducer: redis.SearchFirstValue, Args: []interface{}{"@t"}, As: "first_val"},
}
groupBy := []redis.FTAggregateGroupBy{
{Fields: []interface{}{"@grp"}, Reduce: reducers},
}
options := &redis.FTAggregateOptions{GroupBy: groupBy}
res, err := client.FTAggregateWithArgs(ctx, "aggTestFirstValue", "*", options).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Rows).ToNot(BeEmpty())
Expect(res.Rows[0].Fields["first_val"]).To(BeNil())
})
It("should fail to add an alias that is an existing index name", Label("search", "ftalias"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "name", FieldType: redis.SearchFieldTypeText},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "idx1")
val, err = client.FTCreate(ctx, "idx2", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "name", FieldType: redis.SearchFieldTypeText},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "idx2")
_, err = client.FTAliasAdd(ctx, "idx2", "idx1").Result()
Expect(err).To(HaveOccurred())
Expect(strings.ToLower(err.Error())).To(ContainSubstring("alias"))
})
It("should test ft.search with CountOnly param", Label("search", "ftsearch"), func() {
val, err := client.FTCreate(ctx, "txtIndex", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "txt", FieldType: redis.SearchFieldTypeText},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "txtIndex")
_, err = client.HSet(ctx, "doc1", "txt", "hello world").Result()
Expect(err).NotTo(HaveOccurred())
_, err = client.HSet(ctx, "doc2", "txt", "hello go").Result()
Expect(err).NotTo(HaveOccurred())
_, err = client.HSet(ctx, "doc3", "txt", "hello redis").Result()
Expect(err).NotTo(HaveOccurred())
optsCountOnly := &redis.FTSearchOptions{
CountOnly: true,
LimitOffset: 0,
Limit: 2, // even though we limit to 2, with count-only no docs are returned
DialectVersion: 2,
}
resCountOnly, err := client.FTSearchWithArgs(ctx, "txtIndex", "hello", optsCountOnly).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resCountOnly.Total).To(BeEquivalentTo(3))
Expect(len(resCountOnly.Docs)).To(BeEquivalentTo(0))
optsLimit := &redis.FTSearchOptions{
CountOnly: false,
LimitOffset: 0,
Limit: 2, // we expect to get 2 documents even though total count is 3
DialectVersion: 2,
}
resLimit, err := client.FTSearchWithArgs(ctx, "txtIndex", "hello", optsLimit).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resLimit.Total).To(BeEquivalentTo(3))
Expect(len(resLimit.Docs)).To(BeEquivalentTo(2))
})
It("should reject deprecated configuration keys", Label("search", "ftconfig"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
// List of deprecated configuration keys.
deprecatedKeys := []string{
"_FREE_RESOURCE_ON_THREAD",
"_NUMERIC_COMPRESS",
"_NUMERIC_RANGES_PARENTS",
"_PRINT_PROFILE_CLOCK",
"_PRIORITIZE_INTERSECT_UNION_CHILDREN",
"BG_INDEX_SLEEP_GAP",
"CONN_PER_SHARD",
"CURSOR_MAX_IDLE",
"CURSOR_REPLY_THRESHOLD",
"DEFAULT_DIALECT",
"EXTLOAD",
"FORK_GC_CLEAN_THRESHOLD",
"FORK_GC_RETRY_INTERVAL",
"FORK_GC_RUN_INTERVAL",
"FORKGC_SLEEP_BEFORE_EXIT",
"FRISOINI",
"GC_POLICY",
"GCSCANSIZE",
"INDEX_CURSOR_LIMIT",
"MAXAGGREGATERESULTS",
"MAXDOCTABLESIZE",
"MAXPREFIXEXPANSIONS",
"MAXSEARCHRESULTS",
"MIN_OPERATION_WORKERS",
"MIN_PHONETIC_TERM_LEN",
"MINPREFIX",
"MINSTEMLEN",
"NO_MEM_POOLS",
"NOGC",
"ON_TIMEOUT",
"MULTI_TEXT_SLOP",
"PARTIAL_INDEXED_DOCS",
"RAW_DOCID_ENCODING",
"SEARCH_THREADS",
"TIERED_HNSW_BUFFER_LIMIT",
"TIMEOUT",
"TOPOLOGY_VALIDATION_TIMEOUT",
"UNION_ITERATOR_HEAP",
"VSS_MAX_RESIZE",
"WORKERS",
"WORKERS_PRIORITY_BIAS_THRESHOLD",
"MT_MODE",
"WORKER_THREADS",
}
for _, key := range deprecatedKeys {
_, err := client.FTConfigSet(ctx, key, "test_value").Result()
Expect(err).To(HaveOccurred())
}
val, err := client.ConfigGet(ctx, "*").Result()
Expect(err).NotTo(HaveOccurred())
// Since FT.CONFIG is deprecated since redis 8, use CONFIG instead with new search parameters.
keys := make([]string, 0, len(val))
for key := range val {
keys = append(keys, key)
}
Expect(keys).To(ContainElement(ContainSubstring("search")))
})
It("should return INF for MIN reducer and -INF for MAX reducer when no numeric values are present", Label("search", "ftaggregate"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "aggTestMinMax", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "grp", FieldType: redis.SearchFieldTypeText},
&redis.FieldSchema{FieldName: "n", FieldType: redis.SearchFieldTypeNumeric},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "aggTestMinMax")
_, err = client.HSet(ctx, "doc1", "grp", "g1").Result()
Expect(err).NotTo(HaveOccurred())
reducers := []redis.FTAggregateReducer{
{Reducer: redis.SearchMin, Args: []interface{}{"@n"}, As: "minValue"},
{Reducer: redis.SearchMax, Args: []interface{}{"@n"}, As: "maxValue"},
}
groupBy := []redis.FTAggregateGroupBy{
{Fields: []interface{}{"@grp"}, Reduce: reducers},
}
options := &redis.FTAggregateOptions{GroupBy: groupBy}
res, err := client.FTAggregateWithArgs(ctx, "aggTestMinMax", "*", options).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Rows).ToNot(BeEmpty())
Expect(res.Rows[0].Fields["minValue"]).To(BeEquivalentTo("inf"))
Expect(res.Rows[0].Fields["maxValue"]).To(BeEquivalentTo("-inf"))
})
})
func _assert_geosearch_result(result *redis.FTSearchResult, expectedDocIDs []string) {
@ -1588,96 +2209,6 @@ func _assert_geosearch_result(result *redis.FTSearchResult, expectedDocIDs []str
Expect(result.Total).To(BeEquivalentTo(len(expectedDocIDs)))
}
// It("should FTProfile Search and Aggregate", Label("search", "ftprofile"), func() {
// val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{}, &redis.FieldSchema{FieldName: "t", FieldType: redis.SearchFieldTypeText}).Result()
// Expect(err).NotTo(HaveOccurred())
// Expect(val).To(BeEquivalentTo("OK"))
// WaitForIndexing(client, "idx1")
// client.HSet(ctx, "1", "t", "hello")
// client.HSet(ctx, "2", "t", "world")
// // FTProfile Search
// query := redis.FTSearchQuery("hello|world", &redis.FTSearchOptions{NoContent: true})
// res1, err := client.FTProfile(ctx, "idx1", false, query).Result()
// Expect(err).NotTo(HaveOccurred())
// panic(res1)
// Expect(len(res1["results"].([]interface{}))).To(BeEquivalentTo(3))
// resProfile := res1["profile"].(map[interface{}]interface{})
// Expect(resProfile["Parsing time"].(float64) < 0.5).To(BeTrue())
// iterProfile0 := resProfile["Iterators profile"].([]interface{})[0].(map[interface{}]interface{})
// Expect(iterProfile0["Counter"]).To(BeEquivalentTo(2.0))
// Expect(iterProfile0["Type"]).To(BeEquivalentTo("UNION"))
// // FTProfile Aggregate
// aggQuery := redis.FTAggregateQuery("*", &redis.FTAggregateOptions{
// Load: []redis.FTAggregateLoad{{Field: "t"}},
// Apply: []redis.FTAggregateApply{{Field: "startswith(@t, 'hel')", As: "prefix"}}})
// res2, err := client.FTProfile(ctx, "idx1", false, aggQuery).Result()
// Expect(err).NotTo(HaveOccurred())
// Expect(len(res2["results"].([]interface{}))).To(BeEquivalentTo(2))
// resProfile = res2["profile"].(map[interface{}]interface{})
// iterProfile0 = resProfile["Iterators profile"].([]interface{})[0].(map[interface{}]interface{})
// Expect(iterProfile0["Counter"]).To(BeEquivalentTo(2))
// Expect(iterProfile0["Type"]).To(BeEquivalentTo("WILDCARD"))
// })
// It("should FTProfile Search Limited", Label("search", "ftprofile"), func() {
// val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{}, &redis.FieldSchema{FieldName: "t", FieldType: redis.SearchFieldTypeText}).Result()
// Expect(err).NotTo(HaveOccurred())
// Expect(val).To(BeEquivalentTo("OK"))
// WaitForIndexing(client, "idx1")
// client.HSet(ctx, "1", "t", "hello")
// client.HSet(ctx, "2", "t", "hell")
// client.HSet(ctx, "3", "t", "help")
// client.HSet(ctx, "4", "t", "helowa")
// // FTProfile Search
// query := redis.FTSearchQuery("%hell% hel*", &redis.FTSearchOptions{})
// res1, err := client.FTProfile(ctx, "idx1", true, query).Result()
// Expect(err).NotTo(HaveOccurred())
// resProfile := res1["profile"].(map[interface{}]interface{})
// iterProfile0 := resProfile["Iterators profile"].([]interface{})[0].(map[interface{}]interface{})
// Expect(iterProfile0["Type"]).To(BeEquivalentTo("INTERSECT"))
// Expect(len(res1["results"].([]interface{}))).To(BeEquivalentTo(3))
// Expect(iterProfile0["Child iterators"].([]interface{})[0].(map[interface{}]interface{})["Child iterators"]).To(BeEquivalentTo("The number of iterators in the union is 3"))
// Expect(iterProfile0["Child iterators"].([]interface{})[1].(map[interface{}]interface{})["Child iterators"]).To(BeEquivalentTo("The number of iterators in the union is 4"))
// })
// It("should FTProfile Search query params", Label("search", "ftprofile"), func() {
// hnswOptions := &redis.FTHNSWOptions{Type: "FLOAT32", Dim: 2, DistanceMetric: "L2"}
// val, err := client.FTCreate(ctx, "idx1",
// &redis.FTCreateOptions{},
// &redis.FieldSchema{FieldName: "v", FieldType: redis.SearchFieldTypeVector, VectorArgs: &redis.FTVectorArgs{HNSWOptions: hnswOptions}}).Result()
// Expect(err).NotTo(HaveOccurred())
// Expect(val).To(BeEquivalentTo("OK"))
// WaitForIndexing(client, "idx1")
// client.HSet(ctx, "a", "v", "aaaaaaaa")
// client.HSet(ctx, "b", "v", "aaaabaaa")
// client.HSet(ctx, "c", "v", "aaaaabaa")
// // FTProfile Search
// searchOptions := &redis.FTSearchOptions{
// Return: []redis.FTSearchReturn{{FieldName: "__v_score"}},
// SortBy: []redis.FTSearchSortBy{{FieldName: "__v_score", Asc: true}},
// DialectVersion: 2,
// Params: map[string]interface{}{"vec": "aaaaaaaa"},
// }
// query := redis.FTSearchQuery("*=>[KNN 2 @v $vec]", searchOptions)
// res1, err := client.FTProfile(ctx, "idx1", false, query).Result()
// Expect(err).NotTo(HaveOccurred())
// resProfile := res1["profile"].(map[interface{}]interface{})
// iterProfile0 := resProfile["Iterators profile"].([]interface{})[0].(map[interface{}]interface{})
// Expect(iterProfile0["Counter"]).To(BeEquivalentTo(2))
// Expect(iterProfile0["Type"]).To(BeEquivalentTo(redis.SearchFieldTypeVector.String()))
// Expect(res1["total_results"]).To(BeEquivalentTo(2))
// results0 := res1["results"].([]interface{})[0].(map[interface{}]interface{})
// Expect(results0["id"]).To(BeEquivalentTo("a"))
// Expect(results0["extra_attributes"].(map[interface{}]interface{})["__v_score"]).To(BeEquivalentTo("0"))
// })
var _ = Describe("RediSearch FT.Config with Resp2 and Resp3", Label("search", "NonRedisEnterprise"), func() {
var clientResp2 *redis.Client

View File

@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"errors"
"fmt"
"net"
"strings"
"sync"
@ -80,9 +81,20 @@ type FailoverOptions struct {
TLSConfig *tls.Config
// DisableIndentity - Disable set-lib on connect.
//
// default: false
//
// Deprecated: Use DisableIdentity instead.
DisableIndentity bool
IdentitySuffix string
UnstableResp3 bool
// DisableIdentity is used to disable CLIENT SETINFO command on connect.
//
// default: false
DisableIdentity bool
IdentitySuffix string
UnstableResp3 bool
}
func (opt *FailoverOptions) clientOptions() *Options {
@ -118,9 +130,11 @@ func (opt *FailoverOptions) clientOptions() *Options {
TLSConfig: opt.TLSConfig,
DisableIdentity: opt.DisableIdentity,
DisableIndentity: opt.DisableIndentity,
IdentitySuffix: opt.IdentitySuffix,
UnstableResp3: opt.UnstableResp3,
IdentitySuffix: opt.IdentitySuffix,
UnstableResp3: opt.UnstableResp3,
}
}
@ -156,9 +170,11 @@ func (opt *FailoverOptions) sentinelOptions(addr string) *Options {
TLSConfig: opt.TLSConfig,
DisableIdentity: opt.DisableIdentity,
DisableIndentity: opt.DisableIndentity,
IdentitySuffix: opt.IdentitySuffix,
UnstableResp3: opt.UnstableResp3,
IdentitySuffix: opt.IdentitySuffix,
UnstableResp3: opt.UnstableResp3,
}
}
@ -197,8 +213,10 @@ func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
TLSConfig: opt.TLSConfig,
DisableIdentity: opt.DisableIdentity,
DisableIndentity: opt.DisableIndentity,
IdentitySuffix: opt.IdentitySuffix,
IdentitySuffix: opt.IdentitySuffix,
}
}
@ -549,29 +567,50 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
}
}
var (
masterAddr string
wg sync.WaitGroup
once sync.Once
errCh = make(chan error, len(c.sentinelAddrs))
)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for i, sentinelAddr := range c.sentinelAddrs {
sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
masterAddr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
if err != nil {
_ = sentinel.Close()
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return "", err
wg.Add(1)
go func(i int, addr string) {
defer wg.Done()
sentinelCli := NewSentinelClient(c.opt.sentinelOptions(addr))
addrVal, err := sentinelCli.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
if err != nil {
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName addr=%s, master=%q failed: %s",
addr, c.opt.MasterName, err)
_ = sentinelCli.Close()
errCh <- err
return
}
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName master=%q failed: %s",
c.opt.MasterName, err)
continue
}
// Push working sentinel to the top.
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
c.setSentinel(ctx, sentinel)
addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
return addr, nil
once.Do(func() {
masterAddr = net.JoinHostPort(addrVal[0], addrVal[1])
// Push working sentinel to the top
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
c.setSentinel(ctx, sentinelCli)
internal.Logger.Printf(ctx, "sentinel: selected addr=%s masterAddr=%s", addr, masterAddr)
cancel()
})
}(i, sentinelAddr)
}
return "", errors.New("redis: all sentinels specified in configuration are unreachable")
wg.Wait()
close(errCh)
if masterAddr != "" {
return masterAddr, nil
}
errs := make([]error, 0, len(errCh))
for err := range errCh {
errs = append(errs, err)
}
return "", fmt.Errorf("redis: all sentinels specified in configuration are unreachable: %w", errors.Join(errs...))
}
func (c *sentinelFailover) replicaAddrs(ctx context.Context, useDisconnected bool) ([]string, error) {
@ -798,6 +837,22 @@ func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
}
opt := failoverOpt.clusterOptions()
if failoverOpt.DB != 0 {
onConnect := opt.OnConnect
opt.OnConnect = func(ctx context.Context, cn *Conn) error {
if err := cn.Select(ctx, failoverOpt.DB).Err(); err != nil {
return err
}
if onConnect != nil {
return onConnect(ctx, cn)
}
return nil
}
}
opt.ClusterSlots = func(ctx context.Context) ([]ClusterSlot, error) {
masterAddr, err := failover.MasterAddr(ctx)
if err != nil {

View File

@ -3,6 +3,7 @@ package redis_test
import (
"context"
"net"
"time"
. "github.com/bsm/ginkgo/v2"
. "github.com/bsm/gomega"
@ -32,6 +33,24 @@ var _ = Describe("Sentinel PROTO 2", func() {
})
})
var _ = Describe("Sentinel resolution", func() {
It("should resolve master without context exhaustion", func() {
shortCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
defer cancel()
client := redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: sentinelName,
SentinelAddrs: sentinelAddrs,
MaxRetries: -1,
})
err := client.Ping(shortCtx).Err()
Expect(err).NotTo(HaveOccurred(), "expected master to resolve without context exhaustion")
_ = client.Close()
})
})
var _ = Describe("Sentinel", func() {
var client *redis.Client
var master *redis.Client
@ -200,6 +219,7 @@ var _ = Describe("NewFailoverClusterClient", func() {
SentinelAddrs: sentinelAddrs,
RouteRandomly: true,
DB: 1,
})
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
@ -289,6 +309,20 @@ var _ = Describe("NewFailoverClusterClient", func() {
})
})
It("should sentinel cluster client db", func() {
err := client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
return c.Ping(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
clientInfo, err := c.ClientInfo(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(clientInfo.DB).To(Equal(1))
return nil
})
})
It("should sentinel cluster PROTO 3", func() {
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
val, err := client.Do(ctx, "HELLO").Result()

View File

@ -61,14 +61,24 @@ type UniversalOptions struct {
RouteByLatency bool
RouteRandomly bool
// The sentinel master name.
// Only failover clients.
// MasterName is the sentinel master name.
// Only for failover clients.
MasterName string
// DisableIndentity - Disable set-lib on connect.
//
// default: false
//
// Deprecated: Use DisableIdentity instead.
DisableIndentity bool
IdentitySuffix string
UnstableResp3 bool
// DisableIdentity is used to disable CLIENT SETINFO command on connect.
//
// default: false
DisableIdentity bool
IdentitySuffix string
UnstableResp3 bool
// IsClusterMode can be used when only one Addrs is provided (e.g. Elasticache supports setting up cluster mode with configuration endpoint).
IsClusterMode bool
@ -116,6 +126,7 @@ func (o *UniversalOptions) Cluster() *ClusterOptions {
TLSConfig: o.TLSConfig,
DisableIdentity: o.DisableIdentity,
DisableIndentity: o.DisableIndentity,
IdentitySuffix: o.IdentitySuffix,
UnstableResp3: o.UnstableResp3,
@ -143,6 +154,9 @@ func (o *UniversalOptions) Failover() *FailoverOptions {
SentinelUsername: o.SentinelUsername,
SentinelPassword: o.SentinelPassword,
RouteByLatency: o.RouteByLatency,
RouteRandomly: o.RouteRandomly,
MaxRetries: o.MaxRetries,
MinRetryBackoff: o.MinRetryBackoff,
MaxRetryBackoff: o.MaxRetryBackoff,
@ -163,8 +177,9 @@ func (o *UniversalOptions) Failover() *FailoverOptions {
TLSConfig: o.TLSConfig,
ReplicaOnly: o.ReadOnly,
ReplicaOnly: o.ReadOnly,
DisableIdentity: o.DisableIdentity,
DisableIndentity: o.DisableIndentity,
IdentitySuffix: o.IdentitySuffix,
UnstableResp3: o.UnstableResp3,
@ -209,6 +224,7 @@ func (o *UniversalOptions) Simple() *Options {
TLSConfig: o.TLSConfig,
DisableIdentity: o.DisableIdentity,
DisableIndentity: o.DisableIndentity,
IdentitySuffix: o.IdentitySuffix,
UnstableResp3: o.UnstableResp3,
@ -243,14 +259,22 @@ var (
// NewUniversalClient returns a new multi client. The type of the returned client depends
// on the following conditions:
//
// 1. If the MasterName option is specified, a sentinel-backed FailoverClient is returned.
// 2. if the number of Addrs is two or more, a ClusterClient is returned.
// 3. Otherwise, a single-node Client is returned.
// 1. If the MasterName option is specified with RouteByLatency, RouteRandomly or IsClusterMode,
// a FailoverClusterClient is returned.
// 2. If the MasterName option is specified without RouteByLatency, RouteRandomly or IsClusterMode,
// a sentinel-backed FailoverClient is returned.
// 3. If the number of Addrs is two or more, or IsClusterMode option is specified,
// a ClusterClient is returned.
// 4. Otherwise, a single-node Client is returned.
func NewUniversalClient(opts *UniversalOptions) UniversalClient {
if opts.MasterName != "" {
switch {
case opts.MasterName != "" && (opts.RouteByLatency || opts.RouteRandomly || opts.IsClusterMode):
return NewFailoverClusterClient(opts.Failover())
case opts.MasterName != "":
return NewFailoverClient(opts.Failover())
} else if len(opts.Addrs) > 1 || opts.IsClusterMode {
case len(opts.Addrs) > 1 || opts.IsClusterMode:
return NewClusterClient(opts.Cluster())
default:
return NewClient(opts.Simple())
}
return NewClient(opts.Simple())
}

View File

@ -24,6 +24,16 @@ var _ = Describe("UniversalClient", func() {
Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred())
})
It("should connect to failover cluster", Label("NonRedisEnterprise"), func() {
client = redis.NewUniversalClient(&redis.UniversalOptions{
MasterName: sentinelName,
RouteRandomly: true,
Addrs: sentinelAddrs,
})
_, ok := client.(*redis.ClusterClient)
Expect(ok).To(BeTrue(), "expected a ClusterClient")
})
It("should connect to simple servers", func() {
client = redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: []string{redisAddr},
@ -79,6 +89,7 @@ var _ = Describe("UniversalClient", func() {
err = client.Set(ctx, "somekey", "somevalue", 0).Err()
Expect(err).To(HaveOccurred())
})
It("should connect to clusters if IsClusterMode is set even if only a single address is provided", Label("NonRedisEnterprise"), func() {
client = redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: []string{cluster.addrs()[0]},
@ -96,4 +107,3 @@ var _ = Describe("UniversalClient", func() {
Expect(client.ClusterSlots(ctx).Val()).To(HaveLen(3))
})
})

View File

@ -2,5 +2,5 @@ package redis
// Version is the current release version.
func Version() string {
return "9.7.1"
return "9.8.0-beta.1"
}