1
0
mirror of https://github.com/redis/go-redis.git synced 2025-07-31 05:04:23 +03:00

Make proto/parser an internal package

This commit is contained in:
Dimitrij Denissenko
2016-07-02 13:52:10 +01:00
parent 5c3ab24e0a
commit 7d856c5595
24 changed files with 841 additions and 712 deletions

546
parser.go
View File

@ -1,446 +1,18 @@
package redis
import (
"bufio"
"errors"
"fmt"
"net"
"strconv"
"gopkg.in/redis.v4/internal/pool"
"gopkg.in/redis.v4/internal/proto"
)
const (
errorReply = '-'
statusReply = '+'
intReply = ':'
stringReply = '$'
arrayReply = '*'
)
type multiBulkParser func(cn *pool.Conn, n int64) (interface{}, error)
var errEmptyReply = errors.New("redis: reply is empty")
//------------------------------------------------------------------------------
// Copy of encoding.BinaryMarshaler.
type binaryMarshaler interface {
MarshalBinary() (data []byte, err error)
}
// Copy of encoding.BinaryUnmarshaler.
type binaryUnmarshaler interface {
UnmarshalBinary(data []byte) error
}
func appendString(b []byte, s string) []byte {
b = append(b, '$')
b = strconv.AppendUint(b, uint64(len(s)), 10)
b = append(b, '\r', '\n')
b = append(b, s...)
b = append(b, '\r', '\n')
return b
}
func appendBytes(b, bb []byte) []byte {
b = append(b, '$')
b = strconv.AppendUint(b, uint64(len(bb)), 10)
b = append(b, '\r', '\n')
b = append(b, bb...)
b = append(b, '\r', '\n')
return b
}
func appendArg(b []byte, val interface{}) ([]byte, error) {
switch v := val.(type) {
case nil:
b = appendString(b, "")
case string:
b = appendString(b, v)
case []byte:
b = appendBytes(b, v)
case int:
b = appendString(b, formatInt(int64(v)))
case int8:
b = appendString(b, formatInt(int64(v)))
case int16:
b = appendString(b, formatInt(int64(v)))
case int32:
b = appendString(b, formatInt(int64(v)))
case int64:
b = appendString(b, formatInt(v))
case uint:
b = appendString(b, formatUint(uint64(v)))
case uint8:
b = appendString(b, formatUint(uint64(v)))
case uint16:
b = appendString(b, formatUint(uint64(v)))
case uint32:
b = appendString(b, formatUint(uint64(v)))
case uint64:
b = appendString(b, formatUint(v))
case float32:
b = appendString(b, formatFloat(float64(v)))
case float64:
b = appendString(b, formatFloat(v))
case bool:
if v {
b = appendString(b, "1")
} else {
b = appendString(b, "0")
}
default:
if bm, ok := val.(binaryMarshaler); ok {
bb, err := bm.MarshalBinary()
if err != nil {
return nil, err
}
b = appendBytes(b, bb)
} else {
err := fmt.Errorf(
"redis: can't marshal %T (consider implementing BinaryMarshaler)", val)
return nil, err
}
}
return b, nil
}
func appendArgs(b []byte, args []interface{}) ([]byte, error) {
b = append(b, arrayReply)
b = strconv.AppendUint(b, uint64(len(args)), 10)
b = append(b, '\r', '\n')
for _, arg := range args {
var err error
b, err = appendArg(b, arg)
if err != nil {
return nil, err
}
}
return b, nil
}
func scan(b []byte, val interface{}) error {
switch v := val.(type) {
case nil:
return errorf("redis: Scan(nil)")
case *string:
*v = bytesToString(b)
return nil
case *[]byte:
*v = b
return nil
case *int:
var err error
*v, err = strconv.Atoi(bytesToString(b))
return err
case *int8:
n, err := strconv.ParseInt(bytesToString(b), 10, 8)
if err != nil {
return err
}
*v = int8(n)
return nil
case *int16:
n, err := strconv.ParseInt(bytesToString(b), 10, 16)
if err != nil {
return err
}
*v = int16(n)
return nil
case *int32:
n, err := strconv.ParseInt(bytesToString(b), 10, 16)
if err != nil {
return err
}
*v = int32(n)
return nil
case *int64:
n, err := strconv.ParseInt(bytesToString(b), 10, 64)
if err != nil {
return err
}
*v = n
return nil
case *uint:
n, err := strconv.ParseUint(bytesToString(b), 10, 64)
if err != nil {
return err
}
*v = uint(n)
return nil
case *uint8:
n, err := strconv.ParseUint(bytesToString(b), 10, 8)
if err != nil {
return err
}
*v = uint8(n)
return nil
case *uint16:
n, err := strconv.ParseUint(bytesToString(b), 10, 16)
if err != nil {
return err
}
*v = uint16(n)
return nil
case *uint32:
n, err := strconv.ParseUint(bytesToString(b), 10, 32)
if err != nil {
return err
}
*v = uint32(n)
return nil
case *uint64:
n, err := strconv.ParseUint(bytesToString(b), 10, 64)
if err != nil {
return err
}
*v = n
return nil
case *float32:
n, err := strconv.ParseFloat(bytesToString(b), 32)
if err != nil {
return err
}
*v = float32(n)
return err
case *float64:
var err error
*v, err = strconv.ParseFloat(bytesToString(b), 64)
return err
case *bool:
*v = len(b) == 1 && b[0] == '1'
return nil
default:
if bu, ok := val.(binaryUnmarshaler); ok {
return bu.UnmarshalBinary(b)
}
err := fmt.Errorf(
"redis: can't unmarshal %T (consider implementing BinaryUnmarshaler)", val)
return err
}
}
//------------------------------------------------------------------------------
func readLine(cn *pool.Conn) ([]byte, error) {
line, isPrefix, err := cn.Rd.ReadLine()
if err != nil {
return nil, err
}
if isPrefix {
return nil, bufio.ErrBufferFull
}
if len(line) == 0 {
return nil, errEmptyReply
}
if isNilReply(line) {
return nil, Nil
}
return line, nil
}
func isNilReply(b []byte) bool {
return len(b) == 3 &&
(b[0] == stringReply || b[0] == arrayReply) &&
b[1] == '-' && b[2] == '1'
}
//------------------------------------------------------------------------------
func parseErrorReply(cn *pool.Conn, line []byte) error {
return errorf(string(line[1:]))
}
func parseStatusReply(cn *pool.Conn, line []byte) ([]byte, error) {
return line[1:], nil
}
func parseIntReply(cn *pool.Conn, line []byte) (int64, error) {
n, err := strconv.ParseInt(bytesToString(line[1:]), 10, 64)
if err != nil {
return 0, err
}
return n, nil
}
func readIntReply(cn *pool.Conn) (int64, error) {
line, err := readLine(cn)
if err != nil {
return 0, err
}
switch line[0] {
case errorReply:
return 0, parseErrorReply(cn, line)
case intReply:
return parseIntReply(cn, line)
default:
return 0, fmt.Errorf("redis: can't parse int reply: %.100q", line)
}
}
func parseBytesReply(cn *pool.Conn, line []byte) ([]byte, error) {
if isNilReply(line) {
return nil, Nil
}
replyLen, err := strconv.Atoi(bytesToString(line[1:]))
if err != nil {
return nil, err
}
b, err := cn.ReadN(replyLen + 2)
if err != nil {
return nil, err
}
return b[:replyLen], nil
}
func readBytesReply(cn *pool.Conn) ([]byte, error) {
line, err := readLine(cn)
if err != nil {
return nil, err
}
switch line[0] {
case errorReply:
return nil, parseErrorReply(cn, line)
case stringReply:
return parseBytesReply(cn, line)
case statusReply:
return parseStatusReply(cn, line)
default:
return nil, fmt.Errorf("redis: can't parse string reply: %.100q", line)
}
}
func readStringReply(cn *pool.Conn) (string, error) {
b, err := readBytesReply(cn)
if err != nil {
return "", err
}
return string(b), nil
}
func readFloatReply(cn *pool.Conn) (float64, error) {
b, err := readBytesReply(cn)
if err != nil {
return 0, err
}
return strconv.ParseFloat(bytesToString(b), 64)
}
func parseArrayHeader(cn *pool.Conn, line []byte) (int64, error) {
if isNilReply(line) {
return 0, Nil
}
n, err := strconv.ParseInt(bytesToString(line[1:]), 10, 64)
if err != nil {
return 0, err
}
return n, nil
}
func parseArrayReply(cn *pool.Conn, p multiBulkParser, line []byte) (interface{}, error) {
n, err := parseArrayHeader(cn, line)
if err != nil {
return nil, err
}
return p(cn, n)
}
func readArrayHeader(cn *pool.Conn) (int64, error) {
line, err := readLine(cn)
if err != nil {
return 0, err
}
switch line[0] {
case errorReply:
return 0, parseErrorReply(cn, line)
case arrayReply:
return parseArrayHeader(cn, line)
default:
return 0, fmt.Errorf("redis: can't parse array reply: %.100q", line)
}
}
func readArrayReply(cn *pool.Conn, p multiBulkParser) (interface{}, error) {
line, err := readLine(cn)
if err != nil {
return nil, err
}
switch line[0] {
case errorReply:
return nil, parseErrorReply(cn, line)
case arrayReply:
return parseArrayReply(cn, p, line)
default:
return nil, fmt.Errorf("redis: can't parse array reply: %.100q", line)
}
}
func readReply(cn *pool.Conn, p multiBulkParser) (interface{}, error) {
line, err := readLine(cn)
if err != nil {
return nil, err
}
switch line[0] {
case errorReply:
return nil, parseErrorReply(cn, line)
case statusReply:
return parseStatusReply(cn, line)
case intReply:
return parseIntReply(cn, line)
case stringReply:
return parseBytesReply(cn, line)
case arrayReply:
return parseArrayReply(cn, p, line)
}
return nil, fmt.Errorf("redis: can't parse %.100q", line)
}
func readScanReply(cn *pool.Conn) ([]string, uint64, error) {
n, err := readArrayHeader(cn)
if err != nil {
return nil, 0, err
}
if n != 2 {
return nil, 0, fmt.Errorf("redis: got %d elements in scan reply, expected 2", n)
}
b, err := readBytesReply(cn)
if err != nil {
return nil, 0, err
}
cursor, err := strconv.ParseUint(bytesToString(b), 10, 64)
if err != nil {
return nil, 0, err
}
n, err = readArrayHeader(cn)
if err != nil {
return nil, 0, err
}
keys := make([]string, n)
for i := int64(0); i < n; i++ {
key, err := readStringReply(cn)
if err != nil {
return nil, 0, err
}
keys[i] = key
}
return keys, cursor, err
}
func sliceParser(cn *pool.Conn, n int64) (interface{}, error) {
// Implements proto.MultiBulkParse
func sliceParser(rd *proto.Reader, n int64) (interface{}, error) {
vals := make([]interface{}, 0, n)
for i := int64(0); i < n; i++ {
v, err := readReply(cn, sliceParser)
v, err := rd.ReadReply(sliceParser)
if err == Nil {
vals = append(vals, nil)
} else if err != nil {
@ -457,10 +29,11 @@ func sliceParser(cn *pool.Conn, n int64) (interface{}, error) {
return vals, nil
}
func intSliceParser(cn *pool.Conn, n int64) (interface{}, error) {
// Implements proto.MultiBulkParse
func intSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
ints := make([]int64, 0, n)
for i := int64(0); i < n; i++ {
n, err := readIntReply(cn)
n, err := rd.ReadIntReply()
if err != nil {
return nil, err
}
@ -469,10 +42,11 @@ func intSliceParser(cn *pool.Conn, n int64) (interface{}, error) {
return ints, nil
}
func boolSliceParser(cn *pool.Conn, n int64) (interface{}, error) {
// Implements proto.MultiBulkParse
func boolSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
bools := make([]bool, 0, n)
for i := int64(0); i < n; i++ {
n, err := readIntReply(cn)
n, err := rd.ReadIntReply()
if err != nil {
return nil, err
}
@ -481,10 +55,11 @@ func boolSliceParser(cn *pool.Conn, n int64) (interface{}, error) {
return bools, nil
}
func stringSliceParser(cn *pool.Conn, n int64) (interface{}, error) {
// Implements proto.MultiBulkParse
func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
ss := make([]string, 0, n)
for i := int64(0); i < n; i++ {
s, err := readStringReply(cn)
s, err := rd.ReadStringReply()
if err == Nil {
ss = append(ss, "")
} else if err != nil {
@ -496,10 +71,11 @@ func stringSliceParser(cn *pool.Conn, n int64) (interface{}, error) {
return ss, nil
}
func floatSliceParser(cn *pool.Conn, n int64) (interface{}, error) {
// Implements proto.MultiBulkParse
func floatSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
nn := make([]float64, 0, n)
for i := int64(0); i < n; i++ {
n, err := readFloatReply(cn)
n, err := rd.ReadFloatReply()
if err != nil {
return nil, err
}
@ -508,15 +84,16 @@ func floatSliceParser(cn *pool.Conn, n int64) (interface{}, error) {
return nn, nil
}
func stringStringMapParser(cn *pool.Conn, n int64) (interface{}, error) {
// Implements proto.MultiBulkParse
func stringStringMapParser(rd *proto.Reader, n int64) (interface{}, error) {
m := make(map[string]string, n/2)
for i := int64(0); i < n; i += 2 {
key, err := readStringReply(cn)
key, err := rd.ReadStringReply()
if err != nil {
return nil, err
}
value, err := readStringReply(cn)
value, err := rd.ReadStringReply()
if err != nil {
return nil, err
}
@ -526,15 +103,16 @@ func stringStringMapParser(cn *pool.Conn, n int64) (interface{}, error) {
return m, nil
}
func stringIntMapParser(cn *pool.Conn, n int64) (interface{}, error) {
// Implements proto.MultiBulkParse
func stringIntMapParser(rd *proto.Reader, n int64) (interface{}, error) {
m := make(map[string]int64, n/2)
for i := int64(0); i < n; i += 2 {
key, err := readStringReply(cn)
key, err := rd.ReadStringReply()
if err != nil {
return nil, err
}
n, err := readIntReply(cn)
n, err := rd.ReadIntReply()
if err != nil {
return nil, err
}
@ -544,19 +122,20 @@ func stringIntMapParser(cn *pool.Conn, n int64) (interface{}, error) {
return m, nil
}
func zSliceParser(cn *pool.Conn, n int64) (interface{}, error) {
// Implements proto.MultiBulkParse
func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
zz := make([]Z, n/2)
for i := int64(0); i < n; i += 2 {
var err error
z := &zz[i/2]
z.Member, err = readStringReply(cn)
z.Member, err = rd.ReadStringReply()
if err != nil {
return nil, err
}
z.Score, err = readFloatReply(cn)
z.Score, err = rd.ReadFloatReply()
if err != nil {
return nil, err
}
@ -564,10 +143,11 @@ func zSliceParser(cn *pool.Conn, n int64) (interface{}, error) {
return zz, nil
}
func clusterSlotsParser(cn *pool.Conn, slotNum int64) (interface{}, error) {
slots := make([]ClusterSlot, slotNum)
for slotInd := 0; slotInd < len(slots); slotInd++ {
n, err := readArrayHeader(cn)
// Implements proto.MultiBulkParse
func clusterSlotsParser(rd *proto.Reader, n int64) (interface{}, error) {
slots := make([]ClusterSlot, n)
for i := 0; i < len(slots); i++ {
n, err := rd.ReadArrayLen()
if err != nil {
return nil, err
}
@ -576,19 +156,19 @@ func clusterSlotsParser(cn *pool.Conn, slotNum int64) (interface{}, error) {
return nil, err
}
start, err := readIntReply(cn)
start, err := rd.ReadIntReply()
if err != nil {
return nil, err
}
end, err := readIntReply(cn)
end, err := rd.ReadIntReply()
if err != nil {
return nil, err
}
nodes := make([]ClusterNode, n-2)
for nodeInd := 0; nodeInd < len(nodes); nodeInd++ {
n, err := readArrayHeader(cn)
for j := 0; j < len(nodes); j++ {
n, err := rd.ReadArrayLen()
if err != nil {
return nil, err
}
@ -597,27 +177,27 @@ func clusterSlotsParser(cn *pool.Conn, slotNum int64) (interface{}, error) {
return nil, err
}
ip, err := readStringReply(cn)
ip, err := rd.ReadStringReply()
if err != nil {
return nil, err
}
port, err := readIntReply(cn)
port, err := rd.ReadIntReply()
if err != nil {
return nil, err
}
nodes[nodeInd].Addr = net.JoinHostPort(ip, strconv.FormatInt(port, 10))
nodes[j].Addr = net.JoinHostPort(ip, strconv.FormatInt(port, 10))
if n == 3 {
id, err := readStringReply(cn)
id, err := rd.ReadStringReply()
if err != nil {
return nil, err
}
nodes[nodeInd].Id = id
nodes[j].Id = id
}
}
slots[slotInd] = ClusterSlot{
slots[i] = ClusterSlot{
Start: int(start),
End: int(end),
Nodes: nodes,
@ -626,29 +206,29 @@ func clusterSlotsParser(cn *pool.Conn, slotNum int64) (interface{}, error) {
return slots, nil
}
func newGeoLocationParser(q *GeoRadiusQuery) multiBulkParser {
return func(cn *pool.Conn, n int64) (interface{}, error) {
func newGeoLocationParser(q *GeoRadiusQuery) proto.MultiBulkParse {
return func(rd *proto.Reader, n int64) (interface{}, error) {
var loc GeoLocation
var err error
loc.Name, err = readStringReply(cn)
loc.Name, err = rd.ReadStringReply()
if err != nil {
return nil, err
}
if q.WithDist {
loc.Dist, err = readFloatReply(cn)
loc.Dist, err = rd.ReadFloatReply()
if err != nil {
return nil, err
}
}
if q.WithGeoHash {
loc.GeoHash, err = readIntReply(cn)
loc.GeoHash, err = rd.ReadIntReply()
if err != nil {
return nil, err
}
}
if q.WithCoord {
n, err := readArrayHeader(cn)
n, err := rd.ReadArrayLen()
if err != nil {
return nil, err
}
@ -656,11 +236,11 @@ func newGeoLocationParser(q *GeoRadiusQuery) multiBulkParser {
return nil, fmt.Errorf("got %d coordinates, expected 2", n)
}
loc.Longitude, err = readFloatReply(cn)
loc.Longitude, err = rd.ReadFloatReply()
if err != nil {
return nil, err
}
loc.Latitude, err = readFloatReply(cn)
loc.Latitude, err = rd.ReadFloatReply()
if err != nil {
return nil, err
}
@ -670,11 +250,11 @@ func newGeoLocationParser(q *GeoRadiusQuery) multiBulkParser {
}
}
func newGeoLocationSliceParser(q *GeoRadiusQuery) multiBulkParser {
return func(cn *pool.Conn, n int64) (interface{}, error) {
func newGeoLocationSliceParser(q *GeoRadiusQuery) proto.MultiBulkParse {
return func(rd *proto.Reader, n int64) (interface{}, error) {
locs := make([]GeoLocation, 0, n)
for i := int64(0); i < n; i++ {
v, err := readReply(cn, newGeoLocationParser(q))
v, err := rd.ReadReply(newGeoLocationParser(q))
if err != nil {
return nil, err
}
@ -693,7 +273,7 @@ func newGeoLocationSliceParser(q *GeoRadiusQuery) multiBulkParser {
}
}
func commandInfoParser(cn *pool.Conn, n int64) (interface{}, error) {
func commandInfoParser(rd *proto.Reader, n int64) (interface{}, error) {
var cmd CommandInfo
var err error
@ -701,36 +281,36 @@ func commandInfoParser(cn *pool.Conn, n int64) (interface{}, error) {
return nil, fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6")
}
cmd.Name, err = readStringReply(cn)
cmd.Name, err = rd.ReadStringReply()
if err != nil {
return nil, err
}
arity, err := readIntReply(cn)
arity, err := rd.ReadIntReply()
if err != nil {
return nil, err
}
cmd.Arity = int8(arity)
flags, err := readReply(cn, stringSliceParser)
flags, err := rd.ReadReply(stringSliceParser)
if err != nil {
return nil, err
}
cmd.Flags = flags.([]string)
firstKeyPos, err := readIntReply(cn)
firstKeyPos, err := rd.ReadIntReply()
if err != nil {
return nil, err
}
cmd.FirstKeyPos = int8(firstKeyPos)
lastKeyPos, err := readIntReply(cn)
lastKeyPos, err := rd.ReadIntReply()
if err != nil {
return nil, err
}
cmd.LastKeyPos = int8(lastKeyPos)
stepCount, err := readIntReply(cn)
stepCount, err := rd.ReadIntReply()
if err != nil {
return nil, err
}
@ -746,10 +326,10 @@ func commandInfoParser(cn *pool.Conn, n int64) (interface{}, error) {
return &cmd, nil
}
func commandInfoSliceParser(cn *pool.Conn, n int64) (interface{}, error) {
func commandInfoSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
m := make(map[string]*CommandInfo, n)
for i := int64(0); i < n; i++ {
v, err := readReply(cn, commandInfoParser)
v, err := rd.ReadReply(commandInfoParser)
if err != nil {
return nil, err
}