mirror of
https://github.com/redis/go-redis.git
synced 2025-08-08 23:42:06 +03:00
feat(proto): add configurable buffer sizes for Redis connections (#3453)
* add configurable buffer sizes for Redis connections * add MiB to wordlist * Add description for buffer size parameter
This commit is contained in:
3
.github/wordlist.txt
vendored
3
.github/wordlist.txt
vendored
@@ -73,4 +73,5 @@ OAuth
|
||||
Azure
|
||||
StreamingCredentialsProvider
|
||||
oauth
|
||||
entraid
|
||||
entraid
|
||||
MiB
|
12
README.md
12
README.md
@@ -297,6 +297,18 @@ func main() {
|
||||
```
|
||||
|
||||
|
||||
### Buffer Size Configuration
|
||||
|
||||
go-redis uses 0.5MiB read and write buffers by default for optimal performance. For high-throughput applications or large pipelines, you can customize buffer sizes:
|
||||
|
||||
```go
|
||||
rdb := redis.NewClient(&redis.Options{
|
||||
Addr: "localhost:6379",
|
||||
ReadBufferSize: 1024 * 1024, // 1MiB read buffer
|
||||
WriteBufferSize: 1024 * 1024, // 1MiB write buffer
|
||||
})
|
||||
```
|
||||
|
||||
### Advanced Configuration
|
||||
|
||||
go-redis supports extending the client identification phase to allow projects to send their own custom client identification.
|
||||
|
183
internal/pool/buffer_size_test.go
Normal file
183
internal/pool/buffer_size_test.go
Normal file
@@ -0,0 +1,183 @@
|
||||
package pool_test
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"net"
|
||||
"unsafe"
|
||||
|
||||
. "github.com/bsm/ginkgo/v2"
|
||||
. "github.com/bsm/gomega"
|
||||
|
||||
"github.com/redis/go-redis/v9/internal/pool"
|
||||
"github.com/redis/go-redis/v9/internal/proto"
|
||||
)
|
||||
|
||||
var _ = Describe("Buffer Size Configuration", func() {
|
||||
var connPool *pool.ConnPool
|
||||
ctx := context.Background()
|
||||
|
||||
AfterEach(func() {
|
||||
if connPool != nil {
|
||||
connPool.Close()
|
||||
}
|
||||
})
|
||||
|
||||
It("should use default buffer sizes when not specified", func() {
|
||||
connPool = pool.NewConnPool(&pool.Options{
|
||||
Dialer: dummyDialer,
|
||||
PoolSize: 1,
|
||||
PoolTimeout: 1000,
|
||||
})
|
||||
|
||||
cn, err := connPool.NewConn(ctx)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
defer connPool.CloseConn(cn)
|
||||
|
||||
// Check that default buffer sizes are used (0.5MiB)
|
||||
writerBufSize := getWriterBufSizeUnsafe(cn)
|
||||
readerBufSize := getReaderBufSizeUnsafe(cn)
|
||||
|
||||
Expect(writerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size
|
||||
Expect(readerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size
|
||||
})
|
||||
|
||||
It("should use custom buffer sizes when specified", func() {
|
||||
customReadSize := 32 * 1024 // 32KB
|
||||
customWriteSize := 64 * 1024 // 64KB
|
||||
|
||||
connPool = pool.NewConnPool(&pool.Options{
|
||||
Dialer: dummyDialer,
|
||||
PoolSize: 1,
|
||||
PoolTimeout: 1000,
|
||||
ReadBufferSize: customReadSize,
|
||||
WriteBufferSize: customWriteSize,
|
||||
})
|
||||
|
||||
cn, err := connPool.NewConn(ctx)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
defer connPool.CloseConn(cn)
|
||||
|
||||
// Check that custom buffer sizes are used
|
||||
writerBufSize := getWriterBufSizeUnsafe(cn)
|
||||
readerBufSize := getReaderBufSizeUnsafe(cn)
|
||||
|
||||
Expect(writerBufSize).To(Equal(customWriteSize))
|
||||
Expect(readerBufSize).To(Equal(customReadSize))
|
||||
})
|
||||
|
||||
It("should handle zero buffer sizes by using defaults", func() {
|
||||
connPool = pool.NewConnPool(&pool.Options{
|
||||
Dialer: dummyDialer,
|
||||
PoolSize: 1,
|
||||
PoolTimeout: 1000,
|
||||
ReadBufferSize: 0, // Should use default
|
||||
WriteBufferSize: 0, // Should use default
|
||||
})
|
||||
|
||||
cn, err := connPool.NewConn(ctx)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
defer connPool.CloseConn(cn)
|
||||
|
||||
// Check that default buffer sizes are used (0.5MiB)
|
||||
writerBufSize := getWriterBufSizeUnsafe(cn)
|
||||
readerBufSize := getReaderBufSizeUnsafe(cn)
|
||||
|
||||
Expect(writerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size
|
||||
Expect(readerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size
|
||||
})
|
||||
|
||||
It("should use 0.5MiB default buffer sizes for standalone NewConn", func() {
|
||||
// Test that NewConn (without pool) also uses 0.5MiB defaults
|
||||
netConn := newDummyConn()
|
||||
cn := pool.NewConn(netConn)
|
||||
defer cn.Close()
|
||||
|
||||
writerBufSize := getWriterBufSizeUnsafe(cn)
|
||||
readerBufSize := getReaderBufSizeUnsafe(cn)
|
||||
|
||||
Expect(writerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size
|
||||
Expect(readerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size
|
||||
})
|
||||
|
||||
It("should use 0.5MiB defaults even when pool is created directly without buffer sizes", func() {
|
||||
// Test the scenario where someone creates a pool directly (like in tests)
|
||||
// without setting ReadBufferSize and WriteBufferSize
|
||||
connPool = pool.NewConnPool(&pool.Options{
|
||||
Dialer: dummyDialer,
|
||||
PoolSize: 1,
|
||||
PoolTimeout: 1000,
|
||||
// ReadBufferSize and WriteBufferSize are not set (will be 0)
|
||||
})
|
||||
|
||||
cn, err := connPool.NewConn(ctx)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
defer connPool.CloseConn(cn)
|
||||
|
||||
// Should still get 0.5MiB defaults because NewConnPool sets them
|
||||
writerBufSize := getWriterBufSizeUnsafe(cn)
|
||||
readerBufSize := getReaderBufSizeUnsafe(cn)
|
||||
|
||||
Expect(writerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size
|
||||
Expect(readerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size
|
||||
})
|
||||
})
|
||||
|
||||
// Helper functions to extract buffer sizes using unsafe pointers
|
||||
func getWriterBufSizeUnsafe(cn *pool.Conn) int {
|
||||
cnPtr := (*struct {
|
||||
usedAt int64
|
||||
netConn net.Conn
|
||||
rd *proto.Reader
|
||||
bw *bufio.Writer
|
||||
wr *proto.Writer
|
||||
// ... other fields
|
||||
})(unsafe.Pointer(cn))
|
||||
|
||||
if cnPtr.bw == nil {
|
||||
return -1
|
||||
}
|
||||
|
||||
bwPtr := (*struct {
|
||||
err error
|
||||
buf []byte
|
||||
n int
|
||||
wr interface{}
|
||||
})(unsafe.Pointer(cnPtr.bw))
|
||||
|
||||
return len(bwPtr.buf)
|
||||
}
|
||||
|
||||
func getReaderBufSizeUnsafe(cn *pool.Conn) int {
|
||||
cnPtr := (*struct {
|
||||
usedAt int64
|
||||
netConn net.Conn
|
||||
rd *proto.Reader
|
||||
bw *bufio.Writer
|
||||
wr *proto.Writer
|
||||
// ... other fields
|
||||
})(unsafe.Pointer(cn))
|
||||
|
||||
if cnPtr.rd == nil {
|
||||
return -1
|
||||
}
|
||||
|
||||
rdPtr := (*struct {
|
||||
rd *bufio.Reader
|
||||
})(unsafe.Pointer(cnPtr.rd))
|
||||
|
||||
if rdPtr.rd == nil {
|
||||
return -1
|
||||
}
|
||||
|
||||
bufReaderPtr := (*struct {
|
||||
buf []byte
|
||||
rd interface{}
|
||||
r, w int
|
||||
err error
|
||||
lastByte int
|
||||
lastRuneSize int
|
||||
})(unsafe.Pointer(rdPtr.rd))
|
||||
|
||||
return len(bufReaderPtr.buf)
|
||||
}
|
@@ -28,12 +28,28 @@ type Conn struct {
|
||||
}
|
||||
|
||||
func NewConn(netConn net.Conn) *Conn {
|
||||
return NewConnWithBufferSize(netConn, proto.DefaultBufferSize, proto.DefaultBufferSize)
|
||||
}
|
||||
|
||||
func NewConnWithBufferSize(netConn net.Conn, readBufSize, writeBufSize int) *Conn {
|
||||
cn := &Conn{
|
||||
netConn: netConn,
|
||||
createdAt: time.Now(),
|
||||
}
|
||||
cn.rd = proto.NewReader(netConn)
|
||||
cn.bw = bufio.NewWriter(netConn)
|
||||
|
||||
// Use specified buffer sizes, or fall back to 0.5MiB defaults if 0
|
||||
if readBufSize > 0 {
|
||||
cn.rd = proto.NewReaderSize(netConn, readBufSize)
|
||||
} else {
|
||||
cn.rd = proto.NewReader(netConn) // Uses 0.5MiB default
|
||||
}
|
||||
|
||||
if writeBufSize > 0 {
|
||||
cn.bw = bufio.NewWriterSize(netConn, writeBufSize)
|
||||
} else {
|
||||
cn.bw = bufio.NewWriterSize(netConn, proto.DefaultBufferSize)
|
||||
}
|
||||
|
||||
cn.wr = proto.NewWriter(cn.bw)
|
||||
cn.SetUsedAt(time.Now())
|
||||
return cn
|
||||
|
@@ -71,6 +71,9 @@ type Options struct {
|
||||
MaxActiveConns int
|
||||
ConnMaxIdleTime time.Duration
|
||||
ConnMaxLifetime time.Duration
|
||||
|
||||
ReadBufferSize int
|
||||
WriteBufferSize int
|
||||
}
|
||||
|
||||
type lastDialErrorWrap struct {
|
||||
@@ -226,7 +229,7 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cn := NewConn(netConn)
|
||||
cn := NewConnWithBufferSize(netConn, p.cfg.ReadBufferSize, p.cfg.WriteBufferSize)
|
||||
cn.pooled = pooled
|
||||
return cn, nil
|
||||
}
|
||||
|
@@ -12,6 +12,9 @@ import (
|
||||
"github.com/redis/go-redis/v9/internal/util"
|
||||
)
|
||||
|
||||
// DefaultBufferSize is the default size for read/write buffers (0.5MiB)
|
||||
const DefaultBufferSize = 512 * 1024
|
||||
|
||||
// redis resp protocol data type.
|
||||
const (
|
||||
RespStatus = '+' // +<string>\r\n
|
||||
@@ -58,7 +61,13 @@ type Reader struct {
|
||||
|
||||
func NewReader(rd io.Reader) *Reader {
|
||||
return &Reader{
|
||||
rd: bufio.NewReader(rd),
|
||||
rd: bufio.NewReaderSize(rd, DefaultBufferSize),
|
||||
}
|
||||
}
|
||||
|
||||
func NewReaderSize(rd io.Reader, size int) *Reader {
|
||||
return &Reader{
|
||||
rd: bufio.NewReaderSize(rd, size),
|
||||
}
|
||||
}
|
||||
|
||||
|
23
options.go
23
options.go
@@ -15,6 +15,7 @@ import (
|
||||
|
||||
"github.com/redis/go-redis/v9/auth"
|
||||
"github.com/redis/go-redis/v9/internal/pool"
|
||||
"github.com/redis/go-redis/v9/internal/proto"
|
||||
)
|
||||
|
||||
// Limiter is the interface of a rate limiter or a circuit breaker.
|
||||
@@ -130,6 +131,20 @@ type Options struct {
|
||||
// See https://redis.uptrace.dev/guide/go-redis-debugging.html#timeouts
|
||||
ContextTimeoutEnabled bool
|
||||
|
||||
// ReadBufferSize is the size of the bufio.Reader buffer for each connection.
|
||||
// Larger buffers can improve performance for commands that return large responses.
|
||||
// Smaller buffers can improve memory usage for larger pools.
|
||||
//
|
||||
// default: 0.5MiB (524288 bytes)
|
||||
ReadBufferSize int
|
||||
|
||||
// WriteBufferSize is the size of the bufio.Writer buffer for each connection.
|
||||
// Larger buffers can improve performance for large pipelines and commands with many arguments.
|
||||
// Smaller buffers can improve memory usage for larger pools.
|
||||
//
|
||||
// default: 0.5MiB (524288 bytes)
|
||||
WriteBufferSize int
|
||||
|
||||
// PoolFIFO type of connection pool.
|
||||
//
|
||||
// - true for FIFO pool
|
||||
@@ -241,6 +256,12 @@ func (opt *Options) init() {
|
||||
if opt.PoolSize == 0 {
|
||||
opt.PoolSize = 10 * runtime.GOMAXPROCS(0)
|
||||
}
|
||||
if opt.ReadBufferSize == 0 {
|
||||
opt.ReadBufferSize = proto.DefaultBufferSize
|
||||
}
|
||||
if opt.WriteBufferSize == 0 {
|
||||
opt.WriteBufferSize = proto.DefaultBufferSize
|
||||
}
|
||||
switch opt.ReadTimeout {
|
||||
case -2:
|
||||
opt.ReadTimeout = -1
|
||||
@@ -592,5 +613,7 @@ func newConnPool(
|
||||
MaxActiveConns: opt.MaxActiveConns,
|
||||
ConnMaxIdleTime: opt.ConnMaxIdleTime,
|
||||
ConnMaxLifetime: opt.ConnMaxLifetime,
|
||||
ReadBufferSize: opt.ReadBufferSize,
|
||||
WriteBufferSize: opt.WriteBufferSize,
|
||||
})
|
||||
}
|
||||
|
@@ -92,6 +92,20 @@ type ClusterOptions struct {
|
||||
ConnMaxIdleTime time.Duration
|
||||
ConnMaxLifetime time.Duration
|
||||
|
||||
// ReadBufferSize is the size of the bufio.Reader buffer for each connection.
|
||||
// Larger buffers can improve performance for commands that return large responses.
|
||||
// Smaller buffers can improve memory usage for larger pools.
|
||||
//
|
||||
// default: 0.5MiB (524288 bytes)
|
||||
ReadBufferSize int
|
||||
|
||||
// WriteBufferSize is the size of the bufio.Writer buffer for each connection.
|
||||
// Larger buffers can improve performance for large pipelines and commands with many arguments.
|
||||
// Smaller buffers can improve memory usage for larger pools.
|
||||
//
|
||||
// default: 0.5MiB (524288 bytes)
|
||||
WriteBufferSize int
|
||||
|
||||
TLSConfig *tls.Config
|
||||
|
||||
// DisableIndentity - Disable set-lib on connect.
|
||||
@@ -127,6 +141,12 @@ func (opt *ClusterOptions) init() {
|
||||
if opt.PoolSize == 0 {
|
||||
opt.PoolSize = 5 * runtime.GOMAXPROCS(0)
|
||||
}
|
||||
if opt.ReadBufferSize == 0 {
|
||||
opt.ReadBufferSize = proto.DefaultBufferSize
|
||||
}
|
||||
if opt.WriteBufferSize == 0 {
|
||||
opt.WriteBufferSize = proto.DefaultBufferSize
|
||||
}
|
||||
|
||||
switch opt.ReadTimeout {
|
||||
case -1:
|
||||
@@ -318,6 +338,8 @@ func (opt *ClusterOptions) clientOptions() *Options {
|
||||
MaxActiveConns: opt.MaxActiveConns,
|
||||
ConnMaxIdleTime: opt.ConnMaxIdleTime,
|
||||
ConnMaxLifetime: opt.ConnMaxLifetime,
|
||||
ReadBufferSize: opt.ReadBufferSize,
|
||||
WriteBufferSize: opt.WriteBufferSize,
|
||||
DisableIdentity: opt.DisableIdentity,
|
||||
DisableIndentity: opt.DisableIdentity,
|
||||
IdentitySuffix: opt.IdentitySuffix,
|
||||
|
24
ring.go
24
ring.go
@@ -18,6 +18,7 @@ import (
|
||||
"github.com/redis/go-redis/v9/internal"
|
||||
"github.com/redis/go-redis/v9/internal/hashtag"
|
||||
"github.com/redis/go-redis/v9/internal/pool"
|
||||
"github.com/redis/go-redis/v9/internal/proto"
|
||||
"github.com/redis/go-redis/v9/internal/rand"
|
||||
)
|
||||
|
||||
@@ -113,6 +114,20 @@ type RingOptions struct {
|
||||
ConnMaxIdleTime time.Duration
|
||||
ConnMaxLifetime time.Duration
|
||||
|
||||
// ReadBufferSize is the size of the bufio.Reader buffer for each connection.
|
||||
// Larger buffers can improve performance for commands that return large responses.
|
||||
// Smaller buffers can improve memory usage for larger pools.
|
||||
//
|
||||
// default: 0.5MiB (524288 bytes)
|
||||
ReadBufferSize int
|
||||
|
||||
// WriteBufferSize is the size of the bufio.Writer buffer for each connection.
|
||||
// Larger buffers can improve performance for large pipelines and commands with many arguments.
|
||||
// Smaller buffers can improve memory usage for larger pools.
|
||||
//
|
||||
// default: 0.5MiB (524288 bytes)
|
||||
WriteBufferSize int
|
||||
|
||||
TLSConfig *tls.Config
|
||||
Limiter Limiter
|
||||
|
||||
@@ -164,6 +179,13 @@ func (opt *RingOptions) init() {
|
||||
case 0:
|
||||
opt.MaxRetryBackoff = 512 * time.Millisecond
|
||||
}
|
||||
|
||||
if opt.ReadBufferSize == 0 {
|
||||
opt.ReadBufferSize = proto.DefaultBufferSize
|
||||
}
|
||||
if opt.WriteBufferSize == 0 {
|
||||
opt.WriteBufferSize = proto.DefaultBufferSize
|
||||
}
|
||||
}
|
||||
|
||||
func (opt *RingOptions) clientOptions() *Options {
|
||||
@@ -195,6 +217,8 @@ func (opt *RingOptions) clientOptions() *Options {
|
||||
MaxActiveConns: opt.MaxActiveConns,
|
||||
ConnMaxIdleTime: opt.ConnMaxIdleTime,
|
||||
ConnMaxLifetime: opt.ConnMaxLifetime,
|
||||
ReadBufferSize: opt.ReadBufferSize,
|
||||
WriteBufferSize: opt.WriteBufferSize,
|
||||
|
||||
TLSConfig: opt.TLSConfig,
|
||||
Limiter: opt.Limiter,
|
||||
|
Reference in New Issue
Block a user