1
0
mirror of https://github.com/redis/go-redis.git synced 2025-07-29 17:41:15 +03:00

chore: cleanup key checker

This commit is contained in:
Vladimir Mihailenco
2022-12-07 10:38:28 +02:00
parent 3892986f01
commit 3dc5e40445
3 changed files with 118 additions and 71 deletions

View File

@ -4,8 +4,10 @@ import (
"context"
"fmt"
"sync"
"time"
"github.com/go-redis/redis/v9"
"go.uber.org/zap"
)
func main() {
@ -15,117 +17,127 @@ func main() {
Addr: ":6379",
})
ch := make(chan string, 100)
var wg sync.WaitGroup
_ = rdb.Set(ctx, "key_with_ttl", "bar", time.Minute).Err()
_ = rdb.Set(ctx, "key_without_ttl_1", "", 0).Err()
_ = rdb.Set(ctx, "key_without_ttl_2", "", 0).Err()
wg.Add(1)
go func() {
defer wg.Done()
deleted, err := process(ctx, rdb, ch)
if err != nil {
panic(err)
}
fmt.Println("deleted", deleted, "keys")
}()
checker := NewKeyChecker(rdb, 100)
start := time.Now()
checker.Start(ctx)
iter := rdb.Scan(ctx, 0, "", 0).Iterator()
for iter.Next(ctx) {
ch <- iter.Val()
checker.Add(iter.Val())
}
if err := iter.Err(); err != nil {
panic(err)
}
close(ch)
wg.Wait()
deleted := checker.Stop()
fmt.Println("deleted", deleted, "keys", "in", time.Since(start))
}
func process(ctx context.Context, rdb *redis.Client, in <-chan string) (int, error) {
var wg sync.WaitGroup
type KeyChecker struct {
rdb *redis.Client
batchSize int
ch chan string
delCh chan string
wg sync.WaitGroup
deleted int
logger *zap.Logger
}
out := make(chan string, 100)
defer func() {
close(out)
wg.Wait()
}()
func NewKeyChecker(rdb *redis.Client, batchSize int) *KeyChecker {
return &KeyChecker{
rdb: rdb,
batchSize: batchSize,
ch: make(chan string, batchSize),
delCh: make(chan string, batchSize),
logger: zap.L(),
}
}
wg.Add(1)
func (c *KeyChecker) Add(key string) {
c.ch <- key
}
func (c *KeyChecker) Start(ctx context.Context) {
c.wg.Add(1)
go func() {
defer wg.Done()
if err := del(ctx, rdb, out); err != nil {
defer c.wg.Done()
if err := c.del(ctx); err != nil {
panic(err)
}
}()
var deleted int
c.wg.Add(1)
go func() {
defer c.wg.Done()
defer close(c.delCh)
keys := make([]string, 0, 100)
for key := range in {
keys = append(keys, key)
if len(keys) < 100 {
continue
keys := make([]string, 0, c.batchSize)
for key := range c.ch {
keys = append(keys, key)
if len(keys) < cap(keys) {
continue
}
if err := c.checkKeys(ctx, keys); err != nil {
c.logger.Error("checkKeys failed", zap.Error(err))
}
keys = keys[:0]
}
var err error
keys, err = checkTTL(ctx, rdb, keys)
if err != nil {
return 0, err
if len(keys) > 0 {
if err := c.checkKeys(ctx, keys); err != nil {
c.logger.Error("checkKeys failed", zap.Error(err))
}
keys = nil
}
for _, key := range keys {
out <- key
}
deleted += len(keys)
keys = keys[:0]
}
if len(keys) > 0 {
keys, err := checkTTL(ctx, rdb, keys)
if err != nil {
return 0, err
}
for _, key := range keys {
out <- key
}
deleted += len(keys)
}
return deleted, nil
}()
}
func checkTTL(ctx context.Context, rdb *redis.Client, keys []string) ([]string, error) {
cmds, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
func (c *KeyChecker) Stop() int {
close(c.ch)
c.wg.Wait()
return c.deleted
}
func (c *KeyChecker) checkKeys(ctx context.Context, keys []string) error {
cmds, err := c.rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
for _, key := range keys {
pipe.TTL(ctx, key)
}
return nil
})
if err != nil {
return nil, err
return err
}
for i := len(cmds) - 1; i >= 0; i-- {
d, err := cmds[i].(*redis.DurationCmd).Result()
for i, cmd := range cmds {
d, err := cmd.(*redis.DurationCmd).Result()
if err != nil {
return nil, err
return err
}
if d != -1 {
keys = append(keys[:i], keys[i+1:]...)
if d == -1 {
c.delCh <- keys[i]
}
}
return keys, nil
return nil
}
func del(ctx context.Context, rdb *redis.Client, in <-chan string) error {
pipe := rdb.Pipeline()
func (c *KeyChecker) del(ctx context.Context) error {
pipe := c.rdb.Pipeline()
for key := range in {
for key := range c.delCh {
fmt.Printf("deleting %s...\n", key)
pipe.Del(ctx, key)
c.deleted++
if pipe.Len() < 100 {
if pipe.Len() < c.batchSize {
continue
}