diff --git a/command.go b/command.go index c641a3fa..ea1bd927 100644 --- a/command.go +++ b/command.go @@ -8,6 +8,7 @@ import ( "regexp" "strconv" "strings" + "sync" "time" "github.com/redis/go-redis/v9/internal" @@ -5381,3 +5382,85 @@ func (cmd *InfoCmd) Item(section, key string) string { return cmd.val[section][key] } } + +type MonitorStatus int + +const ( + monitorStatusIdle MonitorStatus = iota + monitorStatusStart + monitorStatusStop +) + +type MonitorCmd struct { + baseCmd + ch chan string + status MonitorStatus + mu sync.Mutex +} + +func newMonitorCmd(ctx context.Context, ch chan string) *MonitorCmd { + return &MonitorCmd{ + baseCmd: baseCmd{ + ctx: ctx, + args: []interface{}{"monitor"}, + }, + ch: ch, + status: monitorStatusIdle, + mu: sync.Mutex{}, + } +} + +func (cmd *MonitorCmd) String() string { + return cmdString(cmd, nil) +} + +func (cmd *MonitorCmd) readReply(rd *proto.Reader) error { + ctx, cancel := context.WithCancel(cmd.ctx) + go func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + default: + err := cmd.readMonitor(rd, cancel) + if err != nil { + cmd.err = err + return + } + } + } + }(ctx) + return nil +} + +func (cmd *MonitorCmd) readMonitor(rd *proto.Reader, cancel context.CancelFunc) error { + for { + cmd.mu.Lock() + st := cmd.status + cmd.mu.Unlock() + if pk, _ := rd.Peek(1); len(pk) != 0 && st == monitorStatusStart { + line, err := rd.ReadString() + if err != nil { + return err + } + cmd.ch <- line + } + if st == monitorStatusStop { + cancel() + break + } + } + return nil +} + +func (cmd *MonitorCmd) Start() { + cmd.mu.Lock() + defer cmd.mu.Unlock() + cmd.status = monitorStatusStart +} + +func (cmd *MonitorCmd) Stop() { + cmd.mu.Lock() + defer cmd.mu.Unlock() + cmd.status = monitorStatusStop +} diff --git a/commands.go b/commands.go index 842cc23a..546ebafb 100644 --- a/commands.go +++ b/commands.go @@ -204,7 +204,6 @@ type Cmdable interface { SlowLogGet(ctx context.Context, num int64) *SlowLogCmd Time(ctx context.Context) *TimeCmd DebugObject(ctx context.Context, key string) *StringCmd - MemoryUsage(ctx context.Context, key string, samples ...int) *IntCmd ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *StringCmd @@ -700,3 +699,20 @@ func (c cmdable) ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *St _ = c(ctx, cmd) return cmd } + +/* +Monitor - represents a Redis MONITOR command, allowing the user to capture +and process all commands sent to a Redis server. This mimics the behavior of +MONITOR in the redis-cli. + +Notes: +- Using MONITOR blocks the connection to the server for itself. It needs a dedicated connection +- The user should create a channel of type string +- This runs concurrently in the background. Trigger via the Start and Stop functions +See further: Redis MONITOR command: https://redis.io/commands/monitor +*/ +func (c cmdable) Monitor(ctx context.Context, ch chan string) *MonitorCmd { + cmd := newMonitorCmd(ctx, ch) + _ = c(ctx, cmd) + return cmd +} diff --git a/main_test.go b/main_test.go index 6aaaf1c0..1a1660c9 100644 --- a/main_test.go +++ b/main_test.go @@ -41,6 +41,11 @@ var ( redisAddr = ":" + redisPort ) +var ( + rediStackPort = "6379" + rediStackAddr = ":" + rediStackPort +) + var ( sentinelAddrs = []string{":" + sentinelPort1, ":" + sentinelPort2, ":" + sentinelPort3} diff --git a/monitor_test.go b/monitor_test.go new file mode 100644 index 00000000..1bc82eca --- /dev/null +++ b/monitor_test.go @@ -0,0 +1,48 @@ +package redis_test + +import ( + "context" + "time" + + . "github.com/bsm/ginkgo/v2" + . "github.com/bsm/gomega" + + "github.com/redis/go-redis/v9" +) + +var _ = Describe("Monitor command", Label("monitor"), func() { + ctx := context.TODO() + var client *redis.Client + + BeforeEach(func() { + client = redis.NewClient(&redis.Options{Addr: ":6379"}) + Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) + }) + + AfterEach(func() { + Expect(client.Close()).NotTo(HaveOccurred()) + }) + + It("should monitor", Label("monitor"), func() { + ress := make(chan string) + client1 := redis.NewClient(&redis.Options{Addr: rediStackAddr}) + mn := client1.Monitor(ctx, ress) + mn.Start() + // Wait for the Redis server to be in monitoring mode. + time.Sleep(100 * time.Millisecond) + client.Set(ctx, "foo", "bar", 0) + client.Set(ctx, "bar", "baz", 0) + client.Set(ctx, "bap", 8, 0) + client.Get(ctx, "bap") + lst := []string{} + for i := 0; i < 5; i++ { + s := <-ress + lst = append(lst, s) + } + mn.Stop() + Expect(lst[0]).To(ContainSubstring("OK")) + Expect(lst[1]).To(ContainSubstring(`"set" "foo" "bar"`)) + Expect(lst[2]).To(ContainSubstring(`"set" "bar" "baz"`)) + Expect(lst[3]).To(ContainSubstring(`"set" "bap" "8"`)) + }) +})