diff --git a/.github/workflows/doctests.yaml b/.github/workflows/doctests.yaml index 1afd0d80..bd95c58d 100644 --- a/.github/workflows/doctests.yaml +++ b/.github/workflows/doctests.yaml @@ -16,7 +16,7 @@ jobs: services: redis-stack: - image: redislabs/client-libs-test:8.0.2 + image: redislabs/client-libs-test:8.4-RC1-pre.2 env: TLS_ENABLED: no REDIS_CLUSTER: no diff --git a/command.go b/command.go index acf4e3df..04dd8ddb 100644 --- a/command.go +++ b/command.go @@ -709,6 +709,68 @@ func (cmd *IntCmd) readReply(rd *proto.Reader) (err error) { //------------------------------------------------------------------------------ +// DigestCmd is a command that returns a uint64 xxh3 hash digest. +// +// This command is specifically designed for the Redis DIGEST command, +// which returns the xxh3 hash of a key's value as a hex string. +// The hex string is automatically parsed to a uint64 value. +// +// The digest can be used for optimistic locking with SetIFDEQ, SetIFDNE, +// and DelExArgs commands. +// +// For examples of client-side digest generation and usage patterns, see: +// example/digest-optimistic-locking/ +// +// Redis 8.4+. See https://redis.io/commands/digest/ +type DigestCmd struct { + baseCmd + + val uint64 +} + +var _ Cmder = (*DigestCmd)(nil) + +func NewDigestCmd(ctx context.Context, args ...interface{}) *DigestCmd { + return &DigestCmd{ + baseCmd: baseCmd{ + ctx: ctx, + args: args, + }, + } +} + +func (cmd *DigestCmd) SetVal(val uint64) { + cmd.val = val +} + +func (cmd *DigestCmd) Val() uint64 { + return cmd.val +} + +func (cmd *DigestCmd) Result() (uint64, error) { + return cmd.val, cmd.err +} + +func (cmd *DigestCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *DigestCmd) readReply(rd *proto.Reader) (err error) { + // Redis DIGEST command returns a hex string (e.g., "a1b2c3d4e5f67890") + // We parse it as a uint64 xxh3 hash value + var hexStr string + hexStr, err = rd.ReadString() + if err != nil { + return err + } + + // Parse hex string to uint64 + cmd.val, err = strconv.ParseUint(hexStr, 16, 64) + return err +} + +//------------------------------------------------------------------------------ + type IntSliceCmd struct { baseCmd @@ -3816,6 +3878,83 @@ func (cmd *SlowLogCmd) readReply(rd *proto.Reader) error { //----------------------------------------------------------------------- +type Latency struct { + Name string + Time time.Time + Latest time.Duration + Max time.Duration +} + +type LatencyCmd struct { + baseCmd + val []Latency +} + +var _ Cmder = (*LatencyCmd)(nil) + +func NewLatencyCmd(ctx context.Context, args ...interface{}) *LatencyCmd { + return &LatencyCmd{ + baseCmd: baseCmd{ + ctx: ctx, + args: args, + }, + } +} + +func (cmd *LatencyCmd) SetVal(val []Latency) { + cmd.val = val +} + +func (cmd *LatencyCmd) Val() []Latency { + return cmd.val +} + +func (cmd *LatencyCmd) Result() ([]Latency, error) { + return cmd.val, cmd.err +} + +func (cmd *LatencyCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *LatencyCmd) readReply(rd *proto.Reader) error { + n, err := rd.ReadArrayLen() + if err != nil { + return err + } + cmd.val = make([]Latency, n) + for i := 0; i < len(cmd.val); i++ { + nn, err := rd.ReadArrayLen() + if err != nil { + return err + } + if nn < 3 { + return fmt.Errorf("redis: got %d elements in latency get, expected at least 3", nn) + } + if cmd.val[i].Name, err = rd.ReadString(); err != nil { + return err + } + createdAt, err := rd.ReadInt() + if err != nil { + return err + } + cmd.val[i].Time = time.Unix(createdAt, 0) + latest, err := rd.ReadInt() + if err != nil { + return err + } + cmd.val[i].Latest = time.Duration(latest) * time.Millisecond + maximum, err := rd.ReadInt() + if err != nil { + return err + } + cmd.val[i].Max = time.Duration(maximum) * time.Millisecond + } + return nil +} + +//----------------------------------------------------------------------- + type MapStringInterfaceCmd struct { baseCmd diff --git a/command_digest_test.go b/command_digest_test.go new file mode 100644 index 00000000..6b65b3eb --- /dev/null +++ b/command_digest_test.go @@ -0,0 +1,118 @@ +package redis + +import ( + "context" + "fmt" + "testing" + + "github.com/redis/go-redis/v9/internal/proto" +) + +func TestDigestCmd(t *testing.T) { + tests := []struct { + name string + hexStr string + expected uint64 + wantErr bool + }{ + { + name: "zero value", + hexStr: "0", + expected: 0, + wantErr: false, + }, + { + name: "small value", + hexStr: "ff", + expected: 255, + wantErr: false, + }, + { + name: "medium value", + hexStr: "1234abcd", + expected: 0x1234abcd, + wantErr: false, + }, + { + name: "large value", + hexStr: "ffffffffffffffff", + expected: 0xffffffffffffffff, + wantErr: false, + }, + { + name: "uppercase hex", + hexStr: "DEADBEEF", + expected: 0xdeadbeef, + wantErr: false, + }, + { + name: "mixed case hex", + hexStr: "DeAdBeEf", + expected: 0xdeadbeef, + wantErr: false, + }, + { + name: "typical xxh3 hash", + hexStr: "a1b2c3d4e5f67890", + expected: 0xa1b2c3d4e5f67890, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create a mock reader that returns the hex string in RESP format + // Format: $\r\n\r\n + respData := []byte(fmt.Sprintf("$%d\r\n%s\r\n", len(tt.hexStr), tt.hexStr)) + + rd := proto.NewReader(newMockConn(respData)) + + cmd := NewDigestCmd(context.Background(), "digest", "key") + err := cmd.readReply(rd) + + if (err != nil) != tt.wantErr { + t.Errorf("DigestCmd.readReply() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if !tt.wantErr && cmd.Val() != tt.expected { + t.Errorf("DigestCmd.Val() = %d (0x%x), want %d (0x%x)", cmd.Val(), cmd.Val(), tt.expected, tt.expected) + } + }) + } +} + +func TestDigestCmdResult(t *testing.T) { + cmd := NewDigestCmd(context.Background(), "digest", "key") + expected := uint64(0xdeadbeefcafebabe) + cmd.SetVal(expected) + + val, err := cmd.Result() + if err != nil { + t.Errorf("DigestCmd.Result() error = %v", err) + } + + if val != expected { + t.Errorf("DigestCmd.Result() = %d (0x%x), want %d (0x%x)", val, val, expected, expected) + } +} + +// mockConn is a simple mock connection for testing +type mockConn struct { + data []byte + pos int +} + +func newMockConn(data []byte) *mockConn { + return &mockConn{data: data} +} + +func (c *mockConn) Read(p []byte) (n int, err error) { + if c.pos >= len(c.data) { + return 0, nil + } + n = copy(p, c.data[c.pos:]) + c.pos += n + return n, nil +} + diff --git a/commands.go b/commands.go index 6f5b67c6..d133808f 100644 --- a/commands.go +++ b/commands.go @@ -212,9 +212,13 @@ type Cmdable interface { ShutdownNoSave(ctx context.Context) *StatusCmd SlaveOf(ctx context.Context, host, port string) *StatusCmd SlowLogGet(ctx context.Context, num int64) *SlowLogCmd + SlowLogLen(ctx context.Context) *IntCmd + SlowLogReset(ctx context.Context) *StatusCmd Time(ctx context.Context) *TimeCmd DebugObject(ctx context.Context, key string) *StringCmd MemoryUsage(ctx context.Context, key string, samples ...int) *IntCmd + Latency(ctx context.Context) *LatencyCmd + LatencyReset(ctx context.Context, events ...interface{}) *StatusCmd ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *StringCmd @@ -675,6 +679,34 @@ func (c cmdable) SlowLogGet(ctx context.Context, num int64) *SlowLogCmd { return cmd } +func (c cmdable) SlowLogLen(ctx context.Context) *IntCmd { + cmd := NewIntCmd(ctx, "slowlog", "len") + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) SlowLogReset(ctx context.Context) *StatusCmd { + cmd := NewStatusCmd(ctx, "slowlog", "reset") + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) Latency(ctx context.Context) *LatencyCmd { + cmd := NewLatencyCmd(ctx, "latency", "latest") + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) LatencyReset(ctx context.Context, events ...interface{}) *StatusCmd { + args := make([]interface{}, 2+len(events)) + args[0] = "latency" + args[1] = "reset" + copy(args[2:], events) + cmd := NewStatusCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + func (c cmdable) Sync(_ context.Context) { panic("not implemented") } diff --git a/commands_test.go b/commands_test.go index c72bd732..edbae4e7 100644 --- a/commands_test.go +++ b/commands_test.go @@ -1796,6 +1796,200 @@ var _ = Describe("Commands", func() { Expect(get.Err()).To(Equal(redis.Nil)) }) + It("should DelExArgs when value matches", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "lock", "token-123", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Delete only if value matches + deleted := client.DelExArgs(ctx, "lock", redis.DelExArgs{ + Mode: "IFEQ", + MatchValue: "token-123", + }) + Expect(deleted.Err()).NotTo(HaveOccurred()) + Expect(deleted.Val()).To(Equal(int64(1))) + + // Verify key was deleted + get := client.Get(ctx, "lock") + Expect(get.Err()).To(Equal(redis.Nil)) + }) + + It("should DelExArgs fail when value does not match", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "lock", "token-123", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Try to delete with wrong value + deleted := client.DelExArgs(ctx, "lock", redis.DelExArgs{ + Mode: "IFEQ", + MatchValue: "wrong-token", + }) + Expect(deleted.Err()).NotTo(HaveOccurred()) + Expect(deleted.Val()).To(Equal(int64(0))) + + // Verify key was NOT deleted + val, err := client.Get(ctx, "lock").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("token-123")) + }) + + It("should DelExArgs on non-existent key", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Try to delete non-existent key + deleted := client.DelExArgs(ctx, "nonexistent", redis.DelExArgs{ + Mode: "IFEQ", + MatchValue: "any-value", + }) + Expect(deleted.Err()).NotTo(HaveOccurred()) + Expect(deleted.Val()).To(Equal(int64(0))) + }) + + It("should DelExArgs with IFEQ", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "temp-key", "temp-value", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Delete with IFEQ + args := redis.DelExArgs{ + Mode: "IFEQ", + MatchValue: "temp-value", + } + deleted := client.DelExArgs(ctx, "temp-key", args) + Expect(deleted.Err()).NotTo(HaveOccurred()) + Expect(deleted.Val()).To(Equal(int64(1))) + + // Verify key was deleted + get := client.Get(ctx, "temp-key") + Expect(get.Err()).To(Equal(redis.Nil)) + }) + + It("should DelExArgs with IFNE", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "temporary", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Delete only if value is NOT "permanent" + args := redis.DelExArgs{ + Mode: "IFNE", + MatchValue: "permanent", + } + deleted := client.DelExArgs(ctx, "key", args) + Expect(deleted.Err()).NotTo(HaveOccurred()) + Expect(deleted.Val()).To(Equal(int64(1))) + + // Verify key was deleted + get := client.Get(ctx, "key") + Expect(get.Err()).To(Equal(redis.Nil)) + }) + + It("should DelExArgs with IFNE fail when value matches", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "permanent", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Try to delete but value matches (should fail) + args := redis.DelExArgs{ + Mode: "IFNE", + MatchValue: "permanent", + } + deleted := client.DelExArgs(ctx, "key", args) + Expect(deleted.Err()).NotTo(HaveOccurred()) + Expect(deleted.Val()).To(Equal(int64(0))) + + // Verify key was NOT deleted + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("permanent")) + }) + + It("should Digest", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set a value + err := client.Set(ctx, "my-key", "my-value", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Get digest (returns uint64) + digest := client.Digest(ctx, "my-key") + Expect(digest.Err()).NotTo(HaveOccurred()) + Expect(digest.Val()).NotTo(BeZero()) + + // Digest should be consistent + digest2 := client.Digest(ctx, "my-key") + Expect(digest2.Err()).NotTo(HaveOccurred()) + Expect(digest2.Val()).To(Equal(digest.Val())) + }) + + It("should Digest on non-existent key", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Get digest of non-existent key + digest := client.Digest(ctx, "nonexistent") + Expect(digest.Err()).To(Equal(redis.Nil)) + }) + + It("should use Digest with SetArgs IFDEQ", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "value1", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Get digest + digest := client.Digest(ctx, "key") + Expect(digest.Err()).NotTo(HaveOccurred()) + + // Update using digest + args := redis.SetArgs{ + Mode: "IFDEQ", + MatchDigest: digest.Val(), + } + result := client.SetArgs(ctx, "key", "value2", args) + Expect(result.Err()).NotTo(HaveOccurred()) + Expect(result.Val()).To(Equal("OK")) + + // Verify value was updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("value2")) + }) + + It("should use Digest with DelExArgs IFDEQ", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "value", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Get digest + digest := client.Digest(ctx, "key") + Expect(digest.Err()).NotTo(HaveOccurred()) + + // Delete using digest + args := redis.DelExArgs{ + Mode: "IFDEQ", + MatchDigest: digest.Val(), + } + deleted := client.DelExArgs(ctx, "key", args) + Expect(deleted.Err()).NotTo(HaveOccurred()) + Expect(deleted.Val()).To(Equal(int64(1))) + + // Verify key was deleted + get := client.Get(ctx, "key") + Expect(get.Err()).To(Equal(redis.Nil)) + }) + It("should Incr", func() { set := client.Set(ctx, "key", "10", 0) Expect(set.Err()).NotTo(HaveOccurred()) @@ -2474,6 +2668,320 @@ var _ = Describe("Commands", func() { Expect(ttl).NotTo(Equal(-1)) }) + It("should SetIFEQ when value matches", func() { + if RedisVersion < 8.4 { + Skip("CAS/CAD commands require Redis >= 8.4") + } + + // Set initial value + err := client.Set(ctx, "key", "old-value", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Update only if current value is "old-value" + result := client.SetIFEQ(ctx, "key", "new-value", "old-value", 0) + Expect(result.Err()).NotTo(HaveOccurred()) + Expect(result.Val()).To(Equal("OK")) + + // Verify value was updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("new-value")) + }) + + It("should SetIFEQ fail when value does not match", func() { + if RedisVersion < 8.4 { + Skip("CAS/CAD commands require Redis >= 8.4") + } + + // Set initial value + err := client.Set(ctx, "key", "current-value", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Try to update with wrong match value + result := client.SetIFEQ(ctx, "key", "new-value", "wrong-value", 0) + Expect(result.Err()).To(Equal(redis.Nil)) + + // Verify value was NOT updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("current-value")) + }) + + It("should SetIFEQ with expiration", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "token-123", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Update with expiration + result := client.SetIFEQ(ctx, "key", "token-456", "token-123", 500*time.Millisecond) + Expect(result.Err()).NotTo(HaveOccurred()) + Expect(result.Val()).To(Equal("OK")) + + // Verify value was updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("token-456")) + + // Wait for expiration + Eventually(func() error { + return client.Get(ctx, "key").Err() + }, "1s", "100ms").Should(Equal(redis.Nil)) + }) + + It("should SetIFNE when value does not match", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "pending", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Update only if current value is NOT "completed" + result := client.SetIFNE(ctx, "key", "processing", "completed", 0) + Expect(result.Err()).NotTo(HaveOccurred()) + Expect(result.Val()).To(Equal("OK")) + + // Verify value was updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("processing")) + }) + + It("should SetIFNE fail when value matches", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "completed", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Try to update but value matches (should fail) + result := client.SetIFNE(ctx, "key", "processing", "completed", 0) + Expect(result.Err()).To(Equal(redis.Nil)) + + // Verify value was NOT updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("completed")) + }) + + It("should SetArgs with IFEQ", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "counter", "100", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Update with IFEQ + args := redis.SetArgs{ + Mode: "IFEQ", + MatchValue: "100", + TTL: 1 * time.Hour, + } + result := client.SetArgs(ctx, "counter", "200", args) + Expect(result.Err()).NotTo(HaveOccurred()) + Expect(result.Val()).To(Equal("OK")) + + // Verify value was updated + val, err := client.Get(ctx, "counter").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("200")) + }) + + It("should SetArgs with IFEQ and GET", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "old", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Update with IFEQ and GET old value + args := redis.SetArgs{ + Mode: "IFEQ", + MatchValue: "old", + Get: true, + } + result := client.SetArgs(ctx, "key", "new", args) + Expect(result.Err()).NotTo(HaveOccurred()) + Expect(result.Val()).To(Equal("old")) + + // Verify value was updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("new")) + }) + + It("should SetArgs with IFNE", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "status", "pending", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Update with IFNE + args := redis.SetArgs{ + Mode: "IFNE", + MatchValue: "completed", + TTL: 30 * time.Minute, + } + result := client.SetArgs(ctx, "status", "processing", args) + Expect(result.Err()).NotTo(HaveOccurred()) + Expect(result.Val()).To(Equal("OK")) + + // Verify value was updated + val, err := client.Get(ctx, "status").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("processing")) + }) + + It("should SetIFEQGet return previous value", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "old-value", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Update and get previous value + result := client.SetIFEQGet(ctx, "key", "new-value", "old-value", 0) + Expect(result.Err()).NotTo(HaveOccurred()) + Expect(result.Val()).To(Equal("old-value")) + + // Verify value was updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("new-value")) + }) + + It("should SetIFNEGet return previous value", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "pending", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Update and get previous value + result := client.SetIFNEGet(ctx, "key", "processing", "completed", 0) + Expect(result.Err()).NotTo(HaveOccurred()) + Expect(result.Val()).To(Equal("pending")) + + // Verify value was updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("processing")) + }) + + It("should SetIFDEQ when digest matches", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "value1", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Get digest + digest := client.Digest(ctx, "key") + Expect(digest.Err()).NotTo(HaveOccurred()) + + // Update using digest + result := client.SetIFDEQ(ctx, "key", "value2", digest.Val(), 0) + Expect(result.Err()).NotTo(HaveOccurred()) + Expect(result.Val()).To(Equal("OK")) + + // Verify value was updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("value2")) + }) + + It("should SetIFDEQ fail when digest does not match", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "value1", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Get digest of a different value to use as wrong digest + err = client.Set(ctx, "temp-key", "different-value", 0).Err() + Expect(err).NotTo(HaveOccurred()) + wrongDigest := client.Digest(ctx, "temp-key") + Expect(wrongDigest.Err()).NotTo(HaveOccurred()) + + // Try to update with wrong digest + result := client.SetIFDEQ(ctx, "key", "value2", wrongDigest.Val(), 0) + Expect(result.Err()).To(Equal(redis.Nil)) + + // Verify value was NOT updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("value1")) + }) + + It("should SetIFDEQGet return previous value", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "value1", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Get digest + digest := client.Digest(ctx, "key") + Expect(digest.Err()).NotTo(HaveOccurred()) + + // Update using digest and get previous value + result := client.SetIFDEQGet(ctx, "key", "value2", digest.Val(), 0) + Expect(result.Err()).NotTo(HaveOccurred()) + Expect(result.Val()).To(Equal("value1")) + + // Verify value was updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("value2")) + }) + + It("should SetIFDNE when digest does not match", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "value1", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Get digest of a different value + err = client.Set(ctx, "temp-key", "different-value", 0).Err() + Expect(err).NotTo(HaveOccurred()) + differentDigest := client.Digest(ctx, "temp-key") + Expect(differentDigest.Err()).NotTo(HaveOccurred()) + + // Update with different digest (should succeed because digest doesn't match) + result := client.SetIFDNE(ctx, "key", "value2", differentDigest.Val(), 0) + Expect(result.Err()).NotTo(HaveOccurred()) + Expect(result.Val()).To(Equal("OK")) + + // Verify value was updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("value2")) + }) + + It("should SetIFDNE fail when digest matches", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "value1", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Get digest + digest := client.Digest(ctx, "key") + Expect(digest.Err()).NotTo(HaveOccurred()) + + // Try to update but digest matches (should fail) + result := client.SetIFDNE(ctx, "key", "value2", digest.Val(), 0) + Expect(result.Err()).To(Equal(redis.Nil)) + + // Verify value was NOT updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("value1")) + }) + It("should SetRange", func() { set := client.Set(ctx, "key", "Hello World", 0) Expect(set.Err()).NotTo(HaveOccurred()) @@ -8294,7 +8802,7 @@ var _ = Describe("Commands", func() { }) }) - Describe("SlowLogGet", func() { + Describe("SlowLog", func() { It("returns slow query result", func() { const key = "slowlog-log-slower-than" @@ -8311,6 +8819,114 @@ var _ = Describe("Commands", func() { Expect(err).NotTo(HaveOccurred()) Expect(len(result)).NotTo(BeZero()) }) + + It("returns the number of slow queries", Label("NonRedisEnterprise"), func() { + // Reset slowlog + err := client.SlowLogReset(ctx).Err() + Expect(err).NotTo(HaveOccurred()) + + const key = "slowlog-log-slower-than" + + old := client.ConfigGet(ctx, key).Val() + // first slowlog entry is the config set command itself + client.ConfigSet(ctx, key, "0") + defer client.ConfigSet(ctx, key, old[key]) + + // Set a key to trigger a slow query, and this is the second slowlog entry + client.Set(ctx, "test", "true", 0) + result, err := client.SlowLogLen(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).Should(Equal(int64(2))) + + // Reset slowlog + err = client.SlowLogReset(ctx).Err() + Expect(err).NotTo(HaveOccurred()) + + // Check if slowlog is empty, this is the first slowlog entry after reset + result, err = client.SlowLogLen(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).Should(Equal(int64(1))) + }) + }) + + Describe("Latency", Label("NonRedisEnterprise"), func() { + It("returns latencies", func() { + const key = "latency-monitor-threshold" + + old := client.ConfigGet(ctx, key).Val() + client.ConfigSet(ctx, key, "1") + defer client.ConfigSet(ctx, key, old[key]) + + err := client.Do(ctx, "DEBUG", "SLEEP", 0.01).Err() + Expect(err).NotTo(HaveOccurred()) + + result, err := client.Latency(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).NotTo(BeZero()) + }) + + It("reset all latencies", func() { + const key = "latency-monitor-threshold" + + result, err := client.Latency(ctx).Result() + // reset all latencies + err = client.LatencyReset(ctx).Err() + Expect(err).NotTo(HaveOccurred()) + + old := client.ConfigGet(ctx, key).Val() + client.ConfigSet(ctx, key, "1") + defer client.ConfigSet(ctx, key, old[key]) + + // get latency after reset + result, err = client.Latency(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).Should(Equal(0)) + + // create a new latency + err = client.Do(ctx, "DEBUG", "SLEEP", 0.01).Err() + Expect(err).NotTo(HaveOccurred()) + + // get latency after create a new latency + result, err = client.Latency(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).Should(Equal(1)) + + // reset all latencies again + err = client.LatencyReset(ctx).Err() + Expect(err).NotTo(HaveOccurred()) + + // get latency after reset again + result, err = client.Latency(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).Should(Equal(0)) + }) + + It("reset latencies by add event name args", func() { + const key = "latency-monitor-threshold" + + old := client.ConfigGet(ctx, key).Val() + client.ConfigSet(ctx, key, "1") + defer client.ConfigSet(ctx, key, old[key]) + + result, err := client.Latency(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).Should(Equal(0)) + + err = client.Do(ctx, "DEBUG", "SLEEP", 0.01).Err() + Expect(err).NotTo(HaveOccurred()) + + result, err = client.Latency(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).Should(Equal(1)) + + // reset latency by event name + err = client.LatencyReset(ctx, result[0].Name).Err() + Expect(err).NotTo(HaveOccurred()) + + result, err = client.Latency(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).Should(Equal(0)) + }) }) }) diff --git a/digest_test.go b/digest_test.go new file mode 100644 index 00000000..b9d91979 --- /dev/null +++ b/digest_test.go @@ -0,0 +1,265 @@ +package redis_test + +import ( + "context" + "os" + "strconv" + "strings" + "testing" + + "github.com/redis/go-redis/v9" +) + +func init() { + // Initialize RedisVersion from environment variable for regular Go tests + // (Ginkgo tests initialize this in BeforeSuite) + if version := os.Getenv("REDIS_VERSION"); version != "" { + if v, err := strconv.ParseFloat(strings.Trim(version, "\""), 64); err == nil && v > 0 { + RedisVersion = v + } + } +} + +// skipIfRedisBelow84 checks if Redis version is below 8.4 and skips the test if so +func skipIfRedisBelow84(t *testing.T) { + if RedisVersion < 8.4 { + t.Skipf("Skipping test: Redis version %.1f < 8.4 (DIGEST command requires Redis 8.4+)", RedisVersion) + } +} + +// TestDigestBasic validates that the Digest command returns a uint64 value +func TestDigestBasic(t *testing.T) { + skipIfRedisBelow84(t) + + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer client.Close() + + if err := client.Ping(ctx).Err(); err != nil { + t.Skipf("Redis not available: %v", err) + } + + client.Del(ctx, "digest-test-key") + + // Set a value + err := client.Set(ctx, "digest-test-key", "testvalue", 0).Err() + if err != nil { + t.Fatalf("Failed to set value: %v", err) + } + + // Get digest + digestCmd := client.Digest(ctx, "digest-test-key") + if err := digestCmd.Err(); err != nil { + t.Fatalf("Failed to get digest: %v", err) + } + + digest := digestCmd.Val() + if digest == 0 { + t.Error("Digest should not be zero for non-empty value") + } + + t.Logf("Digest for 'testvalue': %d (0x%016x)", digest, digest) + + // Verify same value produces same digest + digest2 := client.Digest(ctx, "digest-test-key").Val() + if digest != digest2 { + t.Errorf("Same value should produce same digest: %d != %d", digest, digest2) + } + + client.Del(ctx, "digest-test-key") +} + +// TestSetIFDEQWithDigest validates the SetIFDEQ command works with digests +func TestSetIFDEQWithDigest(t *testing.T) { + skipIfRedisBelow84(t) + + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer client.Close() + + if err := client.Ping(ctx).Err(); err != nil { + t.Skipf("Redis not available: %v", err) + } + + client.Del(ctx, "cas-test-key") + + // Set initial value + initialValue := "initial-value" + err := client.Set(ctx, "cas-test-key", initialValue, 0).Err() + if err != nil { + t.Fatalf("Failed to set initial value: %v", err) + } + + // Get current digest + correctDigest := client.Digest(ctx, "cas-test-key").Val() + wrongDigest := uint64(12345) // arbitrary wrong digest + + // Test 1: SetIFDEQ with correct digest should succeed + result := client.SetIFDEQ(ctx, "cas-test-key", "new-value", correctDigest, 0) + if err := result.Err(); err != nil { + t.Errorf("SetIFDEQ with correct digest failed: %v", err) + } else { + t.Logf("✓ SetIFDEQ with correct digest succeeded") + } + + // Verify value was updated + val, err := client.Get(ctx, "cas-test-key").Result() + if err != nil { + t.Fatalf("Failed to get value: %v", err) + } + if val != "new-value" { + t.Errorf("Value not updated: got %q, want %q", val, "new-value") + } + + // Test 2: SetIFDEQ with wrong digest should fail + result = client.SetIFDEQ(ctx, "cas-test-key", "another-value", wrongDigest, 0) + if result.Err() != redis.Nil { + t.Errorf("SetIFDEQ with wrong digest should return redis.Nil, got: %v", result.Err()) + } else { + t.Logf("✓ SetIFDEQ with wrong digest correctly failed") + } + + // Verify value was NOT updated + val, err = client.Get(ctx, "cas-test-key").Result() + if err != nil { + t.Fatalf("Failed to get value: %v", err) + } + if val != "new-value" { + t.Errorf("Value should not have changed: got %q, want %q", val, "new-value") + } + + client.Del(ctx, "cas-test-key") +} + +// TestSetIFDNEWithDigest validates the SetIFDNE command works with digests +func TestSetIFDNEWithDigest(t *testing.T) { + skipIfRedisBelow84(t) + + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer client.Close() + + if err := client.Ping(ctx).Err(); err != nil { + t.Skipf("Redis not available: %v", err) + } + + client.Del(ctx, "cad-test-key") + + // Set initial value + initialValue := "initial-value" + err := client.Set(ctx, "cad-test-key", initialValue, 0).Err() + if err != nil { + t.Fatalf("Failed to set initial value: %v", err) + } + + // Use an arbitrary different digest + wrongDigest := uint64(99999) // arbitrary different digest + + // Test 1: SetIFDNE with different digest should succeed + result := client.SetIFDNE(ctx, "cad-test-key", "new-value", wrongDigest, 0) + if err := result.Err(); err != nil { + t.Errorf("SetIFDNE with different digest failed: %v", err) + } else { + t.Logf("✓ SetIFDNE with different digest succeeded") + } + + // Verify value was updated + val, err := client.Get(ctx, "cad-test-key").Result() + if err != nil { + t.Fatalf("Failed to get value: %v", err) + } + if val != "new-value" { + t.Errorf("Value not updated: got %q, want %q", val, "new-value") + } + + // Test 2: SetIFDNE with matching digest should fail + newDigest := client.Digest(ctx, "cad-test-key").Val() + result = client.SetIFDNE(ctx, "cad-test-key", "another-value", newDigest, 0) + if result.Err() != redis.Nil { + t.Errorf("SetIFDNE with matching digest should return redis.Nil, got: %v", result.Err()) + } else { + t.Logf("✓ SetIFDNE with matching digest correctly failed") + } + + // Verify value was NOT updated + val, err = client.Get(ctx, "cad-test-key").Result() + if err != nil { + t.Fatalf("Failed to get value: %v", err) + } + if val != "new-value" { + t.Errorf("Value should not have changed: got %q, want %q", val, "new-value") + } + + client.Del(ctx, "cad-test-key") +} + +// TestDelExArgsWithDigest validates DelExArgs works with digest matching +func TestDelExArgsWithDigest(t *testing.T) { + skipIfRedisBelow84(t) + + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer client.Close() + + if err := client.Ping(ctx).Err(); err != nil { + t.Skipf("Redis not available: %v", err) + } + + client.Del(ctx, "del-test-key") + + // Set a value + value := "delete-me" + err := client.Set(ctx, "del-test-key", value, 0).Err() + if err != nil { + t.Fatalf("Failed to set value: %v", err) + } + + // Get correct digest + correctDigest := client.Digest(ctx, "del-test-key").Val() + wrongDigest := uint64(54321) + + // Test 1: Delete with wrong digest should fail + deleted := client.DelExArgs(ctx, "del-test-key", redis.DelExArgs{ + Mode: "IFDEQ", + MatchDigest: wrongDigest, + }).Val() + + if deleted != 0 { + t.Errorf("Delete with wrong digest should not delete: got %d deletions", deleted) + } else { + t.Logf("✓ DelExArgs with wrong digest correctly refused to delete") + } + + // Verify key still exists + exists := client.Exists(ctx, "del-test-key").Val() + if exists != 1 { + t.Errorf("Key should still exist after failed delete") + } + + // Test 2: Delete with correct digest should succeed + deleted = client.DelExArgs(ctx, "del-test-key", redis.DelExArgs{ + Mode: "IFDEQ", + MatchDigest: correctDigest, + }).Val() + + if deleted != 1 { + t.Errorf("Delete with correct digest should delete: got %d deletions", deleted) + } else { + t.Logf("✓ DelExArgs with correct digest successfully deleted") + } + + // Verify key was deleted + exists = client.Exists(ctx, "del-test-key").Val() + if exists != 0 { + t.Errorf("Key should not exist after successful delete") + } +} + diff --git a/example/digest-optimistic-locking/README.md b/example/digest-optimistic-locking/README.md new file mode 100644 index 00000000..37fa1e8e --- /dev/null +++ b/example/digest-optimistic-locking/README.md @@ -0,0 +1,200 @@ +# Redis Digest & Optimistic Locking Example + +This example demonstrates how to use Redis DIGEST command and digest-based optimistic locking with go-redis. + +## What is Redis DIGEST? + +The DIGEST command (Redis 8.4+) returns a 64-bit xxh3 hash of a key's value. This hash can be used for: + +- **Optimistic locking**: Update values only if they haven't changed +- **Change detection**: Detect if a value was modified +- **Conditional operations**: Delete or update based on expected content + +## Features Demonstrated + +1. **Basic Digest Usage**: Get digest from Redis and verify with client-side calculation +2. **Optimistic Locking with SetIFDEQ**: Update only if digest matches (value unchanged) +3. **Change Detection with SetIFDNE**: Update only if digest differs (value changed) +4. **Conditional Delete**: Delete only if digest matches expected value +5. **Client-Side Digest Generation**: Calculate digests without fetching from Redis + +## Requirements + +- Redis 8.4+ (for DIGEST command support) +- Go 1.18+ + +## Installation + +```bash +cd example/digest-optimistic-locking +go mod tidy +``` + +## Running the Example + +```bash +# Make sure Redis 8.4+ is running on localhost:6379 +redis-server + +# In another terminal, run the example +go run . +``` + +## Expected Output + +``` +=== Redis Digest & Optimistic Locking Example === + +1. Basic Digest Usage +--------------------- +Key: user:1000:name +Value: Alice +Digest: 7234567890123456789 (0x6478a1b2c3d4e5f6) +Client-calculated digest: 7234567890123456789 (0x6478a1b2c3d4e5f6) +✓ Digests match! + +2. Optimistic Locking with SetIFDEQ +------------------------------------ +Initial value: 100 +Current digest: 0x1234567890abcdef +✓ Update successful! New value: 150 +✓ Correctly rejected update with wrong digest + +3. Detecting Changes with SetIFDNE +----------------------------------- +Initial value: v1.0.0 +Old digest: 0xabcdef1234567890 +✓ Value changed! Updated to: v2.0.0 +✓ Correctly rejected: current value matches the digest + +4. Conditional Delete with DelExArgs +------------------------------------- +Created session: session:abc123 +Expected digest: 0x9876543210fedcba +✓ Correctly refused to delete (wrong digest) +✓ Successfully deleted with correct digest +✓ Session deleted + +5. Client-Side Digest Generation +--------------------------------- +Current price: $29.99 +Expected digest (calculated client-side): 0xfedcba0987654321 +✓ Price updated successfully to $24.99 + +Binary data example: +Binary data digest: 0x1122334455667788 +✓ Binary digest matches! + +=== All examples completed successfully! === +``` + +## How It Works + +### Digest Calculation + +Redis uses the **xxh3** hashing algorithm. To calculate digests client-side, use `github.com/zeebo/xxh3`: + +```go +import "github.com/zeebo/xxh3" + +// For strings +digest := xxh3.HashString("myvalue") + +// For binary data +digest := xxh3.Hash([]byte{0x01, 0x02, 0x03}) +``` + +### Optimistic Locking Pattern + +```go +// 1. Read current value and get its digest +currentValue := rdb.Get(ctx, "key").Val() +currentDigest := rdb.Digest(ctx, "key").Val() + +// 2. Perform business logic +newValue := processValue(currentValue) + +// 3. Update only if value hasn't changed +result := rdb.SetIFDEQ(ctx, "key", newValue, currentDigest, 0) +if result.Err() == redis.Nil { + // Value was modified by another client - retry or handle conflict +} +``` + +### Client-Side Digest (No Extra Round Trip) + +```go +// If you know the expected current value, calculate digest client-side +expectedValue := "100" +expectedDigest := xxh3.HashString(expectedValue) + +// Update without fetching digest from Redis first +result := rdb.SetIFDEQ(ctx, "counter", "150", expectedDigest, 0) +``` + +## Use Cases + +### 1. Distributed Counter with Conflict Detection + +```go +// Multiple clients can safely update a counter +currentValue := rdb.Get(ctx, "counter").Val() +currentDigest := rdb.Digest(ctx, "counter").Val() + +newValue := incrementCounter(currentValue) + +// Only succeeds if no other client modified it +if rdb.SetIFDEQ(ctx, "counter", newValue, currentDigest, 0).Err() == redis.Nil { + // Retry with new value +} +``` + +### 2. Session Management + +```go +// Delete session only if it contains expected data +sessionData := "user:1234:active" +expectedDigest := xxh3.HashString(sessionData) + +deleted := rdb.DelExArgs(ctx, "session:xyz", redis.DelExArgs{ + Mode: "IFDEQ", + MatchDigest: expectedDigest, +}).Val() +``` + +### 3. Configuration Updates + +```go +// Update config only if it changed +oldConfig := loadOldConfig() +oldDigest := xxh3.HashString(oldConfig) + +newConfig := loadNewConfig() + +// Only update if config actually changed +result := rdb.SetIFDNE(ctx, "config", newConfig, oldDigest, 0) +if result.Err() != redis.Nil { + fmt.Println("Config updated!") +} +``` + +## Advantages Over WATCH/MULTI/EXEC + +- **Simpler**: Single command instead of transaction +- **Faster**: No transaction overhead +- **Client-side digest**: Can calculate expected digest without fetching from Redis +- **Works with any command**: Not limited to transactions + +## Learn More + +- [Redis DIGEST command](https://redis.io/commands/digest/) +- [Redis SET command with IFDEQ/IFDNE](https://redis.io/commands/set/) +- [xxh3 hashing algorithm](https://github.com/Cyan4973/xxHash) +- [github.com/zeebo/xxh3](https://github.com/zeebo/xxh3) + +## Comparison: XXH3 vs XXH64 + +**Note**: Redis uses **XXH3**, not XXH64. If you have `github.com/cespare/xxhash/v2` in your project, it implements XXH64 which produces **different hash values**. You must use `github.com/zeebo/xxh3` for Redis DIGEST operations. + +See [XXHASH_LIBRARY_COMPARISON.md](../../XXHASH_LIBRARY_COMPARISON.md) for detailed comparison. + diff --git a/example/digest-optimistic-locking/go.mod b/example/digest-optimistic-locking/go.mod new file mode 100644 index 00000000..d27d9202 --- /dev/null +++ b/example/digest-optimistic-locking/go.mod @@ -0,0 +1,16 @@ +module github.com/redis/go-redis/example/digest-optimistic-locking + +go 1.18 + +replace github.com/redis/go-redis/v9 => ../.. + +require ( + github.com/redis/go-redis/v9 v9.16.0 + github.com/zeebo/xxh3 v1.0.2 +) + +require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/klauspost/cpuid/v2 v2.0.9 // indirect +) diff --git a/example/digest-optimistic-locking/go.sum b/example/digest-optimistic-locking/go.sum new file mode 100644 index 00000000..1efe9a30 --- /dev/null +++ b/example/digest-optimistic-locking/go.sum @@ -0,0 +1,11 @@ +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= diff --git a/example/digest-optimistic-locking/main.go b/example/digest-optimistic-locking/main.go new file mode 100644 index 00000000..2b380fc1 --- /dev/null +++ b/example/digest-optimistic-locking/main.go @@ -0,0 +1,245 @@ +package main + +import ( + "context" + "fmt" + "time" + + "github.com/redis/go-redis/v9" + "github.com/zeebo/xxh3" +) + +func main() { + ctx := context.Background() + + // Connect to Redis + rdb := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer rdb.Close() + + // Ping to verify connection + if err := rdb.Ping(ctx).Err(); err != nil { + fmt.Printf("Failed to connect to Redis: %v\n", err) + return + } + + fmt.Println("=== Redis Digest & Optimistic Locking Example ===") + fmt.Println() + + // Example 1: Basic Digest Usage + fmt.Println("1. Basic Digest Usage") + fmt.Println("---------------------") + basicDigestExample(ctx, rdb) + fmt.Println() + + // Example 2: Optimistic Locking with SetIFDEQ + fmt.Println("2. Optimistic Locking with SetIFDEQ") + fmt.Println("------------------------------------") + optimisticLockingExample(ctx, rdb) + fmt.Println() + + // Example 3: Detecting Changes with SetIFDNE + fmt.Println("3. Detecting Changes with SetIFDNE") + fmt.Println("-----------------------------------") + detectChangesExample(ctx, rdb) + fmt.Println() + + // Example 4: Conditional Delete with DelExArgs + fmt.Println("4. Conditional Delete with DelExArgs") + fmt.Println("-------------------------------------") + conditionalDeleteExample(ctx, rdb) + fmt.Println() + + // Example 5: Client-Side Digest Generation + fmt.Println("5. Client-Side Digest Generation") + fmt.Println("---------------------------------") + clientSideDigestExample(ctx, rdb) + fmt.Println() + + fmt.Println("=== All examples completed successfully! ===") +} + +// basicDigestExample demonstrates getting a digest from Redis +func basicDigestExample(ctx context.Context, rdb *redis.Client) { + // Set a value + key := "user:1000:name" + value := "Alice" + rdb.Set(ctx, key, value, 0) + + // Get the digest + digest := rdb.Digest(ctx, key).Val() + + fmt.Printf("Key: %s\n", key) + fmt.Printf("Value: %s\n", value) + fmt.Printf("Digest: %d (0x%016x)\n", digest, digest) + + // Verify with client-side calculation + clientDigest := xxh3.HashString(value) + fmt.Printf("Client-calculated digest: %d (0x%016x)\n", clientDigest, clientDigest) + + if digest == clientDigest { + fmt.Println("✓ Digests match!") + } +} + +// optimisticLockingExample demonstrates using SetIFDEQ for optimistic locking +func optimisticLockingExample(ctx context.Context, rdb *redis.Client) { + key := "counter" + + // Initial value + rdb.Set(ctx, key, "100", 0) + fmt.Printf("Initial value: %s\n", rdb.Get(ctx, key).Val()) + + // Get current digest + currentDigest := rdb.Digest(ctx, key).Val() + fmt.Printf("Current digest: 0x%016x\n", currentDigest) + + // Simulate some processing time + time.Sleep(100 * time.Millisecond) + + // Try to update only if value hasn't changed (digest matches) + newValue := "150" + result := rdb.SetIFDEQ(ctx, key, newValue, currentDigest, 0) + + if result.Err() == redis.Nil { + fmt.Println("✗ Update failed: value was modified by another client") + } else if result.Err() != nil { + fmt.Printf("✗ Error: %v\n", result.Err()) + } else { + fmt.Printf("✓ Update successful! New value: %s\n", rdb.Get(ctx, key).Val()) + } + + // Try again with wrong digest (simulating concurrent modification) + wrongDigest := uint64(12345) + result = rdb.SetIFDEQ(ctx, key, "200", wrongDigest, 0) + + if result.Err() == redis.Nil { + fmt.Println("✓ Correctly rejected update with wrong digest") + } +} + +// detectChangesExample demonstrates using SetIFDNE to detect if a value changed +func detectChangesExample(ctx context.Context, rdb *redis.Client) { + key := "config:version" + + // Set initial value + oldValue := "v1.0.0" + rdb.Set(ctx, key, oldValue, 0) + fmt.Printf("Initial value: %s\n", oldValue) + + // Calculate digest of a DIFFERENT value (what we expect it NOT to be) + unwantedValue := "v0.9.0" + unwantedDigest := xxh3.HashString(unwantedValue) + fmt.Printf("Unwanted value digest: 0x%016x\n", unwantedDigest) + + // Update to new value only if current value is NOT the unwanted value + // (i.e., only if digest does NOT match unwantedDigest) + newValue := "v2.0.0" + result := rdb.SetIFDNE(ctx, key, newValue, unwantedDigest, 0) + + if result.Err() == redis.Nil { + fmt.Println("✗ Current value matches unwanted value (digest matches)") + } else if result.Err() != nil { + fmt.Printf("✗ Error: %v\n", result.Err()) + } else { + fmt.Printf("✓ Current value is different from unwanted value! Updated to: %s\n", rdb.Get(ctx, key).Val()) + } + + // Try to update again, but this time the digest matches current value (should fail) + currentDigest := rdb.Digest(ctx, key).Val() + result = rdb.SetIFDNE(ctx, key, "v3.0.0", currentDigest, 0) + + if result.Err() == redis.Nil { + fmt.Println("✓ Correctly rejected: current value matches the digest (IFDNE failed)") + } +} + +// conditionalDeleteExample demonstrates using DelExArgs with digest +func conditionalDeleteExample(ctx context.Context, rdb *redis.Client) { + key := "session:abc123" + value := "user_data_here" + + // Set a value + rdb.Set(ctx, key, value, 0) + fmt.Printf("Created session: %s\n", key) + + // Calculate expected digest + expectedDigest := xxh3.HashString(value) + fmt.Printf("Expected digest: 0x%016x\n", expectedDigest) + + // Try to delete with wrong digest (should fail) + wrongDigest := uint64(99999) + deleted := rdb.DelExArgs(ctx, key, redis.DelExArgs{ + Mode: "IFDEQ", + MatchDigest: wrongDigest, + }).Val() + + if deleted == 0 { + fmt.Println("✓ Correctly refused to delete (wrong digest)") + } + + // Delete with correct digest (should succeed) + deleted = rdb.DelExArgs(ctx, key, redis.DelExArgs{ + Mode: "IFDEQ", + MatchDigest: expectedDigest, + }).Val() + + if deleted == 1 { + fmt.Println("✓ Successfully deleted with correct digest") + } + + // Verify deletion + exists := rdb.Exists(ctx, key).Val() + if exists == 0 { + fmt.Println("✓ Session deleted") + } +} + +// clientSideDigestExample demonstrates calculating digests without fetching from Redis +func clientSideDigestExample(ctx context.Context, rdb *redis.Client) { + key := "product:1001:price" + + // Scenario: We know the expected current value + expectedCurrentValue := "29.99" + newValue := "24.99" + + // Set initial value + rdb.Set(ctx, key, expectedCurrentValue, 0) + fmt.Printf("Current price: $%s\n", expectedCurrentValue) + + // Calculate digest client-side (no need to fetch from Redis!) + expectedDigest := xxh3.HashString(expectedCurrentValue) + fmt.Printf("Expected digest (calculated client-side): 0x%016x\n", expectedDigest) + + // Update price only if it matches our expectation + result := rdb.SetIFDEQ(ctx, key, newValue, expectedDigest, 0) + + if result.Err() == redis.Nil { + fmt.Println("✗ Price was already changed by someone else") + actualValue := rdb.Get(ctx, key).Val() + fmt.Printf(" Actual current price: $%s\n", actualValue) + } else if result.Err() != nil { + fmt.Printf("✗ Error: %v\n", result.Err()) + } else { + fmt.Printf("✓ Price updated successfully to $%s\n", newValue) + } + + // Demonstrate with binary data + fmt.Println("\nBinary data example:") + binaryKey := "image:thumbnail" + binaryData := []byte{0xFF, 0xD8, 0xFF, 0xE0} // JPEG header + + rdb.Set(ctx, binaryKey, binaryData, 0) + + // Calculate digest for binary data + binaryDigest := xxh3.Hash(binaryData) + fmt.Printf("Binary data digest: 0x%016x\n", binaryDigest) + + // Verify it matches Redis + redisDigest := rdb.Digest(ctx, binaryKey).Val() + if binaryDigest == redisDigest { + fmt.Println("✓ Binary digest matches!") + } +} + diff --git a/internal/auth/streaming/pool_hook.go b/internal/auth/streaming/pool_hook.go index f37fe557..1af2bf23 100644 --- a/internal/auth/streaming/pool_hook.go +++ b/internal/auth/streaming/pool_hook.go @@ -190,7 +190,8 @@ func (r *ReAuthPoolHook) OnPut(_ context.Context, conn *pool.Conn) (bool, bool, return } - _, err := stateMachine.AwaitAndTransition(ctx, []pool.ConnState{pool.StateIdle}, pool.StateUnusable) + // Use predefined slice to avoid allocation + _, err := stateMachine.AwaitAndTransition(ctx, pool.ValidFromIdle(), pool.StateUnusable) if err != nil { // Timeout or other error occurred, cannot acquire connection reAuthFn(err) diff --git a/internal/pool/conn.go b/internal/pool/conn.go index 32ac88c2..5311d735 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -18,6 +18,15 @@ import ( var noDeadline = time.Time{} +// Preallocated errors for hot paths to avoid allocations +var ( + errAlreadyMarkedForHandoff = errors.New("connection is already marked for handoff") + errNotMarkedForHandoff = errors.New("connection was not marked for handoff") + errHandoffStateChanged = errors.New("handoff state changed during marking") + errConnectionNotAvailable = errors.New("redis: connection not available") + errConnNotAvailableForWrite = errors.New("redis: connection not available for write operation") +) + // Global time cache updated every 50ms by background goroutine. // This avoids expensive time.Now() syscalls in hot paths like getEffectiveReadTimeout. // Max staleness: 50ms, which is acceptable for timeout deadline checks (timeouts are typically 3-30 seconds). @@ -228,16 +237,18 @@ func (cn *Conn) CompareAndSwapUsable(old, new bool) bool { if new { // Trying to make usable - transition from UNUSABLE to IDLE // This should only work from UNUSABLE or INITIALIZING states + // Use predefined slice to avoid allocation _, err := cn.stateMachine.TryTransition( - []ConnState{StateInitializing, StateUnusable}, + validFromInitializingOrUnusable, StateIdle, ) return err == nil } else { // Trying to make unusable - transition from IDLE to UNUSABLE // This is typically for acquiring the connection for background operations + // Use predefined slice to avoid allocation _, err := cn.stateMachine.TryTransition( - []ConnState{StateIdle}, + validFromIdle, StateUnusable, ) return err == nil @@ -610,9 +621,10 @@ func (cn *Conn) SetNetConnAndInitConn(ctx context.Context, netConn net.Conn) err } waitCtx, cancel := context.WithDeadline(ctx, deadline) defer cancel() + // Use predefined slice to avoid allocation finalState, err := cn.stateMachine.AwaitAndTransition( waitCtx, - []ConnState{StateCreated, StateIdle, StateUnusable}, + validFromCreatedIdleOrUnusable, StateInitializing, ) if err != nil { @@ -643,7 +655,7 @@ func (cn *Conn) SetNetConnAndInitConn(ctx context.Context, netConn net.Conn) err func (cn *Conn) MarkForHandoff(newEndpoint string, seqID int64) error { // Check if already marked for handoff if cn.ShouldHandoff() { - return errors.New("connection is already marked for handoff") + return errAlreadyMarkedForHandoff } // Set handoff metadata atomically @@ -663,12 +675,12 @@ func (cn *Conn) MarkQueuedForHandoff() error { // Get current handoff state currentState := cn.handoffStateAtomic.Load() if currentState == nil { - return errors.New("connection was not marked for handoff") + return errNotMarkedForHandoff } state := currentState.(*HandoffState) if !state.ShouldHandoff { - return errors.New("connection was not marked for handoff") + return errNotMarkedForHandoff } // Create new state with ShouldHandoff=false but preserve endpoint and seqID @@ -683,7 +695,7 @@ func (cn *Conn) MarkQueuedForHandoff() error { // Atomic compare-and-swap to update state if !cn.handoffStateAtomic.CompareAndSwap(currentState, newState) { // State changed between load and CAS - retry or return error - return errors.New("handoff state changed during marking") + return errHandoffStateChanged } // Transition to UNUSABLE from IN_USE (normal flow), IDLE (edge cases), or CREATED (tests/uninitialized) @@ -822,7 +834,7 @@ func (cn *Conn) WithReader( // Get the connection directly from atomic storage netConn := cn.getNetConn() if netConn == nil { - return fmt.Errorf("redis: connection not available") + return errConnectionNotAvailable } if err := netConn.SetReadDeadline(cn.deadline(ctx, effectiveTimeout)); err != nil { @@ -845,8 +857,8 @@ func (cn *Conn) WithWriter( return err } } else { - // Connection is not available - return error - return fmt.Errorf("redis: conn[%d] not available for write operation", cn.GetID()) + // Connection is not available - return preallocated error + return errConnNotAvailableForWrite } } diff --git a/internal/pool/conn_state.go b/internal/pool/conn_state.go index 32fc5058..a3c3a57f 100644 --- a/internal/pool/conn_state.go +++ b/internal/pool/conn_state.go @@ -46,8 +46,28 @@ var ( validFromInUse = []ConnState{StateInUse} validFromCreatedOrIdle = []ConnState{StateCreated, StateIdle} validFromCreatedInUseOrIdle = []ConnState{StateCreated, StateInUse, StateIdle} + // For AwaitAndTransition calls + validFromCreatedIdleOrUnusable = []ConnState{StateCreated, StateIdle, StateUnusable} + validFromIdle = []ConnState{StateIdle} + // For CompareAndSwapUsable + validFromInitializingOrUnusable = []ConnState{StateInitializing, StateUnusable} ) +// Accessor functions for predefined slices to avoid allocations in external packages +// These return the same slice instance, so they're zero-allocation + +// ValidFromIdle returns a predefined slice containing only StateIdle. +// Use this to avoid allocations when calling AwaitAndTransition or TryTransition. +func ValidFromIdle() []ConnState { + return validFromIdle +} + +// ValidFromCreatedIdleOrUnusable returns a predefined slice for initialization transitions. +// Use this to avoid allocations when calling AwaitAndTransition or TryTransition. +func ValidFromCreatedIdleOrUnusable() []ConnState { + return validFromCreatedIdleOrUnusable +} + // String returns a human-readable string representation of the state. func (s ConnState) String() string { switch s { @@ -300,7 +320,8 @@ func (sm *ConnStateMachine) notifyWaiters() { processed = true break } else { - // State changed - re-add waiter to front of queue and retry + // State changed - re-add waiter to front of queue to maintain FIFO ordering + // This waiter was first in line and should retain priority sm.waiters.PushFront(w) sm.waiterCount.Add(1) // Continue to next iteration to re-read state diff --git a/internal/pool/conn_state_alloc_test.go b/internal/pool/conn_state_alloc_test.go new file mode 100644 index 00000000..071e4b79 --- /dev/null +++ b/internal/pool/conn_state_alloc_test.go @@ -0,0 +1,169 @@ +package pool + +import ( + "context" + "testing" +) + +// TestPredefinedSlicesAvoidAllocations verifies that using predefined slices +// avoids allocations in AwaitAndTransition calls +func TestPredefinedSlicesAvoidAllocations(t *testing.T) { + sm := NewConnStateMachine() + sm.Transition(StateIdle) + ctx := context.Background() + + // Test with predefined slice - should have 0 allocations on fast path + allocs := testing.AllocsPerRun(100, func() { + _, _ = sm.AwaitAndTransition(ctx, validFromIdle, StateUnusable) + sm.Transition(StateIdle) + }) + + if allocs > 0 { + t.Errorf("Expected 0 allocations with predefined slice, got %.2f", allocs) + } +} + +// TestInlineSliceAllocations shows that inline slices cause allocations +func TestInlineSliceAllocations(t *testing.T) { + sm := NewConnStateMachine() + sm.Transition(StateIdle) + ctx := context.Background() + + // Test with inline slice - will allocate + allocs := testing.AllocsPerRun(100, func() { + _, _ = sm.AwaitAndTransition(ctx, []ConnState{StateIdle}, StateUnusable) + sm.Transition(StateIdle) + }) + + if allocs == 0 { + t.Logf("Inline slice had 0 allocations (compiler optimization)") + } else { + t.Logf("Inline slice caused %.2f allocations per run (expected)", allocs) + } +} + +// BenchmarkAwaitAndTransition_PredefinedSlice benchmarks with predefined slice +func BenchmarkAwaitAndTransition_PredefinedSlice(b *testing.B) { + sm := NewConnStateMachine() + sm.Transition(StateIdle) + ctx := context.Background() + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + _, _ = sm.AwaitAndTransition(ctx, validFromIdle, StateUnusable) + sm.Transition(StateIdle) + } +} + +// BenchmarkAwaitAndTransition_InlineSlice benchmarks with inline slice +func BenchmarkAwaitAndTransition_InlineSlice(b *testing.B) { + sm := NewConnStateMachine() + sm.Transition(StateIdle) + ctx := context.Background() + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + _, _ = sm.AwaitAndTransition(ctx, []ConnState{StateIdle}, StateUnusable) + sm.Transition(StateIdle) + } +} + +// BenchmarkAwaitAndTransition_MultipleStates_Predefined benchmarks with predefined multi-state slice +func BenchmarkAwaitAndTransition_MultipleStates_Predefined(b *testing.B) { + sm := NewConnStateMachine() + sm.Transition(StateIdle) + ctx := context.Background() + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + _, _ = sm.AwaitAndTransition(ctx, validFromCreatedIdleOrUnusable, StateInitializing) + sm.Transition(StateIdle) + } +} + +// BenchmarkAwaitAndTransition_MultipleStates_Inline benchmarks with inline multi-state slice +func BenchmarkAwaitAndTransition_MultipleStates_Inline(b *testing.B) { + sm := NewConnStateMachine() + sm.Transition(StateIdle) + ctx := context.Background() + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + _, _ = sm.AwaitAndTransition(ctx, []ConnState{StateCreated, StateIdle, StateUnusable}, StateInitializing) + sm.Transition(StateIdle) + } +} + +// TestPreallocatedErrorsAvoidAllocations verifies that preallocated errors +// avoid allocations in hot paths +func TestPreallocatedErrorsAvoidAllocations(t *testing.T) { + cn := NewConn(nil) + + // Test MarkForHandoff - first call should succeed + err := cn.MarkForHandoff("localhost:6379", 123) + if err != nil { + t.Fatalf("First MarkForHandoff should succeed: %v", err) + } + + // Second call should return preallocated error with 0 allocations + allocs := testing.AllocsPerRun(100, func() { + _ = cn.MarkForHandoff("localhost:6380", 124) + }) + + if allocs > 0 { + t.Errorf("Expected 0 allocations for preallocated error, got %.2f", allocs) + } +} + +// BenchmarkHandoffErrors_Preallocated benchmarks handoff errors with preallocated errors +func BenchmarkHandoffErrors_Preallocated(b *testing.B) { + cn := NewConn(nil) + cn.MarkForHandoff("localhost:6379", 123) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + _ = cn.MarkForHandoff("localhost:6380", 124) + } +} + +// BenchmarkCompareAndSwapUsable_Preallocated benchmarks with preallocated slices +func BenchmarkCompareAndSwapUsable_Preallocated(b *testing.B) { + cn := NewConn(nil) + cn.stateMachine.Transition(StateIdle) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + cn.CompareAndSwapUsable(true, false) // IDLE -> UNUSABLE + cn.CompareAndSwapUsable(false, true) // UNUSABLE -> IDLE + } +} + +// TestAllTryTransitionUsePredefinedSlices verifies all TryTransition calls use predefined slices +func TestAllTryTransitionUsePredefinedSlices(t *testing.T) { + cn := NewConn(nil) + cn.stateMachine.Transition(StateIdle) + + // Test CompareAndSwapUsable - should have minimal allocations + allocs := testing.AllocsPerRun(100, func() { + cn.CompareAndSwapUsable(true, false) // IDLE -> UNUSABLE + cn.CompareAndSwapUsable(false, true) // UNUSABLE -> IDLE + }) + + // Allow some allocations for error objects, but should be minimal + if allocs > 2 { + t.Errorf("Expected <= 2 allocations with predefined slices, got %.2f", allocs) + } +} + diff --git a/internal/pool/conn_state_test.go b/internal/pool/conn_state_test.go index 92be0dc5..d1825615 100644 --- a/internal/pool/conn_state_test.go +++ b/internal/pool/conn_state_test.go @@ -401,32 +401,16 @@ func TestConnStateMachine_FIFOOrdering(t *testing.T) { var orderMu sync.Mutex var wg sync.WaitGroup - // Use channels to ensure deterministic queueing order - // Each goroutine waits for the previous one to queue before it queues - queuedChannels := make([]chan struct{}, numGoroutines) - for i := 0; i < numGoroutines; i++ { - queuedChannels[i] = make(chan struct{}) - } - - // Launch goroutines that will all wait + // Launch goroutines one at a time, ensuring each is queued before launching the next for i := 0; i < numGoroutines; i++ { wg.Add(1) + expectedWaiters := int32(i + 1) + go func(id int) { defer wg.Done() - // Wait for previous goroutine to queue (except for goroutine 0) - if id > 0 { - <-queuedChannels[id-1] - } - - // Small delay to ensure the previous goroutine's AwaitAndTransition has been called - time.Sleep(5 * time.Millisecond) - ctx := context.Background() - // Signal that we're about to queue - close(queuedChannels[id]) - // This should queue in FIFO order _, err := sm.AwaitAndTransition(ctx, []ConnState{StateIdle}, StateInitializing) if err != nil { @@ -441,16 +425,30 @@ func TestConnStateMachine_FIFOOrdering(t *testing.T) { t.Logf("Goroutine %d: executed (position %d)", id, len(executionOrder)) - // Transition back to READY to allow next waiter + // Transition back to IDLE to allow next waiter sm.Transition(StateIdle) }(i) + + // Wait until this goroutine has been queued before launching the next + // Poll the waiter count to ensure the goroutine is actually queued + timeout := time.After(100 * time.Millisecond) + for { + if sm.waiterCount.Load() >= expectedWaiters { + break + } + select { + case <-timeout: + t.Fatalf("Timeout waiting for goroutine %d to queue", i) + case <-time.After(1 * time.Millisecond): + // Continue polling + } + } } - // Wait for all goroutines to queue up - <-queuedChannels[numGoroutines-1] - time.Sleep(50 * time.Millisecond) + // Give all goroutines time to fully settle in the queue + time.Sleep(10 * time.Millisecond) - // Transition to READY to start processing the queue + // Transition to IDLE to start processing the queue sm.Transition(StateIdle) wg.Wait() diff --git a/internal/semaphore.go b/internal/semaphore.go index 091b6635..df30d0a9 100644 --- a/internal/semaphore.go +++ b/internal/semaphore.go @@ -15,147 +15,318 @@ var semTimers = sync.Pool{ }, } -// FastSemaphore is a counting semaphore implementation using atomic operations. +// waiter represents a goroutine waiting for a token. +type waiter struct { + ready chan struct{} + next *waiter + cancelled atomic.Bool // Set to true if this waiter was cancelled/timed out + notified atomic.Bool // Set to true when Release() notifies this waiter +} + +// FastSemaphore is a counting semaphore implementation using a hybrid approach. // It's optimized for the fast path (no blocking) while still supporting timeouts and context cancellation. // +// This implementation uses a buffered channel for the fast path (TryAcquire/Release without waiters) +// and a FIFO queue for waiters to ensure fairness. +// // Performance characteristics: -// - Fast path (no blocking): Single atomic CAS operation -// - Slow path (blocking): Falls back to channel-based waiting -// - Release: Single atomic decrement + optional channel notification +// - Fast path (no blocking): Single channel operation (very fast) +// - Slow path (blocking): FIFO queue-based waiting +// - Release: Channel send or wake up first waiter in queue // // This is significantly faster than a pure channel-based semaphore because: -// 1. The fast path avoids channel operations entirely (no scheduler involvement) -// 2. Atomic operations are much cheaper than channel send/receive +// 1. The fast path uses a buffered channel (single atomic operation) +// 2. FIFO ordering prevents starvation for waiters +// 3. Waiters don't compete with TryAcquire callers type FastSemaphore struct { - // Current number of acquired tokens (atomic) - count atomic.Int32 + // Buffered channel for fast path (TryAcquire/Release) + tokens chan struct{} // Maximum number of tokens (capacity) max int32 - // Channel for blocking waiters - // Only used when fast path fails (semaphore is full) - waitCh chan struct{} + // Mutex to protect the waiter queue + lock sync.Mutex + + // Head and tail of the waiter queue (FIFO) + head *waiter + tail *waiter } // NewFastSemaphore creates a new fast semaphore with the given capacity. func NewFastSemaphore(capacity int32) *FastSemaphore { + ch := make(chan struct{}, capacity) + // Fill the channel with tokens (available slots) + for i := int32(0); i < capacity; i++ { + ch <- struct{}{} + } return &FastSemaphore{ max: capacity, - waitCh: make(chan struct{}, capacity), + tokens: ch, } } // TryAcquire attempts to acquire a token without blocking. // Returns true if successful, false if the semaphore is full. // -// This is the fast path - just a single CAS operation. +// This is the fast path - just a single channel operation. func (s *FastSemaphore) TryAcquire() bool { - for { - current := s.count.Load() - if current >= s.max { - return false // Semaphore is full - } - if s.count.CompareAndSwap(current, current+1) { - return true // Successfully acquired - } - // CAS failed due to concurrent modification, retry + select { + case <-s.tokens: + return true + default: + return false + } +} + +// enqueue adds a waiter to the end of the queue. +// Must be called with lock held. +func (s *FastSemaphore) enqueue(w *waiter) { + if s.tail == nil { + s.head = w + s.tail = w + } else { + s.tail.next = w + s.tail = w + } +} + +// dequeue removes and returns the first waiter from the queue. +// Must be called with lock held. +// Returns nil if the queue is empty. +func (s *FastSemaphore) dequeue() *waiter { + if s.head == nil { + return nil + } + w := s.head + s.head = w.next + if s.head == nil { + s.tail = nil + } + w.next = nil + return w +} + +// notifyOne wakes up the first waiter in the queue if any. +func (s *FastSemaphore) notifyOne() { + s.lock.Lock() + w := s.dequeue() + s.lock.Unlock() + + if w != nil { + close(w.ready) } } // Acquire acquires a token, blocking if necessary until one is available or the context is cancelled. // Returns an error if the context is cancelled or the timeout expires. // Returns timeoutErr when the timeout expires. -// -// Performance optimization: -// 1. First try fast path (no blocking) -// 2. If that fails, fall back to channel-based waiting func (s *FastSemaphore) Acquire(ctx context.Context, timeout time.Duration, timeoutErr error) error { - // Fast path: try to acquire without blocking + // Check context first select { case <-ctx.Done(): return ctx.Err() default: } - // Try fast acquire first - if s.TryAcquire() { + // Try fast path first (non-blocking channel receive) + select { + case <-s.tokens: return nil + default: + // Channel is empty, need to wait } - // Fast path failed, need to wait + // Need to wait - create a waiter and add to queue + w := &waiter{ + ready: make(chan struct{}), + } + + s.lock.Lock() + s.enqueue(w) + s.lock.Unlock() + // Use timer pool to avoid allocation timer := semTimers.Get().(*time.Timer) defer semTimers.Put(timer) timer.Reset(timeout) - start := time.Now() - - for { - select { - case <-ctx.Done(): - if !timer.Stop() { - <-timer.C - } - return ctx.Err() - - case <-s.waitCh: - // Someone released a token, try to acquire it - if s.TryAcquire() { - if !timer.Stop() { - <-timer.C - } - return nil - } - // Failed to acquire (race with another goroutine), continue waiting - - case <-timer.C: - return timeoutErr + select { + case <-ctx.Done(): + if !timer.Stop() { + <-timer.C } + // Mark as cancelled and try to claim ourselves + w.cancelled.Store(true) + if w.notified.CompareAndSwap(false, true) { + // We successfully claimed ourselves, we're cancelling + // Try to remove from queue + s.lock.Lock() + removed := s.removeWaiter(w) + s.lock.Unlock() - // Periodically check if we can acquire (handles race conditions) - if time.Since(start) > timeout { + if !removed { + // Already dequeued, wait for ready to be closed + <-w.ready + } + // We claimed it, so no token was given to us + return ctx.Err() + } else { + // Release() already claimed us and is giving us a token + // Wait for the notification and then release the token + <-w.ready + s.releaseToPool() + return ctx.Err() + } + case <-w.ready: + // We were notified and got the token + // Stop the timer and drain it if it already fired + if !timer.Stop() { + <-timer.C + } + // We have the token, just return + return nil + case <-timer.C: + // Mark as cancelled and try to claim ourselves + w.cancelled.Store(true) + if w.notified.CompareAndSwap(false, true) { + // We successfully claimed ourselves, we're cancelling + // Try to remove from queue + s.lock.Lock() + removed := s.removeWaiter(w) + s.lock.Unlock() + + if !removed { + // Already dequeued, wait for ready to be closed + <-w.ready + } + // We claimed it, so no token was given to us + return timeoutErr + } else { + // Release() already claimed us and is giving us a token + // Wait for the notification and then release the token + <-w.ready + s.releaseToPool() return timeoutErr } } } +// removeWaiter removes a waiter from the queue. +// Must be called with lock held. +// Returns true if the waiter was found and removed, false otherwise. +func (s *FastSemaphore) removeWaiter(target *waiter) bool { + if s.head == nil { + return false + } + + // Special case: removing head + if s.head == target { + s.head = target.next + if s.head == nil { + s.tail = nil + } + return true + } + + // Find and remove from middle or tail + prev := s.head + for prev.next != nil { + if prev.next == target { + prev.next = target.next + if prev.next == nil { + s.tail = prev + } + return true + } + prev = prev.next + } + return false +} + // AcquireBlocking acquires a token, blocking indefinitely until one is available. // This is useful for cases where you don't need timeout or context cancellation. // Returns immediately if a token is available (fast path). func (s *FastSemaphore) AcquireBlocking() { - // Try fast path first - if s.TryAcquire() { + // Try fast path first (non-blocking channel receive) + select { + case <-s.tokens: return + default: + // Channel is empty, need to wait } - // Slow path: wait for a token - for { - <-s.waitCh - if s.TryAcquire() { - return - } - // Failed to acquire (race with another goroutine), continue waiting + // Need to wait - create a waiter and add to queue + w := &waiter{ + ready: make(chan struct{}), + } + + s.lock.Lock() + s.enqueue(w) + s.lock.Unlock() + + // Wait to be notified + <-w.ready +} + +// releaseToPool releases a token back to the pool. +// This should be called when a waiter was notified but then cancelled/timed out. +// We need to pass the token to another waiter if any, otherwise put it back in the channel. +func (s *FastSemaphore) releaseToPool() { + s.lock.Lock() + w := s.dequeue() + s.lock.Unlock() + + if w != nil { + // Transfer the token to another waiter + close(w.ready) + } else { + // No waiters, put the token back in the channel + s.tokens <- struct{}{} } } // Release releases a token back to the semaphore. -// This wakes up one waiting goroutine if any are blocked. +// This wakes up the first waiting goroutine if any are blocked. func (s *FastSemaphore) Release() { - s.count.Add(-1) + // Try to give the token to a waiter first + for { + s.lock.Lock() + w := s.dequeue() + s.lock.Unlock() - // Try to wake up a waiter (non-blocking) - // If no one is waiting, this is a no-op - select { - case s.waitCh <- struct{}{}: - // Successfully notified a waiter - default: - // No waiters, that's fine + if w == nil { + // No waiters, put the token back in the channel + s.tokens <- struct{}{} + return + } + + // Check if this waiter was cancelled before we notify them + if w.cancelled.Load() { + // This waiter was cancelled, skip them and try the next one + // We still have the token, so continue the loop + close(w.ready) // Still need to close to unblock them + continue + } + + // Try to claim this waiter by setting notified flag + // If the waiter is being cancelled concurrently, one of us will win + if !w.notified.CompareAndSwap(false, true) { + // Someone else (the waiter itself) already claimed it + // This means the waiter is cancelling, skip to next + close(w.ready) // Still need to close to unblock them + continue + } + + // We successfully claimed the waiter, transfer the token + close(w.ready) + return } } // Len returns the current number of acquired tokens. // Used by tests to check semaphore state. func (s *FastSemaphore) Len() int32 { - return s.count.Load() + // Number of acquired tokens = max - available tokens in channel + return s.max - int32(len(s.tokens)) } diff --git a/internal/semaphore_bench_test.go b/internal/semaphore_bench_test.go new file mode 100644 index 00000000..1615ca7e --- /dev/null +++ b/internal/semaphore_bench_test.go @@ -0,0 +1,245 @@ +package internal + +import ( + "context" + "sync" + "testing" + "time" +) + +// channelSemaphore is a simple semaphore using a buffered channel +type channelSemaphore struct { + ch chan struct{} +} + +func newChannelSemaphore(capacity int) *channelSemaphore { + return &channelSemaphore{ + ch: make(chan struct{}, capacity), + } +} + +func (s *channelSemaphore) TryAcquire() bool { + select { + case s.ch <- struct{}{}: + return true + default: + return false + } +} + +func (s *channelSemaphore) Acquire(ctx context.Context, timeout time.Duration) error { + timer := time.NewTimer(timeout) + defer timer.Stop() + + select { + case s.ch <- struct{}{}: + return nil + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return context.DeadlineExceeded + } +} + +func (s *channelSemaphore) AcquireBlocking() { + s.ch <- struct{}{} +} + +func (s *channelSemaphore) Release() { + <-s.ch +} + +// Benchmarks for FastSemaphore + +func BenchmarkFastSemaphore_TryAcquire(b *testing.B) { + sem := NewFastSemaphore(100) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + if sem.TryAcquire() { + sem.Release() + } + } + }) +} + +func BenchmarkFastSemaphore_AcquireRelease(b *testing.B) { + sem := NewFastSemaphore(100) + ctx := context.Background() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + sem.Acquire(ctx, time.Second, context.DeadlineExceeded) + sem.Release() + } + }) +} + +func BenchmarkFastSemaphore_Contention(b *testing.B) { + sem := NewFastSemaphore(10) // Small capacity to create contention + ctx := context.Background() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + sem.Acquire(ctx, time.Second, context.DeadlineExceeded) + sem.Release() + } + }) +} + +func BenchmarkFastSemaphore_HighContention(b *testing.B) { + sem := NewFastSemaphore(1) // Very high contention + ctx := context.Background() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + sem.Acquire(ctx, time.Second, context.DeadlineExceeded) + sem.Release() + } + }) +} + +// Benchmarks for channelSemaphore + +func BenchmarkChannelSemaphore_TryAcquire(b *testing.B) { + sem := newChannelSemaphore(100) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + if sem.TryAcquire() { + sem.Release() + } + } + }) +} + +func BenchmarkChannelSemaphore_AcquireRelease(b *testing.B) { + sem := newChannelSemaphore(100) + ctx := context.Background() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + sem.Acquire(ctx, time.Second) + sem.Release() + } + }) +} + +func BenchmarkChannelSemaphore_Contention(b *testing.B) { + sem := newChannelSemaphore(10) // Small capacity to create contention + ctx := context.Background() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + sem.Acquire(ctx, time.Second) + sem.Release() + } + }) +} + +func BenchmarkChannelSemaphore_HighContention(b *testing.B) { + sem := newChannelSemaphore(1) // Very high contention + ctx := context.Background() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + sem.Acquire(ctx, time.Second) + sem.Release() + } + }) +} + +// Benchmark with realistic workload (some work between acquire/release) + +func BenchmarkFastSemaphore_WithWork(b *testing.B) { + sem := NewFastSemaphore(10) + ctx := context.Background() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + sem.Acquire(ctx, time.Second, context.DeadlineExceeded) + // Simulate some work + _ = make([]byte, 64) + sem.Release() + } + }) +} + +func BenchmarkChannelSemaphore_WithWork(b *testing.B) { + sem := newChannelSemaphore(10) + ctx := context.Background() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + sem.Acquire(ctx, time.Second) + // Simulate some work + _ = make([]byte, 64) + sem.Release() + } + }) +} + +// Benchmark mixed TryAcquire and Acquire + +func BenchmarkFastSemaphore_Mixed(b *testing.B) { + sem := NewFastSemaphore(10) + ctx := context.Background() + var wg sync.WaitGroup + + b.ResetTimer() + + // Half goroutines use TryAcquire + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < b.N/2; i++ { + if sem.TryAcquire() { + sem.Release() + } + } + }() + + // Half goroutines use Acquire + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < b.N/2; i++ { + sem.Acquire(ctx, time.Second, context.DeadlineExceeded) + sem.Release() + } + }() + + wg.Wait() +} + +func BenchmarkChannelSemaphore_Mixed(b *testing.B) { + sem := newChannelSemaphore(10) + ctx := context.Background() + var wg sync.WaitGroup + + b.ResetTimer() + + // Half goroutines use TryAcquire + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < b.N/2; i++ { + if sem.TryAcquire() { + sem.Release() + } + } + }() + + // Half goroutines use Acquire + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < b.N/2; i++ { + sem.Acquire(ctx, time.Second) + sem.Release() + } + }() + + wg.Wait() +} + diff --git a/string_commands.go b/string_commands.go index 89c3bec4..551e4e0b 100644 --- a/string_commands.go +++ b/string_commands.go @@ -2,6 +2,7 @@ package redis import ( "context" + "fmt" "time" ) @@ -9,6 +10,8 @@ type StringCmdable interface { Append(ctx context.Context, key, value string) *IntCmd Decr(ctx context.Context, key string) *IntCmd DecrBy(ctx context.Context, key string, decrement int64) *IntCmd + DelExArgs(ctx context.Context, key string, a DelExArgs) *IntCmd + Digest(ctx context.Context, key string) *DigestCmd Get(ctx context.Context, key string) *StringCmd GetRange(ctx context.Context, key string, start, end int64) *StringCmd GetSet(ctx context.Context, key string, value interface{}) *StringCmd @@ -25,6 +28,14 @@ type StringCmdable interface { Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd SetArgs(ctx context.Context, key string, value interface{}, a SetArgs) *StatusCmd SetEx(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd + SetIFEQ(ctx context.Context, key string, value interface{}, matchValue interface{}, expiration time.Duration) *StatusCmd + SetIFEQGet(ctx context.Context, key string, value interface{}, matchValue interface{}, expiration time.Duration) *StringCmd + SetIFNE(ctx context.Context, key string, value interface{}, matchValue interface{}, expiration time.Duration) *StatusCmd + SetIFNEGet(ctx context.Context, key string, value interface{}, matchValue interface{}, expiration time.Duration) *StringCmd + SetIFDEQ(ctx context.Context, key string, value interface{}, matchDigest uint64, expiration time.Duration) *StatusCmd + SetIFDEQGet(ctx context.Context, key string, value interface{}, matchDigest uint64, expiration time.Duration) *StringCmd + SetIFDNE(ctx context.Context, key string, value interface{}, matchDigest uint64, expiration time.Duration) *StatusCmd + SetIFDNEGet(ctx context.Context, key string, value interface{}, matchDigest uint64, expiration time.Duration) *StringCmd SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *BoolCmd SetXX(ctx context.Context, key string, value interface{}, expiration time.Duration) *BoolCmd SetRange(ctx context.Context, key string, offset int64, value string) *IntCmd @@ -49,6 +60,70 @@ func (c cmdable) DecrBy(ctx context.Context, key string, decrement int64) *IntCm return cmd } +// DelExArgs provides arguments for the DelExArgs function. +type DelExArgs struct { + // Mode can be `IFEQ`, `IFNE`, `IFDEQ`, or `IFDNE`. + Mode string + + // MatchValue is used with IFEQ/IFNE modes for compare-and-delete operations. + // - IFEQ: only delete if current value equals MatchValue + // - IFNE: only delete if current value does not equal MatchValue + MatchValue interface{} + + // MatchDigest is used with IFDEQ/IFDNE modes for digest-based compare-and-delete. + // - IFDEQ: only delete if current value's digest equals MatchDigest + // - IFDNE: only delete if current value's digest does not equal MatchDigest + // + // The digest is a uint64 xxh3 hash value. + // + // For examples of client-side digest generation, see: + // example/digest-optimistic-locking/ + MatchDigest uint64 +} + +// DelExArgs Redis `DELEX key [IFEQ|IFNE|IFDEQ|IFDNE] match-value` command. +// Compare-and-delete with flexible conditions. +// +// Returns the number of keys that were removed (0 or 1). +func (c cmdable) DelExArgs(ctx context.Context, key string, a DelExArgs) *IntCmd { + args := []interface{}{"delex", key} + + if a.Mode != "" { + args = append(args, a.Mode) + + // Add match value/digest based on mode + switch a.Mode { + case "ifeq", "IFEQ", "ifne", "IFNE": + if a.MatchValue != nil { + args = append(args, a.MatchValue) + } + case "ifdeq", "IFDEQ", "ifdne", "IFDNE": + if a.MatchDigest != 0 { + args = append(args, fmt.Sprintf("%016x", a.MatchDigest)) + } + } + } + + cmd := NewIntCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// Digest returns the xxh3 hash (uint64) of the specified key's value. +// +// The digest is a 64-bit xxh3 hash that can be used for optimistic locking +// with SetIFDEQ, SetIFDNE, and DelExArgs commands. +// +// For examples of client-side digest generation and usage patterns, see: +// example/digest-optimistic-locking/ +// +// Redis 8.4+. See https://redis.io/commands/digest/ +func (c cmdable) Digest(ctx context.Context, key string) *DigestCmd { + cmd := NewDigestCmd(ctx, "digest", key) + _ = c(ctx, cmd) + return cmd +} + // Get Redis `GET key` command. It returns redis.Nil error when key does not exist. func (c cmdable) Get(ctx context.Context, key string) *StringCmd { cmd := newStringCmd2(ctx, "get", key) @@ -266,9 +341,24 @@ func (c cmdable) Set(ctx context.Context, key string, value interface{}, expirat // SetArgs provides arguments for the SetArgs function. type SetArgs struct { - // Mode can be `NX` or `XX` or empty. + // Mode can be `NX`, `XX`, `IFEQ`, `IFNE`, `IFDEQ`, `IFDNE` or empty. Mode string + // MatchValue is used with IFEQ/IFNE modes for compare-and-set operations. + // - IFEQ: only set if current value equals MatchValue + // - IFNE: only set if current value does not equal MatchValue + MatchValue interface{} + + // MatchDigest is used with IFDEQ/IFDNE modes for digest-based compare-and-set. + // - IFDEQ: only set if current value's digest equals MatchDigest + // - IFDNE: only set if current value's digest does not equal MatchDigest + // + // The digest is a uint64 xxh3 hash value. + // + // For examples of client-side digest generation, see: + // example/digest-optimistic-locking/ + MatchDigest uint64 + // Zero `TTL` or `Expiration` means that the key has no expiration time. TTL time.Duration ExpireAt time.Time @@ -304,6 +394,18 @@ func (c cmdable) SetArgs(ctx context.Context, key string, value interface{}, a S if a.Mode != "" { args = append(args, a.Mode) + + // Add match value/digest for CAS modes + switch a.Mode { + case "ifeq", "IFEQ", "ifne", "IFNE": + if a.MatchValue != nil { + args = append(args, a.MatchValue) + } + case "ifdeq", "IFDEQ", "ifdne", "IFDNE": + if a.MatchDigest != 0 { + args = append(args, fmt.Sprintf("%016x", a.MatchDigest)) + } + } } if a.Get { @@ -371,6 +473,246 @@ func (c cmdable) SetXX(ctx context.Context, key string, value interface{}, expir return cmd } +// SetIFEQ Redis `SET key value [expiration] IFEQ match-value` command. +// Compare-and-set: only sets the value if the current value equals matchValue. +// +// Returns "OK" on success. +// Returns nil if the operation was aborted due to condition not matching. +// Zero expiration means the key has no expiration time. +func (c cmdable) SetIFEQ(ctx context.Context, key string, value interface{}, matchValue interface{}, expiration time.Duration) *StatusCmd { + args := []interface{}{"set", key, value} + + if expiration > 0 { + if usePrecise(expiration) { + args = append(args, "px", formatMs(ctx, expiration)) + } else { + args = append(args, "ex", formatSec(ctx, expiration)) + } + } else if expiration == KeepTTL { + args = append(args, "keepttl") + } + + args = append(args, "ifeq", matchValue) + + cmd := NewStatusCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// SetIFEQGet Redis `SET key value [expiration] IFEQ match-value GET` command. +// Compare-and-set with GET: only sets the value if the current value equals matchValue, +// and returns the previous value. +// +// Returns the previous value on success. +// Returns nil if the operation was aborted due to condition not matching. +// Zero expiration means the key has no expiration time. +func (c cmdable) SetIFEQGet(ctx context.Context, key string, value interface{}, matchValue interface{}, expiration time.Duration) *StringCmd { + args := []interface{}{"set", key, value} + + if expiration > 0 { + if usePrecise(expiration) { + args = append(args, "px", formatMs(ctx, expiration)) + } else { + args = append(args, "ex", formatSec(ctx, expiration)) + } + } else if expiration == KeepTTL { + args = append(args, "keepttl") + } + + args = append(args, "ifeq", matchValue, "get") + + cmd := NewStringCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// SetIFNE Redis `SET key value [expiration] IFNE match-value` command. +// Compare-and-set: only sets the value if the current value does not equal matchValue. +// +// Returns "OK" on success. +// Returns nil if the operation was aborted due to condition not matching. +// Zero expiration means the key has no expiration time. +func (c cmdable) SetIFNE(ctx context.Context, key string, value interface{}, matchValue interface{}, expiration time.Duration) *StatusCmd { + args := []interface{}{"set", key, value} + + if expiration > 0 { + if usePrecise(expiration) { + args = append(args, "px", formatMs(ctx, expiration)) + } else { + args = append(args, "ex", formatSec(ctx, expiration)) + } + } else if expiration == KeepTTL { + args = append(args, "keepttl") + } + + args = append(args, "ifne", matchValue) + + cmd := NewStatusCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// SetIFNEGet Redis `SET key value [expiration] IFNE match-value GET` command. +// Compare-and-set with GET: only sets the value if the current value does not equal matchValue, +// and returns the previous value. +// +// Returns the previous value on success. +// Returns nil if the operation was aborted due to condition not matching. +// Zero expiration means the key has no expiration time. +func (c cmdable) SetIFNEGet(ctx context.Context, key string, value interface{}, matchValue interface{}, expiration time.Duration) *StringCmd { + args := []interface{}{"set", key, value} + + if expiration > 0 { + if usePrecise(expiration) { + args = append(args, "px", formatMs(ctx, expiration)) + } else { + args = append(args, "ex", formatSec(ctx, expiration)) + } + } else if expiration == KeepTTL { + args = append(args, "keepttl") + } + + args = append(args, "ifne", matchValue, "get") + + cmd := NewStringCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// SetIFDEQ sets the value only if the current value's digest equals matchDigest. +// +// This is a compare-and-set operation using xxh3 digest for optimistic locking. +// The matchDigest parameter is a uint64 xxh3 hash value. +// +// Returns "OK" on success. +// Returns redis.Nil if the digest doesn't match (value was modified). +// Zero expiration means the key has no expiration time. +// +// For examples of client-side digest generation and usage patterns, see: +// example/digest-optimistic-locking/ +// +// Redis 8.4+. See https://redis.io/commands/set/ +func (c cmdable) SetIFDEQ(ctx context.Context, key string, value interface{}, matchDigest uint64, expiration time.Duration) *StatusCmd { + args := []interface{}{"set", key, value} + + if expiration > 0 { + if usePrecise(expiration) { + args = append(args, "px", formatMs(ctx, expiration)) + } else { + args = append(args, "ex", formatSec(ctx, expiration)) + } + } else if expiration == KeepTTL { + args = append(args, "keepttl") + } + + args = append(args, "ifdeq", fmt.Sprintf("%016x", matchDigest)) + + cmd := NewStatusCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// SetIFDEQGet sets the value only if the current value's digest equals matchDigest, +// and returns the previous value. +// +// This is a compare-and-set operation using xxh3 digest for optimistic locking. +// The matchDigest parameter is a uint64 xxh3 hash value. +// +// Returns the previous value on success. +// Returns redis.Nil if the digest doesn't match (value was modified). +// Zero expiration means the key has no expiration time. +// +// For examples of client-side digest generation and usage patterns, see: +// example/digest-optimistic-locking/ +// +// Redis 8.4+. See https://redis.io/commands/set/ +func (c cmdable) SetIFDEQGet(ctx context.Context, key string, value interface{}, matchDigest uint64, expiration time.Duration) *StringCmd { + args := []interface{}{"set", key, value} + + if expiration > 0 { + if usePrecise(expiration) { + args = append(args, "px", formatMs(ctx, expiration)) + } else { + args = append(args, "ex", formatSec(ctx, expiration)) + } + } else if expiration == KeepTTL { + args = append(args, "keepttl") + } + + args = append(args, "ifdeq", fmt.Sprintf("%016x", matchDigest), "get") + + cmd := NewStringCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// SetIFDNE sets the value only if the current value's digest does NOT equal matchDigest. +// +// This is a compare-and-set operation using xxh3 digest for optimistic locking. +// The matchDigest parameter is a uint64 xxh3 hash value. +// +// Returns "OK" on success (digest didn't match, value was set). +// Returns redis.Nil if the digest matches (value was not modified). +// Zero expiration means the key has no expiration time. +// +// For examples of client-side digest generation and usage patterns, see: +// example/digest-optimistic-locking/ +// +// Redis 8.4+. See https://redis.io/commands/set/ +func (c cmdable) SetIFDNE(ctx context.Context, key string, value interface{}, matchDigest uint64, expiration time.Duration) *StatusCmd { + args := []interface{}{"set", key, value} + + if expiration > 0 { + if usePrecise(expiration) { + args = append(args, "px", formatMs(ctx, expiration)) + } else { + args = append(args, "ex", formatSec(ctx, expiration)) + } + } else if expiration == KeepTTL { + args = append(args, "keepttl") + } + + args = append(args, "ifdne", fmt.Sprintf("%016x", matchDigest)) + + cmd := NewStatusCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// SetIFDNEGet sets the value only if the current value's digest does NOT equal matchDigest, +// and returns the previous value. +// +// This is a compare-and-set operation using xxh3 digest for optimistic locking. +// The matchDigest parameter is a uint64 xxh3 hash value. +// +// Returns the previous value on success (digest didn't match, value was set). +// Returns redis.Nil if the digest matches (value was not modified). +// Zero expiration means the key has no expiration time. +// +// For examples of client-side digest generation and usage patterns, see: +// example/digest-optimistic-locking/ +// +// Redis 8.4+. See https://redis.io/commands/set/ +func (c cmdable) SetIFDNEGet(ctx context.Context, key string, value interface{}, matchDigest uint64, expiration time.Duration) *StringCmd { + args := []interface{}{"set", key, value} + + if expiration > 0 { + if usePrecise(expiration) { + args = append(args, "px", formatMs(ctx, expiration)) + } else { + args = append(args, "ex", formatSec(ctx, expiration)) + } + } else if expiration == KeepTTL { + args = append(args, "keepttl") + } + + args = append(args, "ifdne", fmt.Sprintf("%016x", matchDigest), "get") + + cmd := NewStringCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + func (c cmdable) SetRange(ctx context.Context, key string, offset int64, value string) *IntCmd { cmd := NewIntCmd(ctx, "setrange", key, offset, value) _ = c(ctx, cmd)