mirror of
https://github.com/redis/go-redis.git
synced 2025-04-17 20:17:02 +03:00
Add cache writing
This commit is contained in:
parent
05aae206bb
commit
bc7ec7304a
@ -96,10 +96,26 @@ func TestCacheClear(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSetCache(t *testing.T) {
|
||||
client := redis.NewClient(&redis.Options{Addr: ":6379", EnableCache: true, CacheConfig: &redis.CacheConfig{MaxSize: 1 << 20, MaxKeys: 1000}})
|
||||
cache, err := redis.NewCache(1000, 1<<20)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create cache: %v", err)
|
||||
}
|
||||
client := redis.NewClient(&redis.Options{Addr: ":6379", CacheObject: cache})
|
||||
defer client.Close()
|
||||
ctx := context.Background()
|
||||
client.Cache.SetKey("pingi", "pong", 0)
|
||||
client.Ping(ctx)
|
||||
client.Cache.GetKey("ping")
|
||||
// TODO: fix this
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
val, found := client.Options().CacheObject.GetKey("ping")
|
||||
if found {
|
||||
t.Log(val)
|
||||
} else {
|
||||
t.Error("Key not found")
|
||||
}
|
||||
ping := client.Ping(ctx)
|
||||
if ping.Val() == "PONG" {
|
||||
t.Log(ping.Val())
|
||||
} else {
|
||||
t.Error("Ping from cache failed")
|
||||
}
|
||||
}
|
||||
|
@ -64,7 +64,13 @@ func (cn *Conn) RemoteAddr() net.Addr {
|
||||
}
|
||||
|
||||
func (cn *Conn) GetRawOutput() []byte {
|
||||
return cn.rd.GetLine()
|
||||
line := cn.rd.GetLine()
|
||||
cn.rd.ResetLine()
|
||||
return line
|
||||
}
|
||||
|
||||
func (cn *Conn) ResetRawOutput() {
|
||||
cn.rd.ResetLine()
|
||||
}
|
||||
|
||||
func (cn *Conn) WithReader(
|
||||
|
@ -122,7 +122,6 @@ func (r *Reader) ReadLine() ([]byte, error) {
|
||||
if IsNilReply(line) {
|
||||
return nil, Nil
|
||||
}
|
||||
Line = line
|
||||
return line, nil
|
||||
}
|
||||
|
||||
@ -150,6 +149,7 @@ func (r *Reader) readLine() ([]byte, error) {
|
||||
if len(b) <= 2 || b[len(b)-1] != '\n' || b[len(b)-2] != '\r' {
|
||||
return nil, fmt.Errorf("redis: invalid reply: %q", b)
|
||||
}
|
||||
Line = append(Line, b...)
|
||||
return b[:len(b)-2], nil
|
||||
}
|
||||
|
||||
@ -157,6 +157,10 @@ func (r *Reader) GetLine() []byte {
|
||||
return Line
|
||||
}
|
||||
|
||||
func (r *Reader) ResetLine() {
|
||||
Line = []byte{}
|
||||
}
|
||||
|
||||
func (r *Reader) ReadReply() (interface{}, error) {
|
||||
line, err := r.ReadLine()
|
||||
if err != nil {
|
||||
@ -230,7 +234,7 @@ func (r *Reader) readStringReply(line []byte) (string, error) {
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
Line = append(Line, b...)
|
||||
return util.BytesToString(b[:n]), nil
|
||||
}
|
||||
|
||||
|
@ -148,11 +148,8 @@ type Options struct {
|
||||
// Add suffix to client name. Default is empty.
|
||||
IdentitySuffix string
|
||||
|
||||
// Enable cache
|
||||
EnableCache bool
|
||||
|
||||
// Cache configuration options
|
||||
CacheConfig *CacheConfig
|
||||
// Enable cache for the client.
|
||||
CacheObject *Cache
|
||||
}
|
||||
|
||||
func (opt *Options) init() {
|
||||
|
45
redis.go
45
redis.go
@ -5,7 +5,6 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@ -202,7 +201,6 @@ type baseClient struct {
|
||||
connPool pool.Pooler
|
||||
|
||||
onClose func() error // hook called when client is closed
|
||||
Cache *Cache
|
||||
}
|
||||
|
||||
func (c *baseClient) clone() *baseClient {
|
||||
@ -397,9 +395,20 @@ func (c *baseClient) dial(ctx context.Context, network, addr string) (net.Conn,
|
||||
|
||||
func (c *baseClient) process(ctx context.Context, cmd Cmder) error {
|
||||
var lastErr error
|
||||
// Check if cache enabled
|
||||
if c.opt.CacheObject != nil {
|
||||
// Check if the command is in cache, if so return from cache
|
||||
if val, found := c.opt.CacheObject.GetKey(cmd.Name()); found {
|
||||
rd := proto.NewReader(bytes.NewReader(val.([]byte)))
|
||||
err := cmd.readReply(rd)
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
}
|
||||
return lastErr
|
||||
}
|
||||
}
|
||||
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
|
||||
attempt := attempt
|
||||
|
||||
retry, err := c._process(ctx, cmd, attempt)
|
||||
if err == nil || !retry {
|
||||
return err
|
||||
@ -417,20 +426,9 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
|
||||
}
|
||||
}
|
||||
|
||||
// Check if cache enabled
|
||||
if c.opt.EnableCache {
|
||||
// Check if the command is in cache, if so return from cache
|
||||
if val, found := c.Cache.GetKey(cmd.Name()); found {
|
||||
rd := proto.NewReader(bytes.NewReader(val.([]byte)))
|
||||
err := cmd.readReply(rd)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
retryTimeout := uint32(0)
|
||||
if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
|
||||
cn.ResetRawOutput()
|
||||
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
|
||||
return writeCmd(wr, cmd)
|
||||
}); err != nil {
|
||||
@ -447,9 +445,9 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
|
||||
return err
|
||||
}
|
||||
|
||||
if c.opt.EnableCache {
|
||||
if c.opt.CacheObject != nil {
|
||||
// Set the command in cache
|
||||
c.Cache.SetKey(cmd.Name(), cn.GetRawOutput(), 0)
|
||||
c.opt.CacheObject.SetKey(cmd.Name(), cn.GetRawOutput(), 0)
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -646,27 +644,16 @@ type Client struct {
|
||||
*baseClient
|
||||
cmdable
|
||||
hooksMixin
|
||||
*Cache
|
||||
}
|
||||
|
||||
// NewClient returns a client to the Redis Server specified by Options.
|
||||
func NewClient(opt *Options) *Client {
|
||||
var c Client
|
||||
opt.init()
|
||||
c = Client{
|
||||
c := Client{
|
||||
baseClient: &baseClient{
|
||||
opt: opt,
|
||||
},
|
||||
}
|
||||
if opt.EnableCache {
|
||||
maxKeys := opt.CacheConfig.MaxKeys
|
||||
maxSize := opt.CacheConfig.MaxSize
|
||||
cache, err := NewCache(maxKeys, maxSize)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to create client cache: %v", err)
|
||||
}
|
||||
c.Cache = cache
|
||||
}
|
||||
c.init()
|
||||
c.connPool = newConnPool(opt, c.dialHook)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user