1
0
mirror of https://github.com/redis/go-redis.git synced 2025-07-29 17:41:15 +03:00

Add pipelining support.

This commit is contained in:
Vladimir Mihailenco
2012-07-29 12:42:00 +03:00
parent e02eaf485f
commit 41137c2e6f
5 changed files with 310 additions and 132 deletions

View File

@ -30,6 +30,8 @@ func NewClient(connect connectFunc, disconnect disconnectFunc) *Client {
readerPool: bufreader.NewReaderPool(100, createReader),
connect: connect,
disconnect: disconnect,
reqs: make([]Req, 0),
}
}
@ -99,14 +101,14 @@ func (c *Client) WriteRead(buf []byte, rd *bufreader.Reader) error {
return c.ReadReply(rd)
}
func (c *Client) Run(req Req) {
if c.reqs != nil {
c.mtx.Lock()
c.reqs = append(c.reqs, req)
c.mtx.Unlock()
return
}
func (c *Client) Queue(req Req) {
req.SetClient(c)
c.mtx.Lock()
c.reqs = append(c.reqs, req)
c.mtx.Unlock()
}
func (c *Client) Run(req Req) {
rd, err := c.readerPool.Get()
if err != nil {
req.SetErr(err)
@ -128,23 +130,53 @@ func (c *Client) Run(req Req) {
req.SetVal(val)
}
func (c *Client) RunQueued() ([]Req, error) {
c.mtx.Lock()
reqs := c.reqs
c.reqs = make([]Req, 0)
c.mtx.Unlock()
var multiReq []byte
if len(reqs) == 1 {
multiReq = reqs[0].Req()
} else {
multiReq = make([]byte, 0, 1024)
for _, req := range reqs {
multiReq = append(multiReq, req.Req()...)
}
}
rd, err := c.readerPool.Get()
if err != nil {
return nil, err
}
defer c.readerPool.Add(rd)
err = c.WriteRead(multiReq, rd)
if err != nil {
return nil, err
}
for _, req := range reqs {
val, err := req.ParseReply(rd)
if err != nil {
req.SetErr(err)
} else {
req.SetVal(val)
}
}
return reqs, nil
}
//------------------------------------------------------------------------------
func (c *Client) Discard() {
if c.reqs == nil {
panic("MultiClient required")
}
c.mtx.Lock()
c.reqs = c.reqs[:0]
c.mtx.Unlock()
}
func (c *Client) Exec() ([]Req, error) {
if c.reqs == nil {
panic("MultiClient required")
}
c.mtx.Lock()
reqs := c.reqs
c.reqs = make([]Req, 0)
@ -170,20 +202,21 @@ func (c *Client) Exec() ([]Req, error) {
statusReq := NewStatusReq()
// multi
// Parse MULTI command reply.
_, err = statusReq.ParseReply(rd)
if err != nil {
return nil, err
}
// Parse queued replies.
for _ = range reqs {
// queue
_, err = statusReq.ParseReply(rd)
if err != nil {
return nil, err
}
}
// Parse number of replies.
line, err := rd.ReadLine('\n')
if err != nil {
return nil, err
@ -192,12 +225,14 @@ func (c *Client) Exec() ([]Req, error) {
return nil, fmt.Errorf("Expected '*', but got line %q of %q.", line, rd.Bytes())
}
// Parse replies.
for _, req := range reqs {
val, err := req.ParseReply(rd)
if err != nil {
req.SetErr(err)
} else {
req.SetVal(val)
}
req.SetVal(val)
}
return reqs, nil