From 1eed165f9df84f94c52c1d9daeb079677117b430 Mon Sep 17 00:00:00 2001 From: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com> Date: Mon, 4 Aug 2025 09:16:54 +0300 Subject: [PATCH] feat(proto): add configurable buffer sizes for Redis connections (#3453) * add configurable buffer sizes for Redis connections * add MiB to wordlist * Add description for buffer size parameter --- .github/wordlist.txt | 3 +- README.md | 12 ++ internal/pool/buffer_size_test.go | 183 ++++++++++++++++++++++++++++++ internal/pool/conn.go | 20 +++- internal/pool/pool.go | 5 +- internal/proto/reader.go | 11 +- options.go | 23 ++++ osscluster.go | 22 ++++ ring.go | 24 ++++ 9 files changed, 298 insertions(+), 5 deletions(-) create mode 100644 internal/pool/buffer_size_test.go diff --git a/.github/wordlist.txt b/.github/wordlist.txt index a922d99b..e0c73eb5 100644 --- a/.github/wordlist.txt +++ b/.github/wordlist.txt @@ -73,4 +73,5 @@ OAuth Azure StreamingCredentialsProvider oauth -entraid \ No newline at end of file +entraid +MiB \ No newline at end of file diff --git a/README.md b/README.md index c37a52ec..356870b1 100644 --- a/README.md +++ b/README.md @@ -297,6 +297,18 @@ func main() { ``` +### Buffer Size Configuration + +go-redis uses 0.5MiB read and write buffers by default for optimal performance. For high-throughput applications or large pipelines, you can customize buffer sizes: + +```go +rdb := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + ReadBufferSize: 1024 * 1024, // 1MiB read buffer + WriteBufferSize: 1024 * 1024, // 1MiB write buffer +}) +``` + ### Advanced Configuration go-redis supports extending the client identification phase to allow projects to send their own custom client identification. diff --git a/internal/pool/buffer_size_test.go b/internal/pool/buffer_size_test.go new file mode 100644 index 00000000..a5423010 --- /dev/null +++ b/internal/pool/buffer_size_test.go @@ -0,0 +1,183 @@ +package pool_test + +import ( + "bufio" + "context" + "net" + "unsafe" + + . "github.com/bsm/ginkgo/v2" + . "github.com/bsm/gomega" + + "github.com/redis/go-redis/v9/internal/pool" + "github.com/redis/go-redis/v9/internal/proto" +) + +var _ = Describe("Buffer Size Configuration", func() { + var connPool *pool.ConnPool + ctx := context.Background() + + AfterEach(func() { + if connPool != nil { + connPool.Close() + } + }) + + It("should use default buffer sizes when not specified", func() { + connPool = pool.NewConnPool(&pool.Options{ + Dialer: dummyDialer, + PoolSize: 1, + PoolTimeout: 1000, + }) + + cn, err := connPool.NewConn(ctx) + Expect(err).NotTo(HaveOccurred()) + defer connPool.CloseConn(cn) + + // Check that default buffer sizes are used (0.5MiB) + writerBufSize := getWriterBufSizeUnsafe(cn) + readerBufSize := getReaderBufSizeUnsafe(cn) + + Expect(writerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size + Expect(readerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size + }) + + It("should use custom buffer sizes when specified", func() { + customReadSize := 32 * 1024 // 32KB + customWriteSize := 64 * 1024 // 64KB + + connPool = pool.NewConnPool(&pool.Options{ + Dialer: dummyDialer, + PoolSize: 1, + PoolTimeout: 1000, + ReadBufferSize: customReadSize, + WriteBufferSize: customWriteSize, + }) + + cn, err := connPool.NewConn(ctx) + Expect(err).NotTo(HaveOccurred()) + defer connPool.CloseConn(cn) + + // Check that custom buffer sizes are used + writerBufSize := getWriterBufSizeUnsafe(cn) + readerBufSize := getReaderBufSizeUnsafe(cn) + + Expect(writerBufSize).To(Equal(customWriteSize)) + Expect(readerBufSize).To(Equal(customReadSize)) + }) + + It("should handle zero buffer sizes by using defaults", func() { + connPool = pool.NewConnPool(&pool.Options{ + Dialer: dummyDialer, + PoolSize: 1, + PoolTimeout: 1000, + ReadBufferSize: 0, // Should use default + WriteBufferSize: 0, // Should use default + }) + + cn, err := connPool.NewConn(ctx) + Expect(err).NotTo(HaveOccurred()) + defer connPool.CloseConn(cn) + + // Check that default buffer sizes are used (0.5MiB) + writerBufSize := getWriterBufSizeUnsafe(cn) + readerBufSize := getReaderBufSizeUnsafe(cn) + + Expect(writerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size + Expect(readerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size + }) + + It("should use 0.5MiB default buffer sizes for standalone NewConn", func() { + // Test that NewConn (without pool) also uses 0.5MiB defaults + netConn := newDummyConn() + cn := pool.NewConn(netConn) + defer cn.Close() + + writerBufSize := getWriterBufSizeUnsafe(cn) + readerBufSize := getReaderBufSizeUnsafe(cn) + + Expect(writerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size + Expect(readerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size + }) + + It("should use 0.5MiB defaults even when pool is created directly without buffer sizes", func() { + // Test the scenario where someone creates a pool directly (like in tests) + // without setting ReadBufferSize and WriteBufferSize + connPool = pool.NewConnPool(&pool.Options{ + Dialer: dummyDialer, + PoolSize: 1, + PoolTimeout: 1000, + // ReadBufferSize and WriteBufferSize are not set (will be 0) + }) + + cn, err := connPool.NewConn(ctx) + Expect(err).NotTo(HaveOccurred()) + defer connPool.CloseConn(cn) + + // Should still get 0.5MiB defaults because NewConnPool sets them + writerBufSize := getWriterBufSizeUnsafe(cn) + readerBufSize := getReaderBufSizeUnsafe(cn) + + Expect(writerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size + Expect(readerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size + }) +}) + +// Helper functions to extract buffer sizes using unsafe pointers +func getWriterBufSizeUnsafe(cn *pool.Conn) int { + cnPtr := (*struct { + usedAt int64 + netConn net.Conn + rd *proto.Reader + bw *bufio.Writer + wr *proto.Writer + // ... other fields + })(unsafe.Pointer(cn)) + + if cnPtr.bw == nil { + return -1 + } + + bwPtr := (*struct { + err error + buf []byte + n int + wr interface{} + })(unsafe.Pointer(cnPtr.bw)) + + return len(bwPtr.buf) +} + +func getReaderBufSizeUnsafe(cn *pool.Conn) int { + cnPtr := (*struct { + usedAt int64 + netConn net.Conn + rd *proto.Reader + bw *bufio.Writer + wr *proto.Writer + // ... other fields + })(unsafe.Pointer(cn)) + + if cnPtr.rd == nil { + return -1 + } + + rdPtr := (*struct { + rd *bufio.Reader + })(unsafe.Pointer(cnPtr.rd)) + + if rdPtr.rd == nil { + return -1 + } + + bufReaderPtr := (*struct { + buf []byte + rd interface{} + r, w int + err error + lastByte int + lastRuneSize int + })(unsafe.Pointer(rdPtr.rd)) + + return len(bufReaderPtr.buf) +} diff --git a/internal/pool/conn.go b/internal/pool/conn.go index c1087b40..989ab10d 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -28,12 +28,28 @@ type Conn struct { } func NewConn(netConn net.Conn) *Conn { + return NewConnWithBufferSize(netConn, proto.DefaultBufferSize, proto.DefaultBufferSize) +} + +func NewConnWithBufferSize(netConn net.Conn, readBufSize, writeBufSize int) *Conn { cn := &Conn{ netConn: netConn, createdAt: time.Now(), } - cn.rd = proto.NewReader(netConn) - cn.bw = bufio.NewWriter(netConn) + + // Use specified buffer sizes, or fall back to 0.5MiB defaults if 0 + if readBufSize > 0 { + cn.rd = proto.NewReaderSize(netConn, readBufSize) + } else { + cn.rd = proto.NewReader(netConn) // Uses 0.5MiB default + } + + if writeBufSize > 0 { + cn.bw = bufio.NewWriterSize(netConn, writeBufSize) + } else { + cn.bw = bufio.NewWriterSize(netConn, proto.DefaultBufferSize) + } + cn.wr = proto.NewWriter(cn.bw) cn.SetUsedAt(time.Now()) return cn diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 3ee3dea6..6d3381c9 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -71,6 +71,9 @@ type Options struct { MaxActiveConns int ConnMaxIdleTime time.Duration ConnMaxLifetime time.Duration + + ReadBufferSize int + WriteBufferSize int } type lastDialErrorWrap struct { @@ -226,7 +229,7 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) { return nil, err } - cn := NewConn(netConn) + cn := NewConnWithBufferSize(netConn, p.cfg.ReadBufferSize, p.cfg.WriteBufferSize) cn.pooled = pooled return cn, nil } diff --git a/internal/proto/reader.go b/internal/proto/reader.go index 8d23817f..a4478099 100644 --- a/internal/proto/reader.go +++ b/internal/proto/reader.go @@ -12,6 +12,9 @@ import ( "github.com/redis/go-redis/v9/internal/util" ) +// DefaultBufferSize is the default size for read/write buffers (0.5MiB) +const DefaultBufferSize = 512 * 1024 + // redis resp protocol data type. const ( RespStatus = '+' // +\r\n @@ -58,7 +61,13 @@ type Reader struct { func NewReader(rd io.Reader) *Reader { return &Reader{ - rd: bufio.NewReader(rd), + rd: bufio.NewReaderSize(rd, DefaultBufferSize), + } +} + +func NewReaderSize(rd io.Reader, size int) *Reader { + return &Reader{ + rd: bufio.NewReaderSize(rd, size), } } diff --git a/options.go b/options.go index b87a234a..2ce807e4 100644 --- a/options.go +++ b/options.go @@ -15,6 +15,7 @@ import ( "github.com/redis/go-redis/v9/auth" "github.com/redis/go-redis/v9/internal/pool" + "github.com/redis/go-redis/v9/internal/proto" ) // Limiter is the interface of a rate limiter or a circuit breaker. @@ -130,6 +131,20 @@ type Options struct { // See https://redis.uptrace.dev/guide/go-redis-debugging.html#timeouts ContextTimeoutEnabled bool + // ReadBufferSize is the size of the bufio.Reader buffer for each connection. + // Larger buffers can improve performance for commands that return large responses. + // Smaller buffers can improve memory usage for larger pools. + // + // default: 0.5MiB (524288 bytes) + ReadBufferSize int + + // WriteBufferSize is the size of the bufio.Writer buffer for each connection. + // Larger buffers can improve performance for large pipelines and commands with many arguments. + // Smaller buffers can improve memory usage for larger pools. + // + // default: 0.5MiB (524288 bytes) + WriteBufferSize int + // PoolFIFO type of connection pool. // // - true for FIFO pool @@ -241,6 +256,12 @@ func (opt *Options) init() { if opt.PoolSize == 0 { opt.PoolSize = 10 * runtime.GOMAXPROCS(0) } + if opt.ReadBufferSize == 0 { + opt.ReadBufferSize = proto.DefaultBufferSize + } + if opt.WriteBufferSize == 0 { + opt.WriteBufferSize = proto.DefaultBufferSize + } switch opt.ReadTimeout { case -2: opt.ReadTimeout = -1 @@ -592,5 +613,7 @@ func newConnPool( MaxActiveConns: opt.MaxActiveConns, ConnMaxIdleTime: opt.ConnMaxIdleTime, ConnMaxLifetime: opt.ConnMaxLifetime, + ReadBufferSize: opt.ReadBufferSize, + WriteBufferSize: opt.WriteBufferSize, }) } diff --git a/osscluster.go b/osscluster.go index 0526022b..ad654821 100644 --- a/osscluster.go +++ b/osscluster.go @@ -92,6 +92,20 @@ type ClusterOptions struct { ConnMaxIdleTime time.Duration ConnMaxLifetime time.Duration + // ReadBufferSize is the size of the bufio.Reader buffer for each connection. + // Larger buffers can improve performance for commands that return large responses. + // Smaller buffers can improve memory usage for larger pools. + // + // default: 0.5MiB (524288 bytes) + ReadBufferSize int + + // WriteBufferSize is the size of the bufio.Writer buffer for each connection. + // Larger buffers can improve performance for large pipelines and commands with many arguments. + // Smaller buffers can improve memory usage for larger pools. + // + // default: 0.5MiB (524288 bytes) + WriteBufferSize int + TLSConfig *tls.Config // DisableIndentity - Disable set-lib on connect. @@ -127,6 +141,12 @@ func (opt *ClusterOptions) init() { if opt.PoolSize == 0 { opt.PoolSize = 5 * runtime.GOMAXPROCS(0) } + if opt.ReadBufferSize == 0 { + opt.ReadBufferSize = proto.DefaultBufferSize + } + if opt.WriteBufferSize == 0 { + opt.WriteBufferSize = proto.DefaultBufferSize + } switch opt.ReadTimeout { case -1: @@ -318,6 +338,8 @@ func (opt *ClusterOptions) clientOptions() *Options { MaxActiveConns: opt.MaxActiveConns, ConnMaxIdleTime: opt.ConnMaxIdleTime, ConnMaxLifetime: opt.ConnMaxLifetime, + ReadBufferSize: opt.ReadBufferSize, + WriteBufferSize: opt.WriteBufferSize, DisableIdentity: opt.DisableIdentity, DisableIndentity: opt.DisableIdentity, IdentitySuffix: opt.IdentitySuffix, diff --git a/ring.go b/ring.go index 0c156601..0d73e010 100644 --- a/ring.go +++ b/ring.go @@ -18,6 +18,7 @@ import ( "github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal/hashtag" "github.com/redis/go-redis/v9/internal/pool" + "github.com/redis/go-redis/v9/internal/proto" "github.com/redis/go-redis/v9/internal/rand" ) @@ -113,6 +114,20 @@ type RingOptions struct { ConnMaxIdleTime time.Duration ConnMaxLifetime time.Duration + // ReadBufferSize is the size of the bufio.Reader buffer for each connection. + // Larger buffers can improve performance for commands that return large responses. + // Smaller buffers can improve memory usage for larger pools. + // + // default: 0.5MiB (524288 bytes) + ReadBufferSize int + + // WriteBufferSize is the size of the bufio.Writer buffer for each connection. + // Larger buffers can improve performance for large pipelines and commands with many arguments. + // Smaller buffers can improve memory usage for larger pools. + // + // default: 0.5MiB (524288 bytes) + WriteBufferSize int + TLSConfig *tls.Config Limiter Limiter @@ -164,6 +179,13 @@ func (opt *RingOptions) init() { case 0: opt.MaxRetryBackoff = 512 * time.Millisecond } + + if opt.ReadBufferSize == 0 { + opt.ReadBufferSize = proto.DefaultBufferSize + } + if opt.WriteBufferSize == 0 { + opt.WriteBufferSize = proto.DefaultBufferSize + } } func (opt *RingOptions) clientOptions() *Options { @@ -195,6 +217,8 @@ func (opt *RingOptions) clientOptions() *Options { MaxActiveConns: opt.MaxActiveConns, ConnMaxIdleTime: opt.ConnMaxIdleTime, ConnMaxLifetime: opt.ConnMaxLifetime, + ReadBufferSize: opt.ReadBufferSize, + WriteBufferSize: opt.WriteBufferSize, TLSConfig: opt.TLSConfig, Limiter: opt.Limiter,