mirror of
https://github.com/redis/go-redis.git
synced 2025-07-29 17:41:15 +03:00
Fix race
This commit is contained in:
42
redis.go
42
redis.go
@ -49,12 +49,10 @@ func (hs hooks) process(
|
||||
ctx context.Context, cmd Cmder, fn func(context.Context, Cmder) error,
|
||||
) error {
|
||||
if len(hs.hooks) == 0 {
|
||||
err, canceled := hs.withContext(ctx, func() error {
|
||||
err := hs.withContext(ctx, func() error {
|
||||
return fn(ctx, cmd)
|
||||
})
|
||||
if canceled {
|
||||
cmd.SetErr(err)
|
||||
}
|
||||
cmd.SetErr(err)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -69,13 +67,10 @@ func (hs hooks) process(
|
||||
}
|
||||
|
||||
if retErr == nil {
|
||||
var canceled bool
|
||||
retErr, canceled = hs.withContext(ctx, func() error {
|
||||
retErr = hs.withContext(ctx, func() error {
|
||||
return fn(ctx, cmd)
|
||||
})
|
||||
if canceled {
|
||||
cmd.SetErr(retErr)
|
||||
}
|
||||
cmd.SetErr(retErr)
|
||||
}
|
||||
|
||||
for hookIndex--; hookIndex >= 0; hookIndex-- {
|
||||
@ -92,12 +87,9 @@ func (hs hooks) processPipeline(
|
||||
ctx context.Context, cmds []Cmder, fn func(context.Context, []Cmder) error,
|
||||
) error {
|
||||
if len(hs.hooks) == 0 {
|
||||
err, canceled := hs.withContext(ctx, func() error {
|
||||
err := hs.withContext(ctx, func() error {
|
||||
return fn(ctx, cmds)
|
||||
})
|
||||
if canceled {
|
||||
setCmdsErr(cmds, err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@ -112,13 +104,9 @@ func (hs hooks) processPipeline(
|
||||
}
|
||||
|
||||
if retErr == nil {
|
||||
var canceled bool
|
||||
retErr, canceled = hs.withContext(ctx, func() error {
|
||||
retErr = hs.withContext(ctx, func() error {
|
||||
return fn(ctx, cmds)
|
||||
})
|
||||
if canceled {
|
||||
setCmdsErr(cmds, retErr)
|
||||
}
|
||||
}
|
||||
|
||||
for hookIndex--; hookIndex >= 0; hookIndex-- {
|
||||
@ -138,10 +126,10 @@ func (hs hooks) processTxPipeline(
|
||||
return hs.processPipeline(ctx, cmds, fn)
|
||||
}
|
||||
|
||||
func (hs hooks) withContext(ctx context.Context, fn func() error) (_ error, canceled bool) {
|
||||
func (hs hooks) withContext(ctx context.Context, fn func() error) error {
|
||||
done := ctx.Done()
|
||||
if done == nil {
|
||||
return fn(), false
|
||||
return fn()
|
||||
}
|
||||
|
||||
errc := make(chan error, 1)
|
||||
@ -149,9 +137,9 @@ func (hs hooks) withContext(ctx context.Context, fn func() error) (_ error, canc
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
return ctx.Err(), true
|
||||
return ctx.Err()
|
||||
case err := <-errc:
|
||||
return err, false
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@ -324,15 +312,6 @@ func (c *baseClient) withConn(
|
||||
}
|
||||
|
||||
func (c *baseClient) process(ctx context.Context, cmd Cmder) error {
|
||||
err := c._process(ctx, cmd)
|
||||
if err != nil {
|
||||
cmd.SetErr(err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *baseClient) _process(ctx context.Context, cmd Cmder) error {
|
||||
var lastErr error
|
||||
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
|
||||
attempt := attempt
|
||||
@ -476,6 +455,7 @@ func (c *baseClient) pipelineProcessCmds(
|
||||
func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
|
||||
for _, cmd := range cmds {
|
||||
err := cmd.readReply(rd)
|
||||
cmd.SetErr(err)
|
||||
if err != nil && !isRedisError(err) {
|
||||
return err
|
||||
}
|
||||
|
Reference in New Issue
Block a user