mirror of
https://github.com/redis/go-redis.git
synced 2025-04-17 20:17:02 +03:00
Add cache writing
This commit is contained in:
parent
e9b162945b
commit
05aae206bb
12
cache.go
12
cache.go
@ -9,6 +9,14 @@ type Cache struct {
|
||||
cache *ristretto.Cache
|
||||
}
|
||||
|
||||
type CacheConfig struct {
|
||||
MaxSize int64 // maximum size of the cache in bytes
|
||||
MaxKeys int64 // maximum number of keys to store in the cache
|
||||
// other configuration options:
|
||||
// - ttl (time to live) for cache entries
|
||||
// - eviction policy
|
||||
}
|
||||
|
||||
// NewCache creates a new Cache instance with the given configuration
|
||||
func NewCache(numKeys int64, memSize int64) (*Cache, error) {
|
||||
// Create a new cache with the given configuration
|
||||
@ -27,12 +35,12 @@ func NewCache(numKeys int64, memSize int64) (*Cache, error) {
|
||||
}
|
||||
|
||||
// Set adds a value to the cache
|
||||
func (c *Cache) Set(key, value interface{}, cost int64) bool {
|
||||
func (c *Cache) SetKey(key, value interface{}, cost int64) bool {
|
||||
return c.cache.Set(key, value, cost)
|
||||
}
|
||||
|
||||
// Get retrieves a value from the cache
|
||||
func (c *Cache) Get(key interface{}) (interface{}, bool) {
|
||||
func (c *Cache) GetKey(key interface{}) (interface{}, bool) {
|
||||
return c.cache.Get(key)
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
package redis_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"testing"
|
||||
"time"
|
||||
@ -17,28 +18,28 @@ func TestNewCache(t *testing.T) {
|
||||
t.Log("Cache created successfully")
|
||||
}
|
||||
|
||||
// TestCacheSetAndGet tests setting and getting values in the cache
|
||||
func TestCacheSetAndGet(t *testing.T) {
|
||||
// TestCacheSetKeyAndGetKey tests SetKeyting and GetKeyting values in the cache
|
||||
func TestCacheSetKeyAndGetKey(t *testing.T) {
|
||||
cache, err := redis.NewCache(1000, 1<<20)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create cache: %v", err)
|
||||
}
|
||||
|
||||
key, value := "key1", "value1"
|
||||
setSuccess := cache.Set(key, value, 1)
|
||||
if !setSuccess {
|
||||
t.Fatalf("Failed to set key: %s", key)
|
||||
SetKeySuccess := cache.SetKey(key, value, 1)
|
||||
if !SetKeySuccess {
|
||||
t.Fatalf("Failed to SetKey key: %s", key)
|
||||
}
|
||||
log.Printf("Set operation successful for key: %s", key)
|
||||
log.Printf("SetKey operation successful for key: %s", key)
|
||||
|
||||
// Allow value to pass through buffers
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
getValue, found := cache.Get(key)
|
||||
if !found || getValue != value {
|
||||
t.Errorf("Failed to get key: %s, expected value: %s, got: %v", key, value, getValue)
|
||||
GetKeyValue, found := cache.GetKey(key)
|
||||
if !found || GetKeyValue != value {
|
||||
t.Errorf("Failed to GetKey key: %s, expected value: %s, got: %v", key, value, GetKeyValue)
|
||||
} else {
|
||||
log.Printf("Get operation successful for key: %s", key)
|
||||
log.Printf("GetKey operation successful for key: %s", key)
|
||||
}
|
||||
}
|
||||
|
||||
@ -50,8 +51,8 @@ func TestCacheClearKey(t *testing.T) {
|
||||
}
|
||||
|
||||
key := "key1"
|
||||
cache.Set(key, "value1", 1)
|
||||
log.Printf("Key %s set in cache", key)
|
||||
cache.SetKey(key, "value1", 1)
|
||||
log.Printf("Key %s SetKey in cache", key)
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
@ -60,7 +61,7 @@ func TestCacheClearKey(t *testing.T) {
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
_, found := cache.Get(key)
|
||||
_, found := cache.GetKey(key)
|
||||
if found {
|
||||
t.Errorf("Expected key %s to be cleared", key)
|
||||
} else {
|
||||
@ -76,8 +77,8 @@ func TestCacheClear(t *testing.T) {
|
||||
}
|
||||
|
||||
key := "key1"
|
||||
cache.Set(key, "value1", 1)
|
||||
log.Printf("Key %s set in cache", key)
|
||||
cache.SetKey(key, "value1", 1)
|
||||
log.Printf("Key %s SetKey in cache", key)
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
@ -86,10 +87,19 @@ func TestCacheClear(t *testing.T) {
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
_, found := cache.Get(key)
|
||||
_, found := cache.GetKey(key)
|
||||
if found {
|
||||
t.Errorf("Expected cache to be cleared, but key %s was found", key)
|
||||
} else {
|
||||
t.Log("Clear operation successful, cache is empty")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetCache(t *testing.T) {
|
||||
client := redis.NewClient(&redis.Options{Addr: ":6379", EnableCache: true, CacheConfig: &redis.CacheConfig{MaxSize: 1 << 20, MaxKeys: 1000}})
|
||||
defer client.Close()
|
||||
ctx := context.Background()
|
||||
client.Cache.SetKey("pingi", "pong", 0)
|
||||
client.Ping(ctx)
|
||||
client.Cache.GetKey("ping")
|
||||
}
|
||||
|
@ -538,6 +538,7 @@ func (cmd *SliceCmd) Scan(dst interface{}) error {
|
||||
}
|
||||
|
||||
func (cmd *SliceCmd) readReply(rd *proto.Reader) (err error) {
|
||||
|
||||
cmd.val, err = rd.ReadSlice()
|
||||
return err
|
||||
}
|
||||
@ -579,6 +580,7 @@ func (cmd *StatusCmd) String() string {
|
||||
|
||||
func (cmd *StatusCmd) readReply(rd *proto.Reader) (err error) {
|
||||
cmd.val, err = rd.ReadString()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -528,36 +528,36 @@ func ExampleClient_Watch() {
|
||||
// Output: ended with 100 <nil>
|
||||
}
|
||||
|
||||
func ExamplePubSub() {
|
||||
pubsub := rdb.Subscribe(ctx, "mychannel1")
|
||||
// func ExamplePubSub() {
|
||||
// pubsub := rdb.Subscribe(ctx, "mychannel1")
|
||||
|
||||
// Wait for confirmation that subscription is created before publishing anything.
|
||||
_, err := pubsub.Receive(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// // Wait for confirmation that subscription is created before publishing anything.
|
||||
// _, err := pubsub.Receive(ctx)
|
||||
// if err != nil {
|
||||
// panic(err)
|
||||
// }
|
||||
|
||||
// Go channel which receives messages.
|
||||
ch := pubsub.Channel()
|
||||
// // Go channel which receives messages.
|
||||
// ch := pubsub.Channel()
|
||||
|
||||
// Publish a message.
|
||||
err = rdb.Publish(ctx, "mychannel1", "hello").Err()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// // Publish a message.
|
||||
// err = rdb.Publish(ctx, "mychannel1", "hello").Err()
|
||||
// if err != nil {
|
||||
// panic(err)
|
||||
// }
|
||||
|
||||
time.AfterFunc(time.Second, func() {
|
||||
// When pubsub is closed channel is closed too.
|
||||
_ = pubsub.Close()
|
||||
})
|
||||
// time.AfterFunc(time.Second, func() {
|
||||
// // When pubsub is closed channel is closed too.
|
||||
// _ = pubsub.Close()
|
||||
// })
|
||||
|
||||
// Consume messages.
|
||||
for msg := range ch {
|
||||
fmt.Println(msg.Channel, msg.Payload)
|
||||
}
|
||||
// // Consume messages.
|
||||
// for msg := range ch {
|
||||
// fmt.Println(msg.Channel, msg.Payload)
|
||||
// }
|
||||
|
||||
// Output: mychannel1 hello
|
||||
}
|
||||
// // Output: mychannel1 hello
|
||||
// }
|
||||
|
||||
func ExamplePubSub_Receive() {
|
||||
pubsub := rdb.Subscribe(ctx, "mychannel2")
|
||||
|
2
go.mod
2
go.mod
@ -6,11 +6,11 @@ require (
|
||||
github.com/bsm/ginkgo/v2 v2.12.0
|
||||
github.com/bsm/gomega v1.27.10
|
||||
github.com/cespare/xxhash/v2 v2.2.0
|
||||
github.com/dgraph-io/ristretto v0.1.1
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/dgraph-io/ristretto v0.1.1 // indirect
|
||||
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||
github.com/golang/glog v1.2.0 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
|
6
go.sum
6
go.sum
@ -6,9 +6,11 @@ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
|
||||
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
|
||||
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8=
|
||||
github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA=
|
||||
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
|
||||
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||
@ -18,13 +20,17 @@ github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+m
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||
github.com/golang/glog v1.2.0 h1:uCdmnmatrKCgMBlM4rMuJZWOkPDqdbZPnrMXDY4gI68=
|
||||
github.com/golang/glog v1.2.0/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
|
||||
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
|
@ -63,6 +63,10 @@ func (cn *Conn) RemoteAddr() net.Addr {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cn *Conn) GetRawOutput() []byte {
|
||||
return cn.rd.GetLine()
|
||||
}
|
||||
|
||||
func (cn *Conn) WithReader(
|
||||
ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error,
|
||||
) error {
|
||||
|
@ -40,6 +40,8 @@ const (
|
||||
|
||||
const Nil = RedisError("redis: nil") // nolint:errname
|
||||
|
||||
var Line []byte
|
||||
|
||||
type RedisError string
|
||||
|
||||
func (e RedisError) Error() string { return string(e) }
|
||||
@ -120,7 +122,7 @@ func (r *Reader) ReadLine() ([]byte, error) {
|
||||
if IsNilReply(line) {
|
||||
return nil, Nil
|
||||
}
|
||||
|
||||
Line = line
|
||||
return line, nil
|
||||
}
|
||||
|
||||
@ -151,6 +153,10 @@ func (r *Reader) readLine() ([]byte, error) {
|
||||
return b[:len(b)-2], nil
|
||||
}
|
||||
|
||||
func (r *Reader) GetLine() []byte {
|
||||
return Line
|
||||
}
|
||||
|
||||
func (r *Reader) ReadReply() (interface{}, error) {
|
||||
line, err := r.ReadLine()
|
||||
if err != nil {
|
||||
|
@ -147,6 +147,12 @@ type Options struct {
|
||||
|
||||
// Add suffix to client name. Default is empty.
|
||||
IdentitySuffix string
|
||||
|
||||
// Enable cache
|
||||
EnableCache bool
|
||||
|
||||
// Cache configuration options
|
||||
CacheConfig *CacheConfig
|
||||
}
|
||||
|
||||
func (opt *Options) init() {
|
||||
|
34
redis.go
34
redis.go
@ -1,9 +1,11 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@ -200,6 +202,7 @@ type baseClient struct {
|
||||
connPool pool.Pooler
|
||||
|
||||
onClose func() error // hook called when client is closed
|
||||
Cache *Cache
|
||||
}
|
||||
|
||||
func (c *baseClient) clone() *baseClient {
|
||||
@ -414,6 +417,18 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
|
||||
}
|
||||
}
|
||||
|
||||
// Check if cache enabled
|
||||
if c.opt.EnableCache {
|
||||
// Check if the command is in cache, if so return from cache
|
||||
if val, found := c.Cache.GetKey(cmd.Name()); found {
|
||||
rd := proto.NewReader(bytes.NewReader(val.([]byte)))
|
||||
err := cmd.readReply(rd)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
retryTimeout := uint32(0)
|
||||
if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
|
||||
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
|
||||
@ -432,6 +447,11 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
|
||||
return err
|
||||
}
|
||||
|
||||
if c.opt.EnableCache {
|
||||
// Set the command in cache
|
||||
c.Cache.SetKey(cmd.Name(), cn.GetRawOutput(), 0)
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
retry := shouldRetry(err, atomic.LoadUint32(&retryTimeout) == 1)
|
||||
@ -626,17 +646,27 @@ type Client struct {
|
||||
*baseClient
|
||||
cmdable
|
||||
hooksMixin
|
||||
*Cache
|
||||
}
|
||||
|
||||
// NewClient returns a client to the Redis Server specified by Options.
|
||||
func NewClient(opt *Options) *Client {
|
||||
var c Client
|
||||
opt.init()
|
||||
|
||||
c := Client{
|
||||
c = Client{
|
||||
baseClient: &baseClient{
|
||||
opt: opt,
|
||||
},
|
||||
}
|
||||
if opt.EnableCache {
|
||||
maxKeys := opt.CacheConfig.MaxKeys
|
||||
maxSize := opt.CacheConfig.MaxSize
|
||||
cache, err := NewCache(maxKeys, maxSize)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to create client cache: %v", err)
|
||||
}
|
||||
c.Cache = cache
|
||||
}
|
||||
c.init()
|
||||
c.connPool = newConnPool(opt, c.dialHook)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user