mirror of
				https://github.com/redis/go-redis.git
				synced 2025-10-30 16:45:34 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			553 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			553 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package proto
 | |
| 
 | |
| import (
 | |
| 	"bufio"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"math"
 | |
| 	"math/big"
 | |
| 	"strconv"
 | |
| 
 | |
| 	"github.com/redis/go-redis/v9/internal/util"
 | |
| )
 | |
| 
 | |
| // redis resp protocol data type.
 | |
| const (
 | |
| 	RespStatus    = '+' // +<string>\r\n
 | |
| 	RespError     = '-' // -<string>\r\n
 | |
| 	RespString    = '$' // $<length>\r\n<bytes>\r\n
 | |
| 	RespInt       = ':' // :<number>\r\n
 | |
| 	RespNil       = '_' // _\r\n
 | |
| 	RespFloat     = ',' // ,<floating-point-number>\r\n (golang float)
 | |
| 	RespBool      = '#' // true: #t\r\n false: #f\r\n
 | |
| 	RespBlobError = '!' // !<length>\r\n<bytes>\r\n
 | |
| 	RespVerbatim  = '=' // =<length>\r\nFORMAT:<bytes>\r\n
 | |
| 	RespBigInt    = '(' // (<big number>\r\n
 | |
| 	RespArray     = '*' // *<len>\r\n... (same as resp2)
 | |
| 	RespMap       = '%' // %<len>\r\n(key)\r\n(value)\r\n... (golang map)
 | |
| 	RespSet       = '~' // ~<len>\r\n... (same as Array)
 | |
| 	RespAttr      = '|' // |<len>\r\n(key)\r\n(value)\r\n... + command reply
 | |
| 	RespPush      = '>' // ><len>\r\n... (same as Array)
 | |
| )
 | |
| 
 | |
| // Not used temporarily.
 | |
| // Redis has not used these two data types for the time being, and will implement them later.
 | |
| // Streamed           = "EOF:"
 | |
| // StreamedAggregated = '?'
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| 
 | |
| const Nil = RedisError("redis: nil") // nolint:errname
 | |
| 
 | |
| type RedisError string
 | |
| 
 | |
| func (e RedisError) Error() string { return string(e) }
 | |
| 
 | |
| func (RedisError) RedisError() {}
 | |
| 
 | |
| func ParseErrorReply(line []byte) error {
 | |
| 	return RedisError(line[1:])
 | |
| }
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| 
 | |
| type Reader struct {
 | |
| 	rd *bufio.Reader
 | |
| }
 | |
| 
 | |
