1
0
mirror of https://github.com/redis/go-redis.git synced 2025-06-05 06:42:39 +03:00

Merge branch 'master' into ndyakov/update-readme-discord

This commit is contained in:
Nedyalko Dyakov 2025-04-18 17:14:34 +03:00 committed by GitHub
commit dd8ba4d255
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 629 additions and 24 deletions

View File

@ -152,6 +152,32 @@ func ExampleClient_search_json() {
// >>> Tel Aviv
// STEP_END
// STEP_START query2count_only
citiesResult2, err := rdb.FTSearchWithArgs(
ctx,
"idx:users",
"Paul",
&redis.FTSearchOptions{
Return: []redis.FTSearchReturn{
{
FieldName: "$.city",
As: "city",
},
},
CountOnly: true,
},
).Result()
if err != nil {
panic(err)
}
// The `Total` field has the correct number of docs found
// by the query but the `Docs` slice is empty.
fmt.Println(len(citiesResult2.Docs)) // >>> 0
fmt.Println(citiesResult2.Total) // >>> 2
// STEP_END
// STEP_START query3
aggOptions := redis.FTAggregateOptions{
GroupBy: []redis.FTAggregateGroupBy{
@ -196,6 +222,8 @@ func ExampleClient_search_json() {
// {1 [{user:3 <nil> <nil> <nil> map[$:{"age":35,"city":"Tel Aviv","email":"paul.zamir@example.com","name":"Paul Zamir"}]}]}
// London
// Tel Aviv
// 0
// 2
// London - 1
// Tel Aviv - 2
}

View File

@ -352,3 +352,27 @@ var _ = Describe("withConn", func() {
Expect(client.connPool.Len()).To(Equal(1))
})
})
var _ = Describe("ClusterClient", func() {
var client *ClusterClient
BeforeEach(func() {
client = &ClusterClient{}
})
Describe("cmdSlot", func() {
It("select slot from args for GETKEYSINSLOT command", func() {
cmd := NewStringSliceCmd(ctx, "cluster", "getkeysinslot", 100, 200)
slot := client.cmdSlot(context.Background(), cmd)
Expect(slot).To(Equal(100))
})
It("select slot from args for COUNTKEYSINSLOT command", func() {
cmd := NewStringSliceCmd(ctx, "cluster", "countkeysinslot", 100)
slot := client.cmdSlot(context.Background(), cmd)
Expect(slot).To(Equal(100))
})
})
})

View File

@ -1856,7 +1856,7 @@ func (c *ClusterClient) cmdInfo(ctx context.Context, name string) *CommandInfo {
func (c *ClusterClient) cmdSlot(ctx context.Context, cmd Cmder) int {
args := cmd.Args()
if args[0] == "cluster" && args[1] == "getkeysinslot" {
if args[0] == "cluster" && (args[1] == "getkeysinslot" || args[1] == "countkeysinslot") {
return args[2].(int)
}

View File

@ -114,6 +114,7 @@ type SpellCheckTerms struct {
}
type FTExplainOptions struct {
// Dialect 1,3 and 4 are deprecated since redis 8.0
Dialect string
}
@ -261,7 +262,8 @@ type FTAggregateOptions struct {
WithCursor bool
WithCursorOptions *FTAggregateWithCursor
Params map[string]interface{}
DialectVersion int
// Dialect 1,3 and 4 are deprecated since redis 8.0
DialectVersion int
}
type FTSearchFilter struct {
@ -322,8 +324,9 @@ type FTSearchOptions struct {
Limit int
// CountOnly sets LIMIT 0 0 to get the count - number of documents in the result set without actually returning the result set.
// When using this option, the Limit and LimitOffset options are ignored.
CountOnly bool
Params map[string]interface{}
CountOnly bool
Params map[string]interface{}
// Dialect 1,3 and 4 are deprecated since redis 8.0
DialectVersion int
}
@ -440,7 +443,8 @@ type IndexDefinition struct {
type FTSpellCheckOptions struct {
Distance int
Terms *FTSpellCheckTerms
Dialect int
// Dialect 1,3 and 4 are deprecated since redis 8.0
Dialect int
}
type FTSpellCheckTerms struct {

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strconv"
"strings"
"time"
. "github.com/bsm/ginkgo/v2"
@ -1683,6 +1684,389 @@ var _ = Describe("RediSearch commands Resp 2", Label("search"), func() {
Expect(resUint8.Docs[0].ID).To(BeEquivalentTo("doc1"))
})
It("should fail when using a non-zero offset with a zero limit", Label("search", "ftsearch"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "testIdx", &redis.FTCreateOptions{}, &redis.FieldSchema{
FieldName: "txt",
FieldType: redis.SearchFieldTypeText,
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "testIdx")
client.HSet(ctx, "doc1", "txt", "hello world")
// Attempt to search with a non-zero offset and zero limit.
_, err = client.FTSearchWithArgs(ctx, "testIdx", "hello", &redis.FTSearchOptions{
LimitOffset: 5,
Limit: 0,
}).Result()
Expect(err).To(HaveOccurred())
})
It("should evaluate exponentiation precedence in APPLY expressions correctly", Label("search", "ftaggregate"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "txns", &redis.FTCreateOptions{}, &redis.FieldSchema{
FieldName: "dummy",
FieldType: redis.SearchFieldTypeText,
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "txns")
client.HSet(ctx, "doc1", "dummy", "dummy")
correctOptions := &redis.FTAggregateOptions{
Apply: []redis.FTAggregateApply{
{Field: "(2*3^2)", As: "Value"},
},
Limit: 1,
LimitOffset: 0,
}
correctRes, err := client.FTAggregateWithArgs(ctx, "txns", "*", correctOptions).Result()
Expect(err).NotTo(HaveOccurred())
Expect(correctRes.Rows[0].Fields["Value"]).To(BeEquivalentTo("18"))
})
It("should return a syntax error when empty strings are used for numeric parameters", Label("search", "ftsearch"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "idx", &redis.FTCreateOptions{}, &redis.FieldSchema{
FieldName: "n",
FieldType: redis.SearchFieldTypeNumeric,
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "idx")
client.HSet(ctx, "doc1", "n", 0)
_, err = client.FTSearchWithArgs(ctx, "idx", "*", &redis.FTSearchOptions{
Filters: []redis.FTSearchFilter{{
FieldName: "n",
Min: "",
Max: "",
}},
DialectVersion: 2,
}).Result()
Expect(err).To(HaveOccurred())
})
It("should return NaN as default for AVG reducer when no numeric values are present", Label("search", "ftaggregate"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "aggTestAvg", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "grp", FieldType: redis.SearchFieldTypeText},
&redis.FieldSchema{FieldName: "n", FieldType: redis.SearchFieldTypeNumeric},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "aggTestAvg")
client.HSet(ctx, "doc1", "grp", "g1")
reducers := []redis.FTAggregateReducer{
{Reducer: redis.SearchAvg, Args: []interface{}{"@n"}, As: "avg"},
}
groupBy := []redis.FTAggregateGroupBy{
{Fields: []interface{}{"@grp"}, Reduce: reducers},
}
options := &redis.FTAggregateOptions{GroupBy: groupBy}
res, err := client.FTAggregateWithArgs(ctx, "aggTestAvg", "*", options).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Rows).ToNot(BeEmpty())
Expect(res.Rows[0].Fields["avg"]).To(SatisfyAny(Equal("nan"), Equal("NaN")))
})
It("should return 1 as default for COUNT reducer when no numeric values are present", Label("search", "ftaggregate"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "aggTestCount", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "grp", FieldType: redis.SearchFieldTypeText},
&redis.FieldSchema{FieldName: "n", FieldType: redis.SearchFieldTypeNumeric},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "aggTestCount")
client.HSet(ctx, "doc1", "grp", "g1")
reducers := []redis.FTAggregateReducer{
{Reducer: redis.SearchCount, As: "cnt"},
}
groupBy := []redis.FTAggregateGroupBy{
{Fields: []interface{}{"@grp"}, Reduce: reducers},
}
options := &redis.FTAggregateOptions{GroupBy: groupBy}
res, err := client.FTAggregateWithArgs(ctx, "aggTestCount", "*", options).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Rows).ToNot(BeEmpty())
Expect(res.Rows[0].Fields["cnt"]).To(BeEquivalentTo("1"))
})
It("should return NaN as default for SUM reducer when no numeric values are present", Label("search", "ftaggregate"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "aggTestSum", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "grp", FieldType: redis.SearchFieldTypeText},
&redis.FieldSchema{FieldName: "n", FieldType: redis.SearchFieldTypeNumeric},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "aggTestSum")
client.HSet(ctx, "doc1", "grp", "g1")
reducers := []redis.FTAggregateReducer{
{Reducer: redis.SearchSum, Args: []interface{}{"@n"}, As: "sum"},
}
groupBy := []redis.FTAggregateGroupBy{
{Fields: []interface{}{"@grp"}, Reduce: reducers},
}
options := &redis.FTAggregateOptions{GroupBy: groupBy}
res, err := client.FTAggregateWithArgs(ctx, "aggTestSum", "*", options).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Rows).ToNot(BeEmpty())
Expect(res.Rows[0].Fields["sum"]).To(SatisfyAny(Equal("nan"), Equal("NaN")))
})
It("should return the full requested number of results by re-running the query when some results expire", Label("search", "ftsearch"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "aggExpired", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "order", FieldType: redis.SearchFieldTypeNumeric, Sortable: true},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "aggExpired")
for i := 1; i <= 15; i++ {
key := fmt.Sprintf("doc%d", i)
_, err := client.HSet(ctx, key, "order", i).Result()
Expect(err).NotTo(HaveOccurred())
}
_, err = client.Del(ctx, "doc3", "doc7").Result()
Expect(err).NotTo(HaveOccurred())
options := &redis.FTSearchOptions{
SortBy: []redis.FTSearchSortBy{{FieldName: "order", Asc: true}},
LimitOffset: 0,
Limit: 10,
}
res, err := client.FTSearchWithArgs(ctx, "aggExpired", "*", options).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(res.Docs)).To(BeEquivalentTo(10))
for _, doc := range res.Docs {
Expect(doc.ID).ToNot(Or(Equal("doc3"), Equal("doc7")))
}
})
It("should stop processing and return an error when a timeout occurs", Label("search", "ftaggregate"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "aggTimeoutHeavy", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "n", FieldType: redis.SearchFieldTypeNumeric, Sortable: true},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "aggTimeoutHeavy")
const totalDocs = 10000
for i := 0; i < totalDocs; i++ {
key := fmt.Sprintf("doc%d", i)
_, err := client.HSet(ctx, key, "n", i).Result()
Expect(err).NotTo(HaveOccurred())
}
options := &redis.FTAggregateOptions{
SortBy: []redis.FTAggregateSortBy{{FieldName: "@n", Desc: true}},
LimitOffset: 0,
Limit: 100,
Timeout: 1, // 1 ms timeout, expected to trigger a timeout error.
}
_, err = client.FTAggregateWithArgs(ctx, "aggTimeoutHeavy", "*", options).Result()
Expect(err).To(HaveOccurred())
Expect(strings.ToLower(err.Error())).To(ContainSubstring("timeout"))
})
It("should return 0 as default for COUNT_DISTINCT reducer when no values are present", Label("search", "ftaggregate"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "aggTestCountDistinct", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "grp", FieldType: redis.SearchFieldTypeText},
&redis.FieldSchema{FieldName: "x", FieldType: redis.SearchFieldTypeText},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "aggTestCountDistinct")
client.HSet(ctx, "doc1", "grp", "g1")
reducers := []redis.FTAggregateReducer{
{Reducer: redis.SearchCountDistinct, Args: []interface{}{"@x"}, As: "distinct_count"},
}
groupBy := []redis.FTAggregateGroupBy{
{Fields: []interface{}{"@grp"}, Reduce: reducers},
}
options := &redis.FTAggregateOptions{GroupBy: groupBy}
res, err := client.FTAggregateWithArgs(ctx, "aggTestCountDistinct", "*", options).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Rows).ToNot(BeEmpty())
Expect(res.Rows[0].Fields["distinct_count"]).To(BeEquivalentTo("0"))
})
It("should return 0 as default for COUNT_DISTINCTISH reducer when no values are present", Label("search", "ftaggregate"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "aggTestCountDistinctIsh", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "grp", FieldType: redis.SearchFieldTypeText},
&redis.FieldSchema{FieldName: "y", FieldType: redis.SearchFieldTypeText},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "aggTestCountDistinctIsh")
_, err = client.HSet(ctx, "doc1", "grp", "g1").Result()
Expect(err).NotTo(HaveOccurred())
reducers := []redis.FTAggregateReducer{
{Reducer: redis.SearchCountDistinctish, Args: []interface{}{"@y"}, As: "distinctish_count"},
}
groupBy := []redis.FTAggregateGroupBy{
{Fields: []interface{}{"@grp"}, Reduce: reducers},
}
options := &redis.FTAggregateOptions{GroupBy: groupBy}
res, err := client.FTAggregateWithArgs(ctx, "aggTestCountDistinctIsh", "*", options).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Rows).ToNot(BeEmpty())
Expect(res.Rows[0].Fields["distinctish_count"]).To(BeEquivalentTo("0"))
})
It("should use BM25 as the default scorer", Label("search", "ftsearch"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "scoringTest", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "description", FieldType: redis.SearchFieldTypeText},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "scoringTest")
_, err = client.HSet(ctx, "doc1", "description", "red apple").Result()
Expect(err).NotTo(HaveOccurred())
_, err = client.HSet(ctx, "doc2", "description", "green apple").Result()
Expect(err).NotTo(HaveOccurred())
resDefault, err := client.FTSearchWithArgs(ctx, "scoringTest", "apple", &redis.FTSearchOptions{WithScores: true}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resDefault.Total).To(BeNumerically(">", 0))
resBM25, err := client.FTSearchWithArgs(ctx, "scoringTest", "apple", &redis.FTSearchOptions{WithScores: true, Scorer: "BM25"}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resBM25.Total).To(BeNumerically(">", 0))
Expect(resDefault.Total).To(BeEquivalentTo(resBM25.Total))
Expect(resDefault.Docs[0].ID).To(BeElementOf("doc1", "doc2"))
Expect(resDefault.Docs[1].ID).To(BeElementOf("doc1", "doc2"))
})
It("should return 0 as default for STDDEV reducer when no numeric values are present", Label("search", "ftaggregate"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "aggTestStddev", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "grp", FieldType: redis.SearchFieldTypeText},
&redis.FieldSchema{FieldName: "n", FieldType: redis.SearchFieldTypeNumeric},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "aggTestStddev")
_, err = client.HSet(ctx, "doc1", "grp", "g1").Result()
Expect(err).NotTo(HaveOccurred())
reducers := []redis.FTAggregateReducer{
{Reducer: redis.SearchStdDev, Args: []interface{}{"@n"}, As: "stddev"},
}
groupBy := []redis.FTAggregateGroupBy{
{Fields: []interface{}{"@grp"}, Reduce: reducers},
}
options := &redis.FTAggregateOptions{GroupBy: groupBy}
res, err := client.FTAggregateWithArgs(ctx, "aggTestStddev", "*", options).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Rows).ToNot(BeEmpty())
Expect(res.Rows[0].Fields["stddev"]).To(BeEquivalentTo("0"))
})
It("should return NaN as default for QUANTILE reducer when no numeric values are present", Label("search", "ftaggregate"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "aggTestQuantile", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "grp", FieldType: redis.SearchFieldTypeText},
&redis.FieldSchema{FieldName: "n", FieldType: redis.SearchFieldTypeNumeric},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "aggTestQuantile")
_, err = client.HSet(ctx, "doc1", "grp", "g1").Result()
Expect(err).NotTo(HaveOccurred())
reducers := []redis.FTAggregateReducer{
{Reducer: redis.SearchQuantile, Args: []interface{}{"@n", 0.5}, As: "quantile"},
}
groupBy := []redis.FTAggregateGroupBy{
{Fields: []interface{}{"@grp"}, Reduce: reducers},
}
options := &redis.FTAggregateOptions{GroupBy: groupBy}
res, err := client.FTAggregateWithArgs(ctx, "aggTestQuantile", "*", options).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Rows).ToNot(BeEmpty())
Expect(res.Rows[0].Fields["quantile"]).To(SatisfyAny(Equal("nan"), Equal("NaN")))
})
It("should return nil as default for FIRST_VALUE reducer when no values are present", Label("search", "ftaggregate"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "aggTestFirstValue", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "grp", FieldType: redis.SearchFieldTypeText},
&redis.FieldSchema{FieldName: "t", FieldType: redis.SearchFieldTypeText},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "aggTestFirstValue")
_, err = client.HSet(ctx, "doc1", "grp", "g1").Result()
Expect(err).NotTo(HaveOccurred())
reducers := []redis.FTAggregateReducer{
{Reducer: redis.SearchFirstValue, Args: []interface{}{"@t"}, As: "first_val"},
}
groupBy := []redis.FTAggregateGroupBy{
{Fields: []interface{}{"@grp"}, Reduce: reducers},
}
options := &redis.FTAggregateOptions{GroupBy: groupBy}
res, err := client.FTAggregateWithArgs(ctx, "aggTestFirstValue", "*", options).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Rows).ToNot(BeEmpty())
Expect(res.Rows[0].Fields["first_val"]).To(BeNil())
})
It("should fail to add an alias that is an existing index name", Label("search", "ftalias"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "name", FieldType: redis.SearchFieldTypeText},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "idx1")
val, err = client.FTCreate(ctx, "idx2", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "name", FieldType: redis.SearchFieldTypeText},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "idx2")
_, err = client.FTAliasAdd(ctx, "idx2", "idx1").Result()
Expect(err).To(HaveOccurred())
Expect(strings.ToLower(err.Error())).To(ContainSubstring("alias"))
})
It("should test ft.search with CountOnly param", Label("search", "ftsearch"), func() {
val, err := client.FTCreate(ctx, "txtIndex", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "txt", FieldType: redis.SearchFieldTypeText},
@ -1721,6 +2105,99 @@ var _ = Describe("RediSearch commands Resp 2", Label("search"), func() {
Expect(len(resLimit.Docs)).To(BeEquivalentTo(2))
})
It("should reject deprecated configuration keys", Label("search", "ftconfig"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
// List of deprecated configuration keys.
deprecatedKeys := []string{
"_FREE_RESOURCE_ON_THREAD",
"_NUMERIC_COMPRESS",
"_NUMERIC_RANGES_PARENTS",
"_PRINT_PROFILE_CLOCK",
"_PRIORITIZE_INTERSECT_UNION_CHILDREN",
"BG_INDEX_SLEEP_GAP",
"CONN_PER_SHARD",
"CURSOR_MAX_IDLE",
"CURSOR_REPLY_THRESHOLD",
"DEFAULT_DIALECT",
"EXTLOAD",
"FORK_GC_CLEAN_THRESHOLD",
"FORK_GC_RETRY_INTERVAL",
"FORK_GC_RUN_INTERVAL",
"FORKGC_SLEEP_BEFORE_EXIT",
"FRISOINI",
"GC_POLICY",
"GCSCANSIZE",
"INDEX_CURSOR_LIMIT",
"MAXAGGREGATERESULTS",
"MAXDOCTABLESIZE",
"MAXPREFIXEXPANSIONS",
"MAXSEARCHRESULTS",
"MIN_OPERATION_WORKERS",
"MIN_PHONETIC_TERM_LEN",
"MINPREFIX",
"MINSTEMLEN",
"NO_MEM_POOLS",
"NOGC",
"ON_TIMEOUT",
"MULTI_TEXT_SLOP",
"PARTIAL_INDEXED_DOCS",
"RAW_DOCID_ENCODING",
"SEARCH_THREADS",
"TIERED_HNSW_BUFFER_LIMIT",
"TIMEOUT",
"TOPOLOGY_VALIDATION_TIMEOUT",
"UNION_ITERATOR_HEAP",
"VSS_MAX_RESIZE",
"WORKERS",
"WORKERS_PRIORITY_BIAS_THRESHOLD",
"MT_MODE",
"WORKER_THREADS",
}
for _, key := range deprecatedKeys {
_, err := client.FTConfigSet(ctx, key, "test_value").Result()
Expect(err).To(HaveOccurred())
}
val, err := client.ConfigGet(ctx, "*").Result()
Expect(err).NotTo(HaveOccurred())
// Since FT.CONFIG is deprecated since redis 8, use CONFIG instead with new search parameters.
keys := make([]string, 0, len(val))
for key := range val {
keys = append(keys, key)
}
Expect(keys).To(ContainElement(ContainSubstring("search")))
})
It("should return INF for MIN reducer and -INF for MAX reducer when no numeric values are present", Label("search", "ftaggregate"), func() {
SkipBeforeRedisVersion(7.9, "requires Redis 8.x")
val, err := client.FTCreate(ctx, "aggTestMinMax", &redis.FTCreateOptions{},
&redis.FieldSchema{FieldName: "grp", FieldType: redis.SearchFieldTypeText},
&redis.FieldSchema{FieldName: "n", FieldType: redis.SearchFieldTypeNumeric},
).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "aggTestMinMax")
_, err = client.HSet(ctx, "doc1", "grp", "g1").Result()
Expect(err).NotTo(HaveOccurred())
reducers := []redis.FTAggregateReducer{
{Reducer: redis.SearchMin, Args: []interface{}{"@n"}, As: "minValue"},
{Reducer: redis.SearchMax, Args: []interface{}{"@n"}, As: "maxValue"},
}
groupBy := []redis.FTAggregateGroupBy{
{Fields: []interface{}{"@grp"}, Reduce: reducers},
}
options := &redis.FTAggregateOptions{GroupBy: groupBy}
res, err := client.FTAggregateWithArgs(ctx, "aggTestMinMax", "*", options).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Rows).ToNot(BeEmpty())
Expect(res.Rows[0].Fields["minValue"]).To(BeEquivalentTo("inf"))
Expect(res.Rows[0].Fields["maxValue"]).To(BeEquivalentTo("-inf"))
})
})
func _assert_geosearch_result(result *redis.FTSearchResult, expectedDocIDs []string) {

View File

@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"errors"
"fmt"
"net"
"strings"
"sync"
@ -566,29 +567,50 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
}
}
var (
masterAddr string
wg sync.WaitGroup
once sync.Once
errCh = make(chan error, len(c.sentinelAddrs))
)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for i, sentinelAddr := range c.sentinelAddrs {
sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
masterAddr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
if err != nil {
_ = sentinel.Close()
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return "", err
wg.Add(1)
go func(i int, addr string) {
defer wg.Done()
sentinelCli := NewSentinelClient(c.opt.sentinelOptions(addr))
addrVal, err := sentinelCli.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
if err != nil {
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName addr=%s, master=%q failed: %s",
addr, c.opt.MasterName, err)
_ = sentinelCli.Close()
errCh <- err
return
}
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName master=%q failed: %s",
c.opt.MasterName, err)
continue
}
// Push working sentinel to the top.
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
c.setSentinel(ctx, sentinel)
addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
return addr, nil
once.Do(func() {
masterAddr = net.JoinHostPort(addrVal[0], addrVal[1])
// Push working sentinel to the top
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
c.setSentinel(ctx, sentinelCli)
internal.Logger.Printf(ctx, "sentinel: selected addr=%s masterAddr=%s", addr, masterAddr)
cancel()
})
}(i, sentinelAddr)
}
return "", errors.New("redis: all sentinels specified in configuration are unreachable")
wg.Wait()
close(errCh)
if masterAddr != "" {
return masterAddr, nil
}
errs := make([]error, 0, len(errCh))
for err := range errCh {
errs = append(errs, err)
}
return "", fmt.Errorf("redis: all sentinels specified in configuration are unreachable: %w", errors.Join(errs...))
}
func (c *sentinelFailover) replicaAddrs(ctx context.Context, useDisconnected bool) ([]string, error) {
@ -815,6 +837,22 @@ func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
}
opt := failoverOpt.clusterOptions()
if failoverOpt.DB != 0 {
onConnect := opt.OnConnect
opt.OnConnect = func(ctx context.Context, cn *Conn) error {
if err := cn.Select(ctx, failoverOpt.DB).Err(); err != nil {
return err
}
if onConnect != nil {
return onConnect(ctx, cn)
}
return nil
}
}
opt.ClusterSlots = func(ctx context.Context) ([]ClusterSlot, error) {
masterAddr, err := failover.MasterAddr(ctx)
if err != nil {

View File

@ -3,6 +3,7 @@ package redis_test
import (
"context"
"net"
"time"
. "github.com/bsm/ginkgo/v2"
. "github.com/bsm/gomega"
@ -32,6 +33,24 @@ var _ = Describe("Sentinel PROTO 2", func() {
})
})
var _ = Describe("Sentinel resolution", func() {
It("should resolve master without context exhaustion", func() {
shortCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
defer cancel()
client := redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: sentinelName,
SentinelAddrs: sentinelAddrs,
MaxRetries: -1,
})
err := client.Ping(shortCtx).Err()
Expect(err).NotTo(HaveOccurred(), "expected master to resolve without context exhaustion")
_ = client.Close()
})
})
var _ = Describe("Sentinel", func() {
var client *redis.Client
var master *redis.Client
@ -200,6 +219,7 @@ var _ = Describe("NewFailoverClusterClient", func() {
SentinelAddrs: sentinelAddrs,
RouteRandomly: true,
DB: 1,
})
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
@ -289,6 +309,20 @@ var _ = Describe("NewFailoverClusterClient", func() {
})
})
It("should sentinel cluster client db", func() {
err := client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
return c.Ping(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
clientInfo, err := c.ClientInfo(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(clientInfo.DB).To(Equal(1))
return nil
})
})
It("should sentinel cluster PROTO 3", func() {
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
val, err := client.Do(ctx, "HELLO").Result()