mirror of
				https://github.com/redis/go-redis.git
				synced 2025-11-04 02:33:24 +03:00 
			
		
		
		
	* after the connection pool is closed, no new connections should be added Signed-off-by: monkey92t <golang@88.com> * remove runGoroutine Signed-off-by: monkey92t <golang@88.com> * pool.popIdle add p.closed check Signed-off-by: monkey92t <golang@88.com> * upgrade golangci-lint v1.42.0 Signed-off-by: monkey92t <golang@88.com>
		
			
				
	
	
		
			333 lines
		
	
	
		
			6.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			333 lines
		
	
	
		
			6.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package proto
 | 
						|
 | 
						|
import (
 | 
						|
	"bufio"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
 | 
						|
	"github.com/go-redis/redis/v8/internal/util"
 | 
						|
)
 | 
						|
 | 
						|
// redis resp protocol data type.
 | 
						|
const (
 | 
						|
	ErrorReply  = '-'
 | 
						|
	StatusReply = '+'
 | 
						|
	IntReply    = ':'
 | 
						|
	StringReply = '$'
 | 
						|
	ArrayReply  = '*'
 | 
						|
)
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
const Nil = RedisError("redis: nil") // nolint:errname
 | 
						|
 | 
						|
type RedisError string
 | 
						|
 | 
						|
func (e RedisError) Error() string { return string(e) }
 | 
						|
 | 
						|
func (RedisError) RedisError() {}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
type MultiBulkParse func(*Reader, int64) (interface{}, error)
 | 
						|
 | 
						|
type Reader struct {
 | 
						|
	rd   *bufio.Reader
 | 
						|
	_buf []byte
 | 
						|
}
 | 
						|
 | 
						|
