mirror of
				https://github.com/redis/go-redis.git
				synced 2025-11-04 02:33:24 +03:00 
			
		
		
		
	Merge pull request #431 from go-redis/fix/instrum-example
Fix WrapProcess for Ring and Cluster. Add better example.
This commit is contained in:
		
							
								
								
									
										17
									
								
								commands.go
									
									
									
									
									
								
							
							
						
						
									
										17
									
								
								commands.go
									
									
									
									
									
								
							@@ -185,7 +185,6 @@ type Cmdable interface {
 | 
			
		||||
	ClientKill(ipPort string) *StatusCmd
 | 
			
		||||
	ClientList() *StringCmd
 | 
			
		||||
	ClientPause(dur time.Duration) *BoolCmd
 | 
			
		||||
	ClientSetName(name string) *BoolCmd
 | 
			
		||||
	ConfigGet(parameter string) *SliceCmd
 | 
			
		||||
	ConfigResetStat() *StatusCmd
 | 
			
		||||
	ConfigSet(parameter, value string) *StatusCmd
 | 
			
		||||
@@ -241,14 +240,6 @@ type cmdable struct {
 | 
			
		||||
	process func(cmd Cmder) error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WrapProcess replaces the process func. It takes a function createWrapper
 | 
			
		||||
// which is supplied by the user. createWrapper takes the old process func as
 | 
			
		||||
// an input and returns the new wrapper process func. createWrapper should
 | 
			
		||||
// use call the old process func within the new process func.
 | 
			
		||||
func (c *cmdable) WrapProcess(createWrapper func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
 | 
			
		||||
	c.process = createWrapper(c.process)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type statefulCmdable struct {
 | 
			
		||||
	process func(cmd Cmder) error
 | 
			
		||||
}
 | 
			
		||||
@@ -1625,15 +1616,15 @@ func (c *cmdable) ClientPause(dur time.Duration) *BoolCmd {
 | 
			
		||||
	return cmd
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ClientSetName assigns a name to the one of many connections in the pool.
 | 
			
		||||
func (c *cmdable) ClientSetName(name string) *BoolCmd {
 | 
			
		||||
// ClientSetName assigns a name to the connection.
 | 
			
		||||
func (c *statefulCmdable) ClientSetName(name string) *BoolCmd {
 | 
			
		||||
	cmd := NewBoolCmd("client", "setname", name)
 | 
			
		||||
	c.process(cmd)
 | 
			
		||||
	return cmd
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ClientGetName returns the name of the one of many connections in the pool.
 | 
			
		||||
func (c *Client) ClientGetName() *StringCmd {
 | 
			
		||||
// ClientGetName returns the name of the connection.
 | 
			
		||||
func (c *statefulCmdable) ClientGetName() *StringCmd {
 | 
			
		||||
	cmd := NewStringCmd("client", "getname")
 | 
			
		||||
	c.process(cmd)
 | 
			
		||||
	return cmd
 | 
			
		||||
 
 | 
			
		||||
@@ -26,14 +26,20 @@ var _ = Describe("Commands", func() {
 | 
			
		||||
 | 
			
		||||
	Describe("server", func() {
 | 
			
		||||
 | 
			
		||||
		// It("should Auth", func() {
 | 
			
		||||
		// 	auth := client.Auth("password")
 | 
			
		||||
		// 	Expect(auth.Err()).To(MatchError("ERR Client sent AUTH, but no password is set"))
 | 
			
		||||
		// 	Expect(auth.Val()).To(Equal(""))
 | 
			
		||||
		// })
 | 
			
		||||
		It("should Auth", func() {
 | 
			
		||||
			_, err := client.Pipelined(func(pipe *redis.Pipeline) error {
 | 
			
		||||
				pipe.Auth("password")
 | 
			
		||||
				return nil
 | 
			
		||||
			})
 | 
			
		||||
			Expect(err).To(MatchError("ERR Client sent AUTH, but no password is set"))
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
		It("should Echo", func() {
 | 
			
		||||
			echo := client.Echo("hello")
 | 
			
		||||
			pipe := client.Pipeline()
 | 
			
		||||
			echo := pipe.Echo("hello")
 | 
			
		||||
			_, err := pipe.Exec()
 | 
			
		||||
			Expect(err).NotTo(HaveOccurred())
 | 
			
		||||
 | 
			
		||||
			Expect(echo.Err()).NotTo(HaveOccurred())
 | 
			
		||||
			Expect(echo.Val()).To(Equal("hello"))
 | 
			
		||||
		})
 | 
			
		||||
@@ -44,11 +50,15 @@ var _ = Describe("Commands", func() {
 | 
			
		||||
			Expect(ping.Val()).To(Equal("PONG"))
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
		// It("should Select", func() {
 | 
			
		||||
		// 	sel := client.Select(1)
 | 
			
		||||
		// 	Expect(sel.Err()).NotTo(HaveOccurred())
 | 
			
		||||
		// 	Expect(sel.Val()).To(Equal("OK"))
 | 
			
		||||
		// })
 | 
			
		||||
		It("should Select", func() {
 | 
			
		||||
			pipe := client.Pipeline()
 | 
			
		||||
			sel := pipe.Select(1)
 | 
			
		||||
			_, err := pipe.Exec()
 | 
			
		||||
			Expect(err).NotTo(HaveOccurred())
 | 
			
		||||
 | 
			
		||||
			Expect(sel.Err()).NotTo(HaveOccurred())
 | 
			
		||||
			Expect(sel.Val()).To(Equal("OK"))
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
		It("should BgRewriteAOF", func() {
 | 
			
		||||
			Skip("flaky test")
 | 
			
		||||
@@ -84,13 +94,18 @@ var _ = Describe("Commands", func() {
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
		It("should ClientSetName and ClientGetName", func() {
 | 
			
		||||
			isSet, err := client.ClientSetName("theclientname").Result()
 | 
			
		||||
			pipe := client.Pipeline()
 | 
			
		||||
			set := pipe.ClientSetName("theclientname")
 | 
			
		||||
			get := pipe.ClientGetName()
 | 
			
		||||
			_, err := pipe.Exec()
 | 
			
		||||
			Expect(err).NotTo(HaveOccurred())
 | 
			
		||||
			Expect(isSet).To(BeTrue())
 | 
			
		||||
 | 
			
		||||
			val, err := client.ClientGetName().Result()
 | 
			
		||||
			Expect(err).NotTo(HaveOccurred())
 | 
			
		||||
			Expect(val).To(Equal("theclientname"))
 | 
			
		||||
			Expect(set.Err()).NotTo(HaveOccurred())
 | 
			
		||||
			Expect(set.Val()).To(BeTrue())
 | 
			
		||||
 | 
			
		||||
			Expect(get.Err()).NotTo(HaveOccurred())
 | 
			
		||||
			Expect(get.Val()).To(Equal("theclientname"))
 | 
			
		||||
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
		It("should ConfigGet", func() {
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										59
									
								
								example_instrumentation_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										59
									
								
								example_instrumentation_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,59 @@
 | 
			
		||||
package redis_test
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"sync/atomic"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	redis "gopkg.in/redis.v5"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func Example_instrumentation() {
 | 
			
		||||
	ring := redis.NewRing(&redis.RingOptions{
 | 
			
		||||
		Addrs: map[string]string{
 | 
			
		||||
			"shard1": ":6379",
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
	ring.ForEachShard(func(client *redis.Client) error {
 | 
			
		||||
		wrapRedisProcess(client)
 | 
			
		||||
		return nil
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		ring.Ping()
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func wrapRedisProcess(client *redis.Client) {
 | 
			
		||||
	const precision = time.Microsecond
 | 
			
		||||
	var count, avgDur uint32
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		for _ = range time.Tick(3 * time.Second) {
 | 
			
		||||
			n := atomic.LoadUint32(&count)
 | 
			
		||||
			dur := time.Duration(atomic.LoadUint32(&avgDur)) * precision
 | 
			
		||||
			fmt.Printf("%s: processed=%d avg_dur=%s\n", client, n, dur)
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	client.WrapProcess(func(oldProcess func(redis.Cmder) error) func(redis.Cmder) error {
 | 
			
		||||
		return func(cmd redis.Cmder) error {
 | 
			
		||||
			start := time.Now()
 | 
			
		||||
			err := oldProcess(cmd)
 | 
			
		||||
			dur := time.Since(start)
 | 
			
		||||
 | 
			
		||||
			const decay = float64(1) / 100
 | 
			
		||||
			ms := float64(dur / precision)
 | 
			
		||||
			for {
 | 
			
		||||
				avg := atomic.LoadUint32(&avgDur)
 | 
			
		||||
				newAvg := uint32((1-decay)*float64(avg) + decay*ms)
 | 
			
		||||
				if atomic.CompareAndSwapUint32(&avgDur, avg, newAvg) {
 | 
			
		||||
					break
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			atomic.AddUint32(&count, 1)
 | 
			
		||||
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
@@ -331,21 +331,3 @@ func ExampleScanCmd_Iterator() {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func ExampleClient_instrumentation() {
 | 
			
		||||
	client := redis.NewClient(&redis.Options{
 | 
			
		||||
		Addr: ":6379",
 | 
			
		||||
	})
 | 
			
		||||
	client.WrapProcess(func(oldProcess func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
 | 
			
		||||
		return func(cmd redis.Cmder) error {
 | 
			
		||||
			start := time.Now()
 | 
			
		||||
			err := oldProcess(cmd)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				fmt.Printf("command %s failed: %s", cmd, err)
 | 
			
		||||
			} else {
 | 
			
		||||
				fmt.Printf("command %q took %s", cmd, time.Since(start))
 | 
			
		||||
			}
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -152,7 +152,7 @@ var _ = Describe("Pipelining", func() {
 | 
			
		||||
		const N = 1000
 | 
			
		||||
 | 
			
		||||
		pipeline := client.Pipeline()
 | 
			
		||||
		wg := &sync.WaitGroup{}
 | 
			
		||||
		var wg sync.WaitGroup
 | 
			
		||||
		wg.Add(N)
 | 
			
		||||
		for i := 0; i < N; i++ {
 | 
			
		||||
			go func() {
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										16
									
								
								redis.go
									
									
									
									
									
								
							
							
						
						
									
										16
									
								
								redis.go
									
									
									
									
									
								
							@@ -19,6 +19,7 @@ type baseClient struct {
 | 
			
		||||
	connPool pool.Pooler
 | 
			
		||||
	opt      *Options
 | 
			
		||||
 | 
			
		||||
	process func(Cmder) error
 | 
			
		||||
	onClose func() error // hook called when client is closed
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -78,6 +79,21 @@ func (c *baseClient) initConn(cn *pool.Conn) error {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *baseClient) Process(cmd Cmder) error {
 | 
			
		||||
	if c.process != nil {
 | 
			
		||||
		return c.process(cmd)
 | 
			
		||||
	}
 | 
			
		||||
	return c.defaultProcess(cmd)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WrapProcess replaces the process func. It takes a function createWrapper
 | 
			
		||||
// which is supplied by the user. createWrapper takes the old process func as
 | 
			
		||||
// an input and returns the new wrapper process func. createWrapper should
 | 
			
		||||
// use call the old process func within the new process func.
 | 
			
		||||
func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
 | 
			
		||||
	c.process = fn(c.defaultProcess)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *baseClient) defaultProcess(cmd Cmder) error {
 | 
			
		||||
	for i := 0; i <= c.opt.MaxRetries; i++ {
 | 
			
		||||
		if i > 0 {
 | 
			
		||||
			cmd.reset()
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										26
									
								
								ring.go
									
									
									
									
									
								
							
							
						
						
									
										26
									
								
								ring.go
									
									
									
									
									
								
							@@ -230,7 +230,7 @@ func (c *Ring) addClient(name string, cl *Client) {
 | 
			
		||||
	c.mu.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Ring) shardByKey(key string) (*Client, error) {
 | 
			
		||||
func (c *Ring) shardByKey(key string) (*ringShard, error) {
 | 
			
		||||
	key = hashtag.Key(key)
 | 
			
		||||
 | 
			
		||||
	c.mu.RLock()
 | 
			
		||||
@@ -246,27 +246,27 @@ func (c *Ring) shardByKey(key string) (*Client, error) {
 | 
			
		||||
		return nil, errRingShardsDown
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	cl := c.shards[name].Client
 | 
			
		||||
	shard := c.shards[name]
 | 
			
		||||
	c.mu.RUnlock()
 | 
			
		||||
	return cl, nil
 | 
			
		||||
	return shard, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Ring) randomShard() (*Client, error) {
 | 
			
		||||
func (c *Ring) randomShard() (*ringShard, error) {
 | 
			
		||||
	return c.shardByKey(strconv.Itoa(rand.Int()))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Ring) shardByName(name string) (*Client, error) {
 | 
			
		||||
func (c *Ring) shardByName(name string) (*ringShard, error) {
 | 
			
		||||
	if name == "" {
 | 
			
		||||
		return c.randomShard()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	c.mu.RLock()
 | 
			
		||||
	cl := c.shards[name].Client
 | 
			
		||||
	shard := c.shards[name]
 | 
			
		||||
	c.mu.RUnlock()
 | 
			
		||||
	return cl, nil
 | 
			
		||||
	return shard, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Ring) cmdShard(cmd Cmder) (*Client, error) {
 | 
			
		||||
func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) {
 | 
			
		||||
	cmdInfo := c.cmdInfo(cmd.arg(0))
 | 
			
		||||
	firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo))
 | 
			
		||||
	if firstKey == "" {
 | 
			
		||||
@@ -276,12 +276,12 @@ func (c *Ring) cmdShard(cmd Cmder) (*Client, error) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Ring) Process(cmd Cmder) error {
 | 
			
		||||
	cl, err := c.cmdShard(cmd)
 | 
			
		||||
	shard, err := c.cmdShard(cmd)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		cmd.setErr(err)
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	return cl.baseClient.Process(cmd)
 | 
			
		||||
	return shard.Client.Process(cmd)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// rebalance removes dead shards from the Ring.
 | 
			
		||||
@@ -384,7 +384,7 @@ func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) {
 | 
			
		||||
				resetCmds(cmds)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			client, err := c.shardByName(name)
 | 
			
		||||
			shard, err := c.shardByName(name)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				setCmdsErr(cmds, err)
 | 
			
		||||
				if firstErr == nil {
 | 
			
		||||
@@ -393,7 +393,7 @@ func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			cn, _, err := client.conn()
 | 
			
		||||
			cn, _, err := shard.Client.conn()
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				setCmdsErr(cmds, err)
 | 
			
		||||
				if firstErr == nil {
 | 
			
		||||
@@ -403,7 +403,7 @@ func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) {
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			retry, err := execCmds(cn, cmds)
 | 
			
		||||
			client.putConn(cn, err, false)
 | 
			
		||||
			shard.Client.putConn(cn, err, false)
 | 
			
		||||
			if err == nil {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user