| func NewReader(rd io.Reader) *Reader {
 | |
| 	return &Reader{
 | |
| 		rd: bufio.NewReader(rd),
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (r *Reader) Buffered() int {
 | |
| 	return r.rd.Buffered()
 | |
| }
 | |
| 
 | |
| func (r *Reader) Peek(n int) ([]byte, error) {
 | |
| 	return r.rd.Peek(n)
 | |
| }
 | |
| 
 | |
| func (r *Reader) Reset(rd io.Reader) {
 | |
| 	r.rd.Reset(rd)
 | |
| }
 | |
| 
 | |
| // PeekReplyType returns the data type of the next response without advancing the Reader,
 | |
| // and discard the attribute type.
 | |
| func (r *Reader) PeekReplyType() (byte, error) {
 | |
| 	b, err := r.rd.Peek(1)
 | |
| 	if err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 	if b[0] == RespAttr {
 | |
| 		if err = r.DiscardNext(); err != nil {
 | |
| 			return 0, err
 | |
| 		}
 | |
| 		return r.PeekReplyType()
 | |
| 	}
 | |
| 	return b[0], nil
 | |
| }
 | |
| 
 | |
| // ReadLine Return a valid reply, it will check the protocol or redis error,
 | |
| // and discard the attribute type.
 | |
| func (r *Reader) ReadLine() ([]byte, error) {
 | |
| 	line, err := r.readLine()
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	switch line[0] {
 | |
| 	case RespError:
 | |
| 		return nil, ParseErrorReply(line)
 | |
| 	case RespNil:
 | |
| 		return nil, Nil
 | |
| 	case RespBlobError:
 | |
| 		var blobErr string
 | |
| 		blobErr, err = r.readStringReply(line)
 | |
| 		if err == nil {
 | |
| 			err = RedisError(blobErr)
 | |
| 		}
 | |
| 		return nil, err
 | |
| 	case RespAttr:
 | |
| 		if err = r.Discard(line); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		return r.ReadLine()
 | |
| 	}
 | |
| 
 | |
| 	// Compatible with RESP2
 | |
| 	if IsNilReply(line) {
 | |
| 		return nil, Nil
 | |
| 	}
 | |
| 
 | |
| 	return line, nil
 | |
| }
 | |
| 
 | |
| // readLine returns an error if:
 | |
| //   - there is a pending read error;
 | |
| //   - or line does not end with \r\n.
 | |
| func (r *Reader) readLine() ([]byte, error) {
 | |
| 	b, err := r.rd.ReadSlice('\n')
 | |
| 	if err != nil {
 | |
| 		if err != bufio.ErrBufferFull {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		full := make([]byte, len(b))
 | |
| 		copy(full, b)
 | |
| 
 | |
| 		b, err = r.rd.ReadBytes('\n')
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		full = append(full, b...) //nolint:makezero
 | |
| 		b = full
 | |
| 	}
 | |
| 	if len(b) <= 2 || b[len(b)-1] != '\n' || b[len(b)-2] != '\r' {
 | |
| 		return nil, fmt.Errorf("redis: invalid reply: %q", b)
 | |
| 	}
 | |
| 	return b[:len(b)-2], nil
 | |
| }
 | |
| 
 | |
| func (r *Reader) ReadReply() (interface{}, error) {
 | |
| 	line, err := r.ReadLine()
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	switch line[0] {
 | |
| 	case RespStatus:
 | |
| 		return string(line[1:]), nil
 | |
| 	case RespInt:
 | |
| 		return util.ParseInt(line[1:], 10, 64)
 | |
| 	case RespFloat:
 | |
| 		return r.readFloat(line)
 | |
| 	case RespBool:
 | |
| 		return r.readBool(line)
 | |
| 	case RespBigInt:
 | |
| 		return r.readBigInt(line)
 | |
| 
 | |
| 	case RespString:
 | |
| 		return r.readStringReply(line)
 | |
| 	case RespVerbatim:
 | |
| 		return r.readVerb(line)
 | |
| 
 | |
| 	case RespArray, RespSet, RespPush:
 | |
| 		return r.readSlice(line)
 | |
| 	case RespMap:
 | |
| 		return r.readMap(line)
 | |
| 	}
 | |
| 	return nil, fmt.Errorf("redis: can't parse %.100q", line)
 | |
| }
 | |
| 
 | |
| func (r *Reader) readFloat(line []byte) (float64, error) {
 | |
| 	v := string(line[1:])
 | |
| 	switch string(line[1:]) {
 | |
| 	case "inf":
 | |
| 		return math.Inf(1), nil
 | |
| 	case "-inf":
 | |
| 		return math.Inf(-1), nil
 | |
| 	case "nan", "-nan":
 | |
| 		return math.NaN(), nil
 | |
| 	}
 | |
| 	return strconv.ParseFloat(v, 64)
 | |
| }
 | |
| 
 | |
| func (r *Reader) readBool(line []byte) (bool, error) {
 | |
| 	switch string(line[1:]) {
 | |
| 	case "t":
 | |
| 		return true, nil
 | |
| 	case "f":
 | |
| 		return false, nil
 | |
| 	}
 | |
| 	return false, fmt.Errorf("redis: can't parse bool reply: %q", line)
 | |
| }
 | |
| 
 | |
| func (r *Reader) readBigInt(line []byte) (*big.Int, error) {
 | |
| 	i := new(big.Int)
 | |
| 	if i, ok := i.SetString(string(line[1:]), 10); ok {
 | |
| 		return i, nil
 | |
| 	}
 | |
| 	return nil, fmt.Errorf("redis: can't parse bigInt reply: %q", line)
 | |
| }
 | |
| 
 | |
| func (r *Reader) readStringReply(line []byte) (string, error) {
 | |
| 	n, err := replyLen(line)
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 
 | |
| 	b := make([]byte, n+2)
 | |
| 	_, err = io.ReadFull(r.rd, b)
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 
 | |
| 	return util.BytesToString(b[:n]), nil
 | |
| }
 | |
| 
 | |
| func (r *Reader) readVerb(line []byte) (string, error) {
 | |
| 	s, err := r.readStringReply(line)
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 	if len(s) < 4 || s[3] != ':' {
 | |
| 		return "", fmt.Errorf("redis: can't parse verbatim string reply: %q", line)
 | |
| 	}
 | |
| 	return s[4:], nil
 | |
| }
 | |
| 
 | |
| func (r *Reader) readSlice(line []byte) ([]interface{}, error) {
 | |
| 	n, err := replyLen(line)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	val := make([]interface{}, n)
 | |
| 	for i := 0; i < len(val); i++ {
 | |
| 		v, err := r.ReadReply()
 | |
| 		if err != nil {
 | |
| 			if err == Nil {
 | |
| 				val[i] = nil
 | |
| 				continue
 | |
| 			}
 | |
| 			if err, ok := err.(RedisError); ok {
 | |
| 				val[i] = err
 | |
| 				continue
 | |
| 			}
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		val[i] = v
 | |
| 	}
 | |
| 	return val, nil
 | |
| }
 | |
| 
 | |
| func (r *Reader) readMap(line []byte) (map[interface{}]interface{}, error) {
 | |
| 	n, err := replyLen(line)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	m := make(map[interface{}]interface{}, n)
 | |
| 	for i := 0; i < n; i++ {
 | |
| 		k, err := r.ReadReply()
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		v, err := r.ReadReply()
 | |
| 		if err != nil {
 | |
| 			if err == Nil {
 | |
| 				m[k] = nil
 | |
| 				continue
 | |
| 			}
 | |
| 			if err, ok := err.(RedisError); ok {
 | |
| 				m[k] = err
 | |
| 				continue
 | |
| 			}
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		m[k] = v
 | |
| 	}
 | |
| 	return m, nil
 | |
| }
 | |
| 
 | |
| // -------------------------------
 | |
| 
 | |
| func (r *Reader) ReadInt() (int64, error) {
 | |
| 	line, err := r.ReadLine()
 | |
| 	if err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 	switch line[0] {
 | |
| 	case RespInt, RespStatus:
 | |
| 		return util.ParseInt(line[1:], 10, 64)
 | |
| 	case RespString:
 | |
| 		s, err := r.readStringReply(line)
 | |
| 		if err != nil {
 | |
| 			return 0, err
 | |
| 		}
 | |
| 		return util.ParseInt([]byte(s), 10, 64)
 | |
| 	case RespBigInt:
 | |
| 		b, err := r.readBigInt(line)
 | |
| 		if err != nil {
 | |
| 			return 0, err
 | |
| 		}
 | |
| 		if !b.IsInt64() {
 | |
| 			return 0, fmt.Errorf("bigInt(%s) value out of range", b.String())
 | |
| 		}
 | |
| 		return b.Int64(), nil
 | |
| 	}
 | |
| 	return 0, fmt.Errorf("redis: can't parse int reply: %.100q", line)
 | |
| }
 | |
| 
 | |
| func (r *Reader) ReadUint() (uint64, error) {
 | |
| 	line, err := r.ReadLine()
 | |
| 	if err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 	switch line[0] {
 | |
| 	case RespInt, RespStatus:
 | |
| 		return util.ParseUint(line[1:], 10, 64)
 | |
| 	case RespString:
 | |
| 		s, err := r.readStringReply(line)
 | |
| 		if err != nil {
 | |
| 			return 0, err
 | |
| 		}
 | |
| 		return util.ParseUint([]byte(s), 10, 64)
 | |
| 	case RespBigInt:
 | |
| 		b, err := r.readBigInt(line)
 | |
| 		if err != nil {
 | |
| 			return 0, err
 | |
| 		}
 | |
| 		if !b.IsUint64() {
 | |
| 			return 0, fmt.Errorf("bigInt(%s) value out of range", b.String())
 | |
| 		}
 | |
| 		return b.Uint64(), nil
 | |
| 	}
 | |
| 	return 0, fmt.Errorf("redis: can't parse uint reply: %.100q", line)
 | |
| }
 | |
| 
 | |
| func (r *Reader) ReadFloat() (float64, error) {
 | |
| 	line, err := r.ReadLine()
 | |
| 	if err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 	switch line[0] {
 | |
| 	case RespFloat:
 | |
| 		return r.readFloat(line)
 | |
| 	case RespStatus:
 | |
| 		return strconv.ParseFloat(string(line[1:]), 64)
 | |
| 	case RespString:
 | |
| 		s, err := r.readStringReply(line)
 | |
| 		if err != nil {
 | |
| 			return 0, err
 | |
| 		}
 | |
| 		return strconv.ParseFloat(s, 64)
 | |
| 	}
 | |
| 	return 0, fmt.Errorf("redis: can't parse float reply: %.100q", line)
 | |
| }
 | |
| 
 | |
| func (r *Reader) ReadString() (string, error) {
 | |
| 	line, err := r.ReadLine()
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 
 | |
| 	switch line[0] {
 | |
| 	case RespStatus, RespInt, RespFloat:
 | |
| 		return string(line[1:]), nil
 | |
| 	case RespString:
 | |
| 		return r.readStringReply(line)
 | |
| 	case RespBool:
 | |
| 		b, err := r.readBool(line)
 | |
| 		return strconv.FormatBool(b), err
 | |
| 	case RespVerbatim:
 | |
| 		return r.readVerb(line)
 | |
| 	case RespBigInt:
 | |
| 		b, err := r.readBigInt(line)
 | |
| 		if err != nil {
 | |
| 			return "", err
 | |
| 		}
 | |
| 		return b.String(), nil
 | |
| 	}
 | |
| 	return "", fmt.Errorf("redis: can't parse reply=%.100q reading string", line)
 | |
| }
 | |
| 
 | |
| func (r *Reader) ReadBool() (bool, error) {
 | |
| 	s, err := r.ReadString()
 | |
| 	if err != nil {
 | |
| 		return false, err
 | |
| 	}
 | |
| 	return s == "OK" || s == "1" || s == "true", nil
 | |
| }
 | |
| 
 | |
| func (r *Reader) ReadSlice() ([]interface{}, error) {
 | |
| 	line, err := r.ReadLine()
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return r.readSlice(line)
 | |
| }
 | |
| 
 | |
| // ReadFixedArrayLen read fixed array length.
 | |
| func (r *Reader) ReadFixedArrayLen(fixedLen int) error {
 | |
| 	n, err := r.ReadArrayLen()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if n != fixedLen {
 | |
| 		return fmt.Errorf("redis: got %d elements in the array, wanted %d", n, fixedLen)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // ReadArrayLen Read and return the length of the array.
 | |
| func (r *Reader) ReadArrayLen() (int, error) {
 | |
| 	line, err := r.ReadLine()
 | |
| 	if err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 	switch line[0] {
 | |
| 	case RespArray, RespSet, RespPush:
 | |
| 		return replyLen(line)
 | |
| 	default:
 | |
| 		return 0, fmt.Errorf("redis: can't parse array/set/push reply: %.100q", line)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // ReadFixedMapLen reads fixed map length.
 | |
| func (r *Reader) ReadFixedMapLen(fixedLen int) error {
 | |
| 	n, err := r.ReadMapLen()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if n != fixedLen {
 | |
| 		return fmt.Errorf("redis: got %d elements in the map, wanted %d", n, fixedLen)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // ReadMapLen reads the length of the map type.
 | |
| // If responding to the array type (RespArray/RespSet/RespPush),
 | |
| // it must be a multiple of 2 and return n/2.
 | |
| // Other types will return an error.
 | |
| func (r *Reader) ReadMapLen() (int, error) {
 | |
| 	line, err := r.ReadLine()
 | |
| 	if err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 	switch line[0] {
 | |
| 	case RespMap:
 | |
| 		return replyLen(line)
 | |
| 	case RespArray, RespSet, RespPush:
 | |
| 		// Some commands and RESP2 protocol may respond to array types.
 | |
| 		n, err := replyLen(line)
 | |
| 		if err != nil {
 | |
| 			return 0, err
 | |
| 		}
 | |
| 		if n%2 != 0 {
 | |
| 			return 0, fmt.Errorf("redis: the length of the array must be a multiple of 2, got: %d", n)
 | |
| 		}
 | |
| 		return n / 2, nil
 | |
| 	default:
 | |
| 		return 0, fmt.Errorf("redis: can't parse map reply: %.100q", line)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // DiscardNext read and discard the data represented by the next line.
 | |
| func (r *Reader) DiscardNext() error {
 | |
| 	line, err := r.readLine()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	return r.Discard(line)
 | |
| }
 | |
| 
 | |
| // Discard the data represented by line.
 | |
| func (r *Reader) Discard(line []byte) (err error) {
 | |
| 	if len(line) == 0 {
 | |
| 		return errors.New("redis: invalid line")
 | |
| 	}
 | |
| 	switch line[0] {
 | |
| 	case RespStatus, RespError, RespInt, RespNil, RespFloat, RespBool, RespBigInt:
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	n, err := replyLen(line)
 | |
| 	if err != nil && err != Nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	switch line[0] {
 | |
| 	case RespBlobError, RespString, RespVerbatim:
 | |
| 		// +\r\n
 | |
| 		_, err = r.rd.Discard(n + 2)
 | |
| 		return err
 | |
| 	case RespArray, RespSet, RespPush:
 | |
| 		for i := 0; i < n; i++ {
 | |
| 			if err = r.DiscardNext(); err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 		}
 | |
| 		return nil
 | |
| 	case RespMap, RespAttr:
 | |
| 		// Read key & value.
 | |
| 		for i := 0; i < n*2; i++ {
 | |
| 			if err = r.DiscardNext(); err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 		}
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	return fmt.Errorf("redis: can't parse %.100q", line)
 | |
| }
 | |
| 
 | |
| func replyLen(line []byte) (n int, err error) {
 | |
| 	n, err = util.Atoi(line[1:])
 | |
| 	if err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 
 | |
| 	if n < -1 {
 | |
| 		return 0, fmt.Errorf("redis: invalid reply: %q", line)
 | |
| 	}
 | |
| 
 | |
| 	switch line[0] {
 | |
| 	case RespString, RespVerbatim, RespBlobError,
 | |
| 		RespArray, RespSet, RespPush, RespMap, RespAttr:
 | |
| 		if n == -1 {
 | |
| 			return 0, Nil
 | |
| 		}
 | |
| 	}
 | |
| 	return n, nil
 | |
| }
 | |
| 
 | |
| // IsNilReply detects redis.Nil of RESP2.
 | |
| func IsNilReply(line []byte) bool {
 | |
| 	return len(line) == 3 &&
 | |
| 		(line[0] == RespString || line[0] == RespArray) &&
 | |
| 		line[1] == '-' && line[2] == '1'
 | |
| }
 |