func NewReader(rd io.Reader) *Reader {
 | 
						|
	return &Reader{
 | 
						|
		rd:   bufio.NewReader(rd),
 | 
						|
		_buf: make([]byte, 64),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (r *Reader) Buffered() int {
 | 
						|
	return r.rd.Buffered()
 | 
						|
}
 | 
						|
 | 
						|
func (r *Reader) Peek(n int) ([]byte, error) {
 | 
						|
	return r.rd.Peek(n)
 | 
						|
}
 | 
						|
 | 
						|
func (r *Reader) Reset(rd io.Reader) {
 | 
						|
	r.rd.Reset(rd)
 | 
						|
}
 | 
						|
 | 
						|
func (r *Reader) ReadLine() ([]byte, error) {
 | 
						|
	line, err := r.readLine()
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	if isNilReply(line) {
 | 
						|
		return nil, Nil
 | 
						|
	}
 | 
						|
	return line, nil
 | 
						|
}
 | 
						|
 | 
						|
// readLine that returns an error if:
 | 
						|
//   - there is a pending read error;
 | 
						|
//   - or line does not end with \r\n.
 | 
						|
func (r *Reader) readLine() ([]byte, error) {
 | 
						|
	b, err := r.rd.ReadSlice('\n')
 | 
						|
	if err != nil {
 | 
						|
		if err != bufio.ErrBufferFull {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
 | 
						|
		full := make([]byte, len(b))
 | 
						|
		copy(full, b)
 | 
						|
 | 
						|
		b, err = r.rd.ReadBytes('\n')
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
 | 
						|
		full = append(full, b...) //nolint:makezero
 | 
						|
		b = full
 | 
						|
	}
 | 
						|
	if len(b) <= 2 || b[len(b)-1] != '\n' || b[len(b)-2] != '\r' {
 | 
						|
		return nil, fmt.Errorf("redis: invalid reply: %q", b)
 | 
						|
	}
 | 
						|
	return b[:len(b)-2], nil
 | 
						|
}
 | 
						|
 | 
						|
func (r *Reader) ReadReply(m MultiBulkParse) (interface{}, error) {
 | 
						|
	line, err := r.ReadLine()
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	switch line[0] {
 | 
						|
	case ErrorReply:
 | 
						|
		return nil, ParseErrorReply(line)
 | 
						|
	case StatusReply:
 | 
						|
		return string(line[1:]), nil
 | 
						|
	case IntReply:
 | 
						|
		return util.ParseInt(line[1:], 10, 64)
 | 
						|
	case StringReply:
 | 
						|
		return r.readStringReply(line)
 | 
						|
	case ArrayReply:
 | 
						|
		n, err := parseArrayLen(line)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		if m == nil {
 | 
						|
			err := fmt.Errorf("redis: got %.100q, but multi bulk parser is nil", line)
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		return m(r, n)
 | 
						|
	}
 | 
						|
	return nil, fmt.Errorf("redis: can't parse %.100q", line)
 | 
						|
}
 | 
						|
 | 
						|
func (r *Reader) ReadIntReply() (int64, error) {
 | 
						|
	line, err := r.ReadLine()
 | 
						|
	if err != nil {
 | 
						|
		return 0, err
 | 
						|
	}
 | 
						|
	switch line[0] {
 | 
						|
	case ErrorReply:
 | 
						|
		return 0, ParseErrorReply(line)
 | 
						|
	case IntReply:
 | 
						|
		return util.ParseInt(line[1:], 10, 64)
 | 
						|
	default:
 | 
						|
		return 0, fmt.Errorf("redis: can't parse int reply: %.100q", line)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (r *Reader) ReadString() (string, error) {
 | 
						|
	line, err := r.ReadLine()
 | 
						|
	if err != nil {
 | 
						|
		return "", err
 | 
						|
	}
 | 
						|
	switch line[0] {
 | 
						|
	case ErrorReply:
 | 
						|
		return "", ParseErrorReply(line)
 | 
						|
	case StringReply:
 | 
						|
		return r.readStringReply(line)
 | 
						|
	case StatusReply:
 | 
						|
		return string(line[1:]), nil
 | 
						|
	case IntReply:
 | 
						|
		return string(line[1:]), nil
 | 
						|
	default:
 | 
						|
		return "", fmt.Errorf("redis: can't parse reply=%.100q reading string", line)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (r *Reader) readStringReply(line []byte) (string, error) {
 | 
						|
	if isNilReply(line) {
 | 
						|
		return "", Nil
 | 
						|
	}
 | 
						|
 | 
						|
	replyLen, err := util.Atoi(line[1:])
 | 
						|
	if err != nil {
 | 
						|
		return "", err
 | 
						|
	}
 | 
						|
 | 
						|
	b := make([]byte, replyLen+2)
 | 
						|
	_, err = io.ReadFull(r.rd, b)
 | 
						|
	if err != nil {
 | 
						|
		return "", err
 | 
						|
	}
 | 
						|
 | 
						|
	return util.BytesToString(b[:replyLen]), nil
 | 
						|
}
 | 
						|
 | 
						|
func (r *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) {
 | 
						|
	line, err := r.ReadLine()
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	switch line[0] {
 | 
						|
	case ErrorReply:
 | 
						|
		return nil, ParseErrorReply(line)
 | 
						|
	case ArrayReply:
 | 
						|
		n, err := parseArrayLen(line)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		return m(r, n)
 | 
						|
	default:
 | 
						|
		return nil, fmt.Errorf("redis: can't parse array reply: %.100q", line)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (r *Reader) ReadArrayLen() (int, error) {
 | 
						|
	line, err := r.ReadLine()
 | 
						|
	if err != nil {
 | 
						|
		return 0, err
 | 
						|
	}
 | 
						|
	switch line[0] {
 | 
						|
	case ErrorReply:
 | 
						|
		return 0, ParseErrorReply(line)
 | 
						|
	case ArrayReply:
 | 
						|
		n, err := parseArrayLen(line)
 | 
						|
		if err != nil {
 | 
						|
			return 0, err
 | 
						|
		}
 | 
						|
		return int(n), nil
 | 
						|
	default:
 | 
						|
		return 0, fmt.Errorf("redis: can't parse array reply: %.100q", line)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (r *Reader) ReadScanReply() ([]string, uint64, error) {
 | 
						|
	n, err := r.ReadArrayLen()
 | 
						|
	if err != nil {
 | 
						|
		return nil, 0, err
 | 
						|
	}
 | 
						|
	if n != 2 {
 | 
						|
		return nil, 0, fmt.Errorf("redis: got %d elements in scan reply, expected 2", n)
 | 
						|
	}
 | 
						|
 | 
						|
	cursor, err := r.ReadUint()
 | 
						|
	if err != nil {
 | 
						|
		return nil, 0, err
 | 
						|
	}
 | 
						|
 | 
						|
	n, err = r.ReadArrayLen()
 | 
						|
	if err != nil {
 | 
						|
		return nil, 0, err
 | 
						|
	}
 | 
						|
 | 
						|
	keys := make([]string, n)
 | 
						|
 | 
						|
	for i := 0; i < n; i++ {
 | 
						|
		key, err := r.ReadString()
 | 
						|
		if err != nil {
 | 
						|
			return nil, 0, err
 | 
						|
		}
 | 
						|
		keys[i] = key
 | 
						|
	}
 | 
						|
 | 
						|
	return keys, cursor, err
 | 
						|
}
 | 
						|
 | 
						|
func (r *Reader) ReadInt() (int64, error) {
 | 
						|
	b, err := r.readTmpBytesReply()
 | 
						|
	if err != nil {
 | 
						|
		return 0, err
 | 
						|
	}
 | 
						|
	return util.ParseInt(b, 10, 64)
 | 
						|
}
 | 
						|
 | 
						|
func (r *Reader) ReadUint() (uint64, error) {
 | 
						|
	b, err := r.readTmpBytesReply()
 | 
						|
	if err != nil {
 | 
						|
		return 0, err
 | 
						|
	}
 | 
						|
	return util.ParseUint(b, 10, 64)
 | 
						|
}
 | 
						|
 | 
						|
func (r *Reader) ReadFloatReply() (float64, error) {
 | 
						|
	b, err := r.readTmpBytesReply()
 | 
						|
	if err != nil {
 | 
						|
		return 0, err
 | 
						|
	}
 | 
						|
	return util.ParseFloat(b, 64)
 | 
						|
}
 | 
						|
 | 
						|
func (r *Reader) readTmpBytesReply() ([]byte, error) {
 | 
						|
	line, err := r.ReadLine()
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	switch line[0] {
 | 
						|
	case ErrorReply:
 | 
						|
		return nil, ParseErrorReply(line)
 | 
						|
	case StringReply:
 | 
						|
		return r._readTmpBytesReply(line)
 | 
						|
	case StatusReply:
 | 
						|
		return line[1:], nil
 | 
						|
	default:
 | 
						|
		return nil, fmt.Errorf("redis: can't parse string reply: %.100q", line)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (r *Reader) _readTmpBytesReply(line []byte) ([]byte, error) {
 | 
						|
	if isNilReply(line) {
 | 
						|
		return nil, Nil
 | 
						|
	}
 | 
						|
 | 
						|
	replyLen, err := util.Atoi(line[1:])
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	buf := r.buf(replyLen + 2)
 | 
						|
	_, err = io.ReadFull(r.rd, buf)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return buf[:replyLen], nil
 | 
						|
}
 | 
						|
 | 
						|
func (r *Reader) buf(n int) []byte {
 | 
						|
	if n <= cap(r._buf) {
 | 
						|
		return r._buf[:n]
 | 
						|
	}
 | 
						|
	d := n - cap(r._buf)
 | 
						|
	r._buf = append(r._buf, make([]byte, d)...)
 | 
						|
	return r._buf
 | 
						|
}
 | 
						|
 | 
						|
func isNilReply(b []byte) bool {
 | 
						|
	return len(b) == 3 &&
 | 
						|
		(b[0] == StringReply || b[0] == ArrayReply) &&
 | 
						|
		b[1] == '-' && b[2] == '1'
 | 
						|
}
 | 
						|
 | 
						|
func ParseErrorReply(line []byte) error {
 | 
						|
	return RedisError(string(line[1:]))
 | 
						|
}
 | 
						|
 | 
						|
func parseArrayLen(line []byte) (int64, error) {
 | 
						|
	if isNilReply(line) {
 | 
						|
		return 0, Nil
 | 
						|
	}
 | 
						|
	return util.ParseInt(line[1:], 10, 64)
 | 
						|
}
 |