diff --git a/command.go b/command.go index 637e9cdf..21abe11c 100644 --- a/command.go +++ b/command.go @@ -3993,6 +3993,231 @@ func (cmd *FunctionListCmd) readFunctions(rd *proto.Reader) ([]Function, error) return functions, nil } +// FunctionStats contains information about the scripts currently executing on the server, and the available engines +// - Engines: +// Statistics about the engine like number of functions and number of libraries +// - RunningScript: +// The script currently running on the shard we're connecting to. +// For Redis Enterprise and Redis Cloud, this represents the +// function with the longest running time, across all the running functions, on all shards +// - RunningScripts +// All scripts currently running in a Redis Enterprise clustered database. +// Only available on Redis Enterprise +type FunctionStats struct { + Engines []Engine + isRunning bool + rs RunningScript + allrs []RunningScript +} + +func (fs *FunctionStats) Running() bool { + return fs.isRunning +} + +func (fs *FunctionStats) RunningScript() (RunningScript, bool) { + return fs.rs, fs.isRunning +} + +// AllRunningScripts returns all scripts currently running in a Redis Enterprise clustered database. +// Only available on Redis Enterprise +func (fs *FunctionStats) AllRunningScripts() []RunningScript { + return fs.allrs +} + +type RunningScript struct { + Name string + Command []string + Duration time.Duration +} + +type Engine struct { + Language string + LibrariesCount int64 + FunctionsCount int64 +} + +type FunctionStatsCmd struct { + baseCmd + val FunctionStats +} + +var _ Cmder = (*FunctionStatsCmd)(nil) + +func NewFunctionStatsCmd(ctx context.Context, args ...interface{}) *FunctionStatsCmd { + return &FunctionStatsCmd{ + baseCmd: baseCmd{ + ctx: ctx, + args: args, + }, + } +} + +func (cmd *FunctionStatsCmd) SetVal(val FunctionStats) { + cmd.val = val +} + +func (cmd *FunctionStatsCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *FunctionStatsCmd) Val() FunctionStats { + return cmd.val +} + +func (cmd *FunctionStatsCmd) Result() (FunctionStats, error) { + return cmd.val, cmd.err +} + +func (cmd *FunctionStatsCmd) readReply(rd *proto.Reader) (err error) { + n, err := rd.ReadMapLen() + if err != nil { + return err + } + + var key string + var result FunctionStats + for f := 0; f < n; f++ { + key, err = rd.ReadString() + if err != nil { + return err + } + + switch key { + case "running_script": + result.rs, result.isRunning, err = cmd.readRunningScript(rd) + case "engines": + result.Engines, err = cmd.readEngines(rd) + case "all_running_scripts": // Redis Enterprise only + result.allrs, result.isRunning, err = cmd.readRunningScripts(rd) + default: + return fmt.Errorf("redis: function stats unexpected key %s", key) + } + + if err != nil { + return err + } + } + + cmd.val = result + return nil +} + +func (cmd *FunctionStatsCmd) readRunningScript(rd *proto.Reader) (RunningScript, bool, error) { + err := rd.ReadFixedMapLen(3) + if err != nil { + if err == Nil { + return RunningScript{}, false, nil + } + return RunningScript{}, false, err + } + + var runningScript RunningScript + for i := 0; i < 3; i++ { + key, err := rd.ReadString() + if err != nil { + return RunningScript{}, false, err + } + + switch key { + case "name": + runningScript.Name, err = rd.ReadString() + case "duration_ms": + runningScript.Duration, err = cmd.readDuration(rd) + case "command": + runningScript.Command, err = cmd.readCommand(rd) + default: + return RunningScript{}, false, fmt.Errorf("redis: function stats unexpected running_script key %s", key) + } + + if err != nil { + return RunningScript{}, false, err + } + } + + return runningScript, true, nil +} + +func (cmd *FunctionStatsCmd) readEngines(rd *proto.Reader) ([]Engine, error) { + n, err := rd.ReadMapLen() + if err != nil { + return nil, err + } + + engines := make([]Engine, 0, n) + for i := 0; i < n; i++ { + engine := Engine{} + engine.Language, err = rd.ReadString() + if err != nil { + return nil, err + } + + err = rd.ReadFixedMapLen(2) + if err != nil { + return nil, fmt.Errorf("redis: function stats unexpected %s engine map length", engine.Language) + } + + for i := 0; i < 2; i++ { + key, err := rd.ReadString() + switch key { + case "libraries_count": + engine.LibrariesCount, err = rd.ReadInt() + case "functions_count": + engine.FunctionsCount, err = rd.ReadInt() + } + if err != nil { + return nil, err + } + } + + engines = append(engines, engine) + } + return engines, nil +} + +func (cmd *FunctionStatsCmd) readDuration(rd *proto.Reader) (time.Duration, error) { + t, err := rd.ReadInt() + if err != nil { + return time.Duration(0), err + } + return time.Duration(t) * time.Millisecond, nil +} + +func (cmd *FunctionStatsCmd) readCommand(rd *proto.Reader) ([]string, error) { + + n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + + command := make([]string, 0, n) + for i := 0; i < n; i++ { + x, err := rd.ReadString() + if err != nil { + return nil, err + } + command = append(command, x) + } + + return command, nil +} +func (cmd *FunctionStatsCmd) readRunningScripts(rd *proto.Reader) ([]RunningScript, bool, error) { + n, err := rd.ReadArrayLen() + if err != nil { + return nil, false, err + } + + runningScripts := make([]RunningScript, 0, n) + for i := 0; i < n; i++ { + rs, _, err := cmd.readRunningScript(rd) + if err != nil { + return nil, false, err + } + runningScripts = append(runningScripts, rs) + } + + return runningScripts, len(runningScripts) > 0, nil +} + //------------------------------------------------------------------------------ // LCSQuery is a parameter used for the LCS command diff --git a/commands.go b/commands.go index b8eef9dd..4aa0c882 100644 --- a/commands.go +++ b/commands.go @@ -408,10 +408,14 @@ type Cmdable interface { FunctionLoadReplace(ctx context.Context, code string) *StringCmd FunctionDelete(ctx context.Context, libName string) *StringCmd FunctionFlush(ctx context.Context) *StringCmd + FunctionKill(ctx context.Context) *StringCmd FunctionFlushAsync(ctx context.Context) *StringCmd FunctionList(ctx context.Context, q FunctionListQuery) *FunctionListCmd FunctionDump(ctx context.Context) *StringCmd FunctionRestore(ctx context.Context, libDump string) *StringCmd + FunctionStats(ctx context.Context) *FunctionStatsCmd + FCall(ctx context.Context, function string, keys []string, args ...interface{}) *Cmd + FCallRo(ctx context.Context, function string, keys []string, args ...interface{}) *Cmd Publish(ctx context.Context, channel string, message interface{}) *IntCmd SPublish(ctx context.Context, channel string, message interface{}) *IntCmd @@ -3401,6 +3405,12 @@ func (c cmdable) FunctionFlush(ctx context.Context) *StringCmd { return cmd } +func (c cmdable) FunctionKill(ctx context.Context) *StringCmd { + cmd := NewStringCmd(ctx, "function", "kill") + _ = c(ctx, cmd) + return cmd +} + func (c cmdable) FunctionFlushAsync(ctx context.Context) *StringCmd { cmd := NewStringCmd(ctx, "function", "flush", "async") _ = c(ctx, cmd) @@ -3434,6 +3444,44 @@ func (c cmdable) FunctionRestore(ctx context.Context, libDump string) *StringCmd return cmd } +func (c cmdable) FunctionStats(ctx context.Context) *FunctionStatsCmd { + cmd := NewFunctionStatsCmd(ctx, "function", "stats") + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) FCall(ctx context.Context, function string, keys []string, args ...interface{}) *Cmd { + cmdArgs := fcallArgs("fcall", function, keys, args...) + cmd := NewCmd(ctx, cmdArgs...) + if len(keys) > 0 { + cmd.SetFirstKeyPos(3) + } + _ = c(ctx, cmd) + return cmd +} +func (c cmdable) FCallRo(ctx context.Context, function string, keys []string, args ...interface{}) *Cmd { + cmdArgs := fcallArgs("fcall_ro", function, keys, args...) + cmd := NewCmd(ctx, cmdArgs...) + if len(keys) > 0 { + cmd.SetFirstKeyPos(3) + } + _ = c(ctx, cmd) + return cmd +} + +func fcallArgs(command string, function string, keys []string, args ...interface{}) []interface{} { + cmdArgs := make([]interface{}, 3+len(keys), 3+len(keys)+len(args)) + cmdArgs[0] = command + cmdArgs[1] = function + cmdArgs[2] = len(keys) + for i, key := range keys { + cmdArgs[3+i] = key + } + + cmdArgs = append(cmdArgs, args...) + return cmdArgs +} + //------------------------------------------------------------------------------ // Publish posts the message to the channel. diff --git a/commands_test.go b/commands_test.go index cb8f9452..a667bf19 100644 --- a/commands_test.go +++ b/commands_test.go @@ -332,6 +332,57 @@ var _ = Describe("Commands", func() { Expect(cmd.LastKeyPos).To(Equal(int8(0))) Expect(cmd.StepCount).To(Equal(int8(0))) }) + + It("should return all command names", func() { + cmdList := client.CommandList(ctx, nil) + Expect(cmdList.Err()).NotTo(HaveOccurred()) + cmdNames := cmdList.Val() + + Expect(cmdNames).NotTo(BeEmpty()) + + // Assert that some expected commands are present in the list + Expect(cmdNames).To(ContainElement("get")) + Expect(cmdNames).To(ContainElement("set")) + Expect(cmdNames).To(ContainElement("hset")) + }) + + It("should filter commands by module", func() { + filter := &redis.FilterBy{ + Module: "JSON", + } + cmdList := client.CommandList(ctx, filter) + Expect(cmdList.Err()).NotTo(HaveOccurred()) + Expect(cmdList.Val()).To(HaveLen(0)) + }) + + It("should filter commands by ACL category", func() { + + filter := &redis.FilterBy{ + ACLCat: "admin", + } + + cmdList := client.CommandList(ctx, filter) + Expect(cmdList.Err()).NotTo(HaveOccurred()) + cmdNames := cmdList.Val() + + // Assert that the returned list only contains commands from the admin ACL category + Expect(len(cmdNames)).To(BeNumerically(">", 10)) + }) + + It("should filter commands by pattern", func() { + filter := &redis.FilterBy{ + Pattern: "*GET*", + } + cmdList := client.CommandList(ctx, filter) + Expect(cmdList.Err()).NotTo(HaveOccurred()) + cmdNames := cmdList.Val() + + // Assert that the returned list only contains commands that match the given pattern + Expect(cmdNames).To(ContainElement("get")) + Expect(cmdNames).To(ContainElement("getbit")) + Expect(cmdNames).To(ContainElement("getrange")) + Expect(cmdNames).NotTo(ContainElement("set")) + }) }) Describe("debugging", func() { @@ -6242,14 +6293,22 @@ var _ = Describe("Commands", func() { { Name: "lib1_func1", Description: "This is the func-1 of lib 1", - Flags: []string{"no-writes", "allow-stale"}, + Flags: []string{"allow-oom", "allow-stale"}, }, }, Code: `#!lua name=%s - local function f1(keys, args) - return 'Function 1' - end + local function f1(keys, args) + local hash = keys[1] -- Get the key name + local time = redis.call('TIME')[1] -- Get the current time from the Redis server + + -- Add the current timestamp to the arguments that the user passed to the function, stored in args + table.insert(args, '_updated_at') + table.insert(args, time) + + -- Run HSET with the updated argument list + return redis.call('HSET', hash, unpack(args)) + end redis.register_function{ function_name='%s', @@ -6374,6 +6433,13 @@ var _ = Describe("Commands", func() { Expect(functionFlush.Err()).NotTo(HaveOccurred()) }) + It("Kills a running function", func() { + functionKill := client.FunctionKill(ctx) + Expect(functionKill.Err()).To(MatchError("NOTBUSY No scripts in execution right now.")) + + // Add test for a long-running function, once we make the test for `function stats` pass + }) + It("Lists registered functions", func() { err := client.FunctionLoad(ctx, lib1Code).Err() Expect(err).NotTo(HaveOccurred()) @@ -6413,57 +6479,6 @@ var _ = Describe("Commands", func() { Expect(err).To(Equal(redis.Nil)) }) - It("should return all command names", func() { - cmdList := client.CommandList(ctx, nil) - Expect(cmdList.Err()).NotTo(HaveOccurred()) - cmdNames := cmdList.Val() - - Expect(cmdNames).NotTo(BeEmpty()) - - // Assert that some expected commands are present in the list - Expect(cmdNames).To(ContainElement("get")) - Expect(cmdNames).To(ContainElement("set")) - Expect(cmdNames).To(ContainElement("hset")) - }) - - It("should filter commands by module", func() { - filter := &redis.FilterBy{ - Module: "JSON", - } - cmdList := client.CommandList(ctx, filter) - Expect(cmdList.Err()).NotTo(HaveOccurred()) - Expect(cmdList.Val()).To(HaveLen(0)) - }) - - It("should filter commands by ACL category", func() { - - filter := &redis.FilterBy{ - ACLCat: "admin", - } - - cmdList := client.CommandList(ctx, filter) - Expect(cmdList.Err()).NotTo(HaveOccurred()) - cmdNames := cmdList.Val() - - // Assert that the returned list only contains commands from the admin ACL category - Expect(len(cmdNames)).To(BeNumerically(">", 10)) - }) - - It("should filter commands by pattern", func() { - filter := &redis.FilterBy{ - Pattern: "*GET*", - } - cmdList := client.CommandList(ctx, filter) - Expect(cmdList.Err()).NotTo(HaveOccurred()) - cmdNames := cmdList.Val() - - // Assert that the returned list only contains commands that match the given pattern - Expect(cmdNames).To(ContainElement("get")) - Expect(cmdNames).To(ContainElement("getbit")) - Expect(cmdNames).To(ContainElement("getrange")) - Expect(cmdNames).NotTo(ContainElement("set")) - }) - It("Dump and restores all libraries", func() { err := client.FunctionLoad(ctx, lib1Code).Err() Expect(err).NotTo(HaveOccurred()) @@ -6492,6 +6507,109 @@ var _ = Describe("Commands", func() { Expect(list).To(HaveLen(2)) }) + It("Calls a function", func() { + lib1Code = fmt.Sprintf(lib1.Code, lib1.Name, lib1.Functions[0].Name, + lib1.Functions[0].Description, lib1.Functions[0].Flags[0], lib1.Functions[0].Flags[1]) + + err := client.FunctionLoad(ctx, lib1Code).Err() + Expect(err).NotTo(HaveOccurred()) + + x := client.FCall(ctx, lib1.Functions[0].Name, []string{"my_hash"}, "a", 1, "b", 2) + Expect(x.Err()).NotTo(HaveOccurred()) + Expect(x.Int()).To(Equal(3)) + }) + + It("Calls a function as read-only", func() { + lib1Code = fmt.Sprintf(lib1.Code, lib1.Name, lib1.Functions[0].Name, + lib1.Functions[0].Description, lib1.Functions[0].Flags[0], lib1.Functions[0].Flags[1]) + + err := client.FunctionLoad(ctx, lib1Code).Err() + Expect(err).NotTo(HaveOccurred()) + + // This function doesn't have a "no-writes" flag + x := client.FCallRo(ctx, lib1.Functions[0].Name, []string{"my_hash"}, "a", 1, "b", 2) + + Expect(x.Err()).To(HaveOccurred()) + + lib2Code = fmt.Sprintf(lib2.Code, lib2.Name, lib2.Functions[0].Name, lib2.Functions[1].Name, + lib2.Functions[1].Description, lib2.Functions[1].Flags[0]) + + // This function has a "no-writes" flag + err = client.FunctionLoad(ctx, lib2Code).Err() + Expect(err).NotTo(HaveOccurred()) + + x = client.FCallRo(ctx, lib2.Functions[1].Name, []string{}) + + Expect(x.Err()).NotTo(HaveOccurred()) + Expect(x.Text()).To(Equal("Function 2")) + }) + + It("Shows function stats", func() { + defer client.FunctionKill(ctx) + + // We can not run blocking commands in Redis functions, so we're using an infinite loop, + // but we're killing the function after calling FUNCTION STATS + lib := redis.Library{ + Name: "mylib1", + Engine: "LUA", + Functions: []redis.Function{ + { + Name: "lib1_func1", + Description: "This is the func-1 of lib 1", + Flags: []string{"no-writes"}, + }, + }, + Code: `#!lua name=%s + local function f1(keys, args) + local i = 0 + while true do + i = i + 1 + end + end + + redis.register_function{ + function_name='%s', + description ='%s', + callback=f1, + flags={'%s'} + }`, + } + libCode := fmt.Sprintf(lib.Code, lib.Name, lib.Functions[0].Name, + lib.Functions[0].Description, lib.Functions[0].Flags[0]) + err := client.FunctionLoad(ctx, libCode).Err() + + Expect(err).NotTo(HaveOccurred()) + + r, err := client.FunctionStats(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(r.Engines)).To(Equal(1)) + Expect(r.Running()).To(BeFalse()) + + started := make(chan bool) + go func() { + defer GinkgoRecover() + + client2 := redis.NewClient(redisOptions()) + + started <- true + _, err = client2.FCall(ctx, lib.Functions[0].Name, nil).Result() + Expect(err).To(HaveOccurred()) + }() + + <-started + time.Sleep(1 * time.Second) + r, err = client.FunctionStats(ctx).Result() + + Expect(err).NotTo(HaveOccurred()) + Expect(len(r.Engines)).To(Equal(1)) + rs, isRunning := r.RunningScript() + Expect(isRunning).To(BeTrue()) + Expect(rs.Name).To(Equal(lib.Functions[0].Name)) + Expect(rs.Duration > 0).To(BeTrue()) + + close(started) + }) + }) Describe("SlowLogGet", func() { @@ -6512,6 +6630,7 @@ var _ = Describe("Commands", func() { Expect(len(result)).NotTo(BeZero()) }) }) + }) type numberStruct struct {