mirror of
https://github.com/redis/go-redis.git
synced 2025-04-17 20:17:02 +03:00
Support Monitor Command (#2830)
* Add monitor command * Add monitor commadn and tests * insure goroutine shutdown * fix data race * linting * change timeout explanation --------- Co-authored-by: Chayim <chayim@users.noreply.github.com>
This commit is contained in:
parent
631deaf25f
commit
277e8b7d9f
83
command.go
83
command.go
@ -8,6 +8,7 @@ import (
|
|||||||
"regexp"
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9/internal"
|
"github.com/redis/go-redis/v9/internal"
|
||||||
@ -5381,3 +5382,85 @@ func (cmd *InfoCmd) Item(section, key string) string {
|
|||||||
return cmd.val[section][key]
|
return cmd.val[section][key]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type MonitorStatus int
|
||||||
|
|
||||||
|
const (
|
||||||
|
monitorStatusIdle MonitorStatus = iota
|
||||||
|
monitorStatusStart
|
||||||
|
monitorStatusStop
|
||||||
|
)
|
||||||
|
|
||||||
|
type MonitorCmd struct {
|
||||||
|
baseCmd
|
||||||
|
ch chan string
|
||||||
|
status MonitorStatus
|
||||||
|
mu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMonitorCmd(ctx context.Context, ch chan string) *MonitorCmd {
|
||||||
|
return &MonitorCmd{
|
||||||
|
baseCmd: baseCmd{
|
||||||
|
ctx: ctx,
|
||||||
|
args: []interface{}{"monitor"},
|
||||||
|
},
|
||||||
|
ch: ch,
|
||||||
|
status: monitorStatusIdle,
|
||||||
|
mu: sync.Mutex{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *MonitorCmd) String() string {
|
||||||
|
return cmdString(cmd, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *MonitorCmd) readReply(rd *proto.Reader) error {
|
||||||
|
ctx, cancel := context.WithCancel(cmd.ctx)
|
||||||
|
go func(ctx context.Context) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
err := cmd.readMonitor(rd, cancel)
|
||||||
|
if err != nil {
|
||||||
|
cmd.err = err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(ctx)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *MonitorCmd) readMonitor(rd *proto.Reader, cancel context.CancelFunc) error {
|
||||||
|
for {
|
||||||
|
cmd.mu.Lock()
|
||||||
|
st := cmd.status
|
||||||
|
cmd.mu.Unlock()
|
||||||
|
if pk, _ := rd.Peek(1); len(pk) != 0 && st == monitorStatusStart {
|
||||||
|
line, err := rd.ReadString()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
cmd.ch <- line
|
||||||
|
}
|
||||||
|
if st == monitorStatusStop {
|
||||||
|
cancel()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *MonitorCmd) Start() {
|
||||||
|
cmd.mu.Lock()
|
||||||
|
defer cmd.mu.Unlock()
|
||||||
|
cmd.status = monitorStatusStart
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *MonitorCmd) Stop() {
|
||||||
|
cmd.mu.Lock()
|
||||||
|
defer cmd.mu.Unlock()
|
||||||
|
cmd.status = monitorStatusStop
|
||||||
|
}
|
||||||
|
18
commands.go
18
commands.go
@ -204,7 +204,6 @@ type Cmdable interface {
|
|||||||
SlowLogGet(ctx context.Context, num int64) *SlowLogCmd
|
SlowLogGet(ctx context.Context, num int64) *SlowLogCmd
|
||||||
Time(ctx context.Context) *TimeCmd
|
Time(ctx context.Context) *TimeCmd
|
||||||
DebugObject(ctx context.Context, key string) *StringCmd
|
DebugObject(ctx context.Context, key string) *StringCmd
|
||||||
|
|
||||||
MemoryUsage(ctx context.Context, key string, samples ...int) *IntCmd
|
MemoryUsage(ctx context.Context, key string, samples ...int) *IntCmd
|
||||||
|
|
||||||
ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *StringCmd
|
ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *StringCmd
|
||||||
@ -700,3 +699,20 @@ func (c cmdable) ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *St
|
|||||||
_ = c(ctx, cmd)
|
_ = c(ctx, cmd)
|
||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
Monitor - represents a Redis MONITOR command, allowing the user to capture
|
||||||
|
and process all commands sent to a Redis server. This mimics the behavior of
|
||||||
|
MONITOR in the redis-cli.
|
||||||
|
|
||||||
|
Notes:
|
||||||
|
- Using MONITOR blocks the connection to the server for itself. It needs a dedicated connection
|
||||||
|
- The user should create a channel of type string
|
||||||
|
- This runs concurrently in the background. Trigger via the Start and Stop functions
|
||||||
|
See further: Redis MONITOR command: https://redis.io/commands/monitor
|
||||||
|
*/
|
||||||
|
func (c cmdable) Monitor(ctx context.Context, ch chan string) *MonitorCmd {
|
||||||
|
cmd := newMonitorCmd(ctx, ch)
|
||||||
|
_ = c(ctx, cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
@ -41,6 +41,11 @@ var (
|
|||||||
redisAddr = ":" + redisPort
|
redisAddr = ":" + redisPort
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
rediStackPort = "6379"
|
||||||
|
rediStackAddr = ":" + rediStackPort
|
||||||
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
sentinelAddrs = []string{":" + sentinelPort1, ":" + sentinelPort2, ":" + sentinelPort3}
|
sentinelAddrs = []string{":" + sentinelPort1, ":" + sentinelPort2, ":" + sentinelPort3}
|
||||||
|
|
||||||
|
48
monitor_test.go
Normal file
48
monitor_test.go
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
package redis_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
. "github.com/bsm/ginkgo/v2"
|
||||||
|
. "github.com/bsm/gomega"
|
||||||
|
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ = Describe("Monitor command", Label("monitor"), func() {
|
||||||
|
ctx := context.TODO()
|
||||||
|
var client *redis.Client
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
client = redis.NewClient(&redis.Options{Addr: ":6379"})
|
||||||
|
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
Expect(client.Close()).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should monitor", Label("monitor"), func() {
|
||||||
|
ress := make(chan string)
|
||||||
|
client1 := redis.NewClient(&redis.Options{Addr: rediStackAddr})
|
||||||
|
mn := client1.Monitor(ctx, ress)
|
||||||
|
mn.Start()
|
||||||
|
// Wait for the Redis server to be in monitoring mode.
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
client.Set(ctx, "foo", "bar", 0)
|
||||||
|
client.Set(ctx, "bar", "baz", 0)
|
||||||
|
client.Set(ctx, "bap", 8, 0)
|
||||||
|
client.Get(ctx, "bap")
|
||||||
|
lst := []string{}
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
s := <-ress
|
||||||
|
lst = append(lst, s)
|
||||||
|
}
|
||||||
|
mn.Stop()
|
||||||
|
Expect(lst[0]).To(ContainSubstring("OK"))
|
||||||
|
Expect(lst[1]).To(ContainSubstring(`"set" "foo" "bar"`))
|
||||||
|
Expect(lst[2]).To(ContainSubstring(`"set" "bar" "baz"`))
|
||||||
|
Expect(lst[3]).To(ContainSubstring(`"set" "bap" "8"`))
|
||||||
|
})
|
||||||
|
})
|
Loading…
x
Reference in New Issue
Block a user