mirror of
https://github.com/redis/go-redis.git
synced 2025-09-05 20:24:00 +03:00
update example and tests, drop connectionAdapter
This commit is contained in:
47
.github/copilot-instructions.md
vendored
47
.github/copilot-instructions.md
vendored
@@ -311,9 +311,50 @@ func (opt *Options) init() {
|
|||||||
|
|
||||||
```go
|
```go
|
||||||
type NotificationProcessor interface {
|
type NotificationProcessor interface {
|
||||||
ProcessPushNotification(ctx context.Context, data []byte) error
|
RegisterHandler(pushNotificationName string, handler interface{}, protected bool) error
|
||||||
RegisterHandler(notificationType string, handler NotificationHandler) error
|
UnregisterHandler(pushNotificationName string) error
|
||||||
Close() error
|
GetHandler(pushNotificationName string) interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
type NotificationHandler interface {
|
||||||
|
HandlePushNotification(ctx context.Context, handlerCtx NotificationHandlerContext, notification []interface{}) error
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Notification Hooks
|
||||||
|
|
||||||
|
```go
|
||||||
|
type NotificationHook interface {
|
||||||
|
PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool)
|
||||||
|
PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotificationHandlerContext provides context for notification processing
|
||||||
|
type NotificationHandlerContext struct {
|
||||||
|
Client interface{} // Redis client instance
|
||||||
|
Pool interface{} // Connection pool
|
||||||
|
Conn interface{} // Specific connection (*pool.Conn)
|
||||||
|
IsBlocking bool // Whether notification was on blocking connection
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Hook Implementation Pattern
|
||||||
|
|
||||||
|
```go
|
||||||
|
func (h *CustomHook) PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool) {
|
||||||
|
// Access connection information
|
||||||
|
if conn, ok := notificationCtx.Conn.(*pool.Conn); ok {
|
||||||
|
connID := conn.GetID()
|
||||||
|
// Process with connection context
|
||||||
|
}
|
||||||
|
return notification, true // Continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *CustomHook) PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error) {
|
||||||
|
// Handle processing result
|
||||||
|
if result != nil {
|
||||||
|
// Log or handle error
|
||||||
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
38
adapters.go
38
adapters.go
@@ -7,7 +7,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9/internal/interfaces"
|
"github.com/redis/go-redis/v9/internal/interfaces"
|
||||||
"github.com/redis/go-redis/v9/internal/pool"
|
|
||||||
"github.com/redis/go-redis/v9/push"
|
"github.com/redis/go-redis/v9/push"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -88,43 +87,6 @@ func (oa *optionsAdapter) NewDialer() func(context.Context) (net.Conn, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// connectionAdapter adapts a Redis connection to interfaces.ConnectionWithRelaxedTimeout
|
|
||||||
type connectionAdapter struct {
|
|
||||||
conn *pool.Conn
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close closes the connection.
|
|
||||||
func (ca *connectionAdapter) Close() error {
|
|
||||||
return ca.conn.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsUsable returns true if the connection is safe to use for new commands.
|
|
||||||
func (ca *connectionAdapter) IsUsable() bool {
|
|
||||||
return ca.conn.IsUsable()
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetPoolConn returns the underlying pool connection.
|
|
||||||
func (ca *connectionAdapter) GetPoolConn() *pool.Conn {
|
|
||||||
return ca.conn
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetRelaxedTimeout sets relaxed timeouts for this connection during hitless upgrades.
|
|
||||||
// These timeouts remain active until explicitly cleared.
|
|
||||||
func (ca *connectionAdapter) SetRelaxedTimeout(readTimeout, writeTimeout time.Duration) {
|
|
||||||
ca.conn.SetRelaxedTimeout(readTimeout, writeTimeout)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetRelaxedTimeoutWithDeadline sets relaxed timeouts with an expiration deadline.
|
|
||||||
// After the deadline, timeouts automatically revert to normal values.
|
|
||||||
func (ca *connectionAdapter) SetRelaxedTimeoutWithDeadline(readTimeout, writeTimeout time.Duration, deadline time.Time) {
|
|
||||||
ca.conn.SetRelaxedTimeoutWithDeadline(readTimeout, writeTimeout, deadline)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ClearRelaxedTimeout clears relaxed timeouts for this connection.
|
|
||||||
func (ca *connectionAdapter) ClearRelaxedTimeout() {
|
|
||||||
ca.conn.ClearRelaxedTimeout()
|
|
||||||
}
|
|
||||||
|
|
||||||
// pushProcessorAdapter adapts a push.NotificationProcessor to implement interfaces.NotificationProcessor.
|
// pushProcessorAdapter adapts a push.NotificationProcessor to implement interfaces.NotificationProcessor.
|
||||||
type pushProcessorAdapter struct {
|
type pushProcessorAdapter struct {
|
||||||
processor push.NotificationProcessor
|
processor push.NotificationProcessor
|
||||||
|
@@ -5,6 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
@@ -12,11 +13,15 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var ctx = context.Background()
|
var ctx = context.Background()
|
||||||
|
var cntErrors atomic.Int64
|
||||||
|
var cntSuccess atomic.Int64
|
||||||
|
var startTime = time.Now()
|
||||||
|
|
||||||
// This example is not supposed to be run as is. It is just a test to see how pubsub behaves in relation to pool management.
|
// This example is not supposed to be run as is. It is just a test to see how pubsub behaves in relation to pool management.
|
||||||
// It was used to find regressions in pool management in hitless mode.
|
// It was used to find regressions in pool management in hitless mode.
|
||||||
// Please don't use it as a reference for how to use pubsub.
|
// Please don't use it as a reference for how to use pubsub.
|
||||||
func main() {
|
func main() {
|
||||||
|
startTime = time.Now()
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
rdb := redis.NewClient(&redis.Options{
|
rdb := redis.NewClient(&redis.Options{
|
||||||
Addr: ":6379",
|
Addr: ":6379",
|
||||||
@@ -25,6 +30,12 @@ func main() {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
_ = rdb.FlushDB(ctx).Err()
|
_ = rdb.FlushDB(ctx).Err()
|
||||||
|
hitlessManager := rdb.GetHitlessManager()
|
||||||
|
if hitlessManager == nil {
|
||||||
|
panic("hitless manager is nil")
|
||||||
|
}
|
||||||
|
loggingHook := hitless.NewLoggingHook(3)
|
||||||
|
hitlessManager.AddNotificationHook(loggingHook)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
@@ -62,7 +73,8 @@ func main() {
|
|||||||
subCtx, cancelSubCtx = context.WithCancel(ctx)
|
subCtx, cancelSubCtx = context.WithCancel(ctx)
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
if err := rdb.Incr(ctx, "publishers").Err(); err != nil {
|
if err := rdb.Incr(ctx, "publishers").Err(); err != nil {
|
||||||
panic(err)
|
fmt.Println("incr error:", err)
|
||||||
|
cntErrors.Add(1)
|
||||||
}
|
}
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go floodThePool(pubCtx, rdb, wg)
|
go floodThePool(pubCtx, rdb, wg)
|
||||||
@@ -70,12 +82,14 @@ func main() {
|
|||||||
|
|
||||||
for i := 0; i < 500; i++ {
|
for i := 0; i < 500; i++ {
|
||||||
if err := rdb.Incr(ctx, "subscribers").Err(); err != nil {
|
if err := rdb.Incr(ctx, "subscribers").Err(); err != nil {
|
||||||
panic(err)
|
fmt.Println("incr error:", err)
|
||||||
|
cntErrors.Add(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go subscribe(subCtx, rdb, "test2", i, wg)
|
go subscribe(subCtx, rdb, "test2", i, wg)
|
||||||
}
|
}
|
||||||
time.Sleep(5 * time.Second)
|
time.Sleep(120 * time.Second)
|
||||||
fmt.Println("canceling publishers")
|
fmt.Println("canceling publishers")
|
||||||
cancelPublishers()
|
cancelPublishers()
|
||||||
time.Sleep(10 * time.Second)
|
time.Sleep(10 * time.Second)
|
||||||
@@ -95,6 +109,9 @@ func main() {
|
|||||||
fmt.Printf("if drained = published*subscribers: %d\n", publishedInt*subscribersInt)
|
fmt.Printf("if drained = published*subscribers: %d\n", publishedInt*subscribersInt)
|
||||||
|
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
|
fmt.Println("errors:", cntErrors.Load())
|
||||||
|
fmt.Println("success:", cntSuccess.Load())
|
||||||
|
fmt.Println("time:", time.Since(startTime))
|
||||||
}
|
}
|
||||||
|
|
||||||
func floodThePool(ctx context.Context, rdb *redis.Client, wg *sync.WaitGroup) {
|
func floodThePool(ctx context.Context, rdb *redis.Client, wg *sync.WaitGroup) {
|
||||||
@@ -107,14 +124,18 @@ func floodThePool(ctx context.Context, rdb *redis.Client, wg *sync.WaitGroup) {
|
|||||||
}
|
}
|
||||||
err := rdb.Publish(ctx, "test2", "hello").Err()
|
err := rdb.Publish(ctx, "test2", "hello").Err()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// noop
|
if err.Error() != "context canceled" {
|
||||||
//log.Println("publish error:", err)
|
log.Println("publish error:", err)
|
||||||
|
cntErrors.Add(1)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = rdb.Incr(ctx, "published").Err()
|
err = rdb.Incr(ctx, "published").Err()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// noop
|
if err.Error() != "context canceled" {
|
||||||
//log.Println("incr error:", err)
|
log.Println("incr error:", err)
|
||||||
|
cntErrors.Add(1)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
time.Sleep(10 * time.Nanosecond)
|
time.Sleep(10 * time.Nanosecond)
|
||||||
}
|
}
|
||||||
@@ -137,7 +158,10 @@ func subscribe(ctx context.Context, rdb *redis.Client, topic string, subscriberI
|
|||||||
case msg := <-recChan:
|
case msg := <-recChan:
|
||||||
err := rdb.Incr(ctx, "received").Err()
|
err := rdb.Incr(ctx, "received").Err()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("incr error:", err)
|
if err.Error() != "context canceled" {
|
||||||
|
log.Printf("%s\n", err.Error())
|
||||||
|
cntErrors.Add(1)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
_ = msg // Use the message to avoid unused variable warning
|
_ = msg // Use the message to avoid unused variable warning
|
||||||
}
|
}
|
||||||
|
@@ -49,23 +49,62 @@ Config: &hitless.Config{
|
|||||||
- **Auto-calculated**: `10 × MaxWorkers`, capped by pool size
|
- **Auto-calculated**: `10 × MaxWorkers`, capped by pool size
|
||||||
- **Always capped**: Queue size never exceeds pool size
|
- **Always capped**: Queue size never exceeds pool size
|
||||||
|
|
||||||
## Metrics Hook Example
|
## Notification Hooks
|
||||||
|
|
||||||
A metrics collection hook is available in `example_hooks.go` that demonstrates how to monitor hitless upgrade operations:
|
Notification hooks allow you to monitor and customize hitless upgrade operations. The `NotificationHook` interface provides pre and post processing hooks:
|
||||||
|
|
||||||
|
```go
|
||||||
|
type NotificationHook interface {
|
||||||
|
PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool)
|
||||||
|
PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Example: Metrics Collection Hook
|
||||||
|
|
||||||
|
A metrics collection hook is available in `example_hooks.go`:
|
||||||
|
|
||||||
```go
|
```go
|
||||||
import "github.com/redis/go-redis/v9/hitless"
|
import "github.com/redis/go-redis/v9/hitless"
|
||||||
|
|
||||||
metricsHook := hitless.NewMetricsHook()
|
metricsHook := hitless.NewMetricsHook()
|
||||||
// Use with your monitoring system
|
manager.AddNotificationHook(metricsHook)
|
||||||
|
|
||||||
|
// Access metrics
|
||||||
|
metrics := metricsHook.GetMetrics()
|
||||||
```
|
```
|
||||||
|
|
||||||
The metrics hook tracks:
|
### Example: Custom Logging Hook
|
||||||
|
|
||||||
|
```go
|
||||||
|
type CustomHook struct{}
|
||||||
|
|
||||||
|
func (h *CustomHook) PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool) {
|
||||||
|
// Log notification with connection details
|
||||||
|
if conn, ok := notificationCtx.Conn.(*pool.Conn); ok {
|
||||||
|
log.Printf("Processing %s on connection %d", notificationType, conn.GetID())
|
||||||
|
}
|
||||||
|
return notification, true // Continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *CustomHook) PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error) {
|
||||||
|
if result != nil {
|
||||||
|
log.Printf("Failed to process %s: %v", notificationType, result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
The notification context provides access to:
|
||||||
|
- **Client**: The Redis client instance
|
||||||
|
- **Pool**: The connection pool
|
||||||
|
- **Conn**: The specific connection that received the notification
|
||||||
|
- **IsBlocking**: Whether the notification was received on a blocking connection
|
||||||
|
|
||||||
|
Hooks can track:
|
||||||
- Handoff success/failure rates
|
- Handoff success/failure rates
|
||||||
- Handoff duration
|
- Processing duration
|
||||||
- Queue depth
|
- Connection-specific metrics
|
||||||
- Worker utilization
|
- Custom business logic
|
||||||
- Connection lifecycle events
|
|
||||||
|
|
||||||
## Requirements
|
## Requirements
|
||||||
|
|
||||||
|
@@ -3,6 +3,10 @@ package hitless
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/redis/go-redis/v9/internal"
|
||||||
|
"github.com/redis/go-redis/v9/internal/pool"
|
||||||
|
"github.com/redis/go-redis/v9/push"
|
||||||
)
|
)
|
||||||
|
|
||||||
// contextKey is a custom type for context keys to avoid collisions
|
// contextKey is a custom type for context keys to avoid collisions
|
||||||
@@ -29,9 +33,14 @@ func NewMetricsHook() *MetricsHook {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// PreHook records the start time for processing metrics.
|
// PreHook records the start time for processing metrics.
|
||||||
func (mh *MetricsHook) PreHook(ctx context.Context, notificationType string, notification []interface{}) ([]interface{}, bool) {
|
func (mh *MetricsHook) PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool) {
|
||||||
mh.NotificationCounts[notificationType]++
|
mh.NotificationCounts[notificationType]++
|
||||||
|
|
||||||
|
// Log connection information if available
|
||||||
|
if conn, ok := notificationCtx.Conn.(*pool.Conn); ok {
|
||||||
|
internal.Logger.Printf(ctx, "hitless: metrics hook processing %s notification on connection %d", notificationType, conn.GetID())
|
||||||
|
}
|
||||||
|
|
||||||
// Store start time in context for duration calculation
|
// Store start time in context for duration calculation
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
_ = context.WithValue(ctx, startTimeKey, startTime) // Context not used further
|
_ = context.WithValue(ctx, startTimeKey, startTime) // Context not used further
|
||||||
@@ -40,7 +49,7 @@ func (mh *MetricsHook) PreHook(ctx context.Context, notificationType string, not
|
|||||||
}
|
}
|
||||||
|
|
||||||
// PostHook records processing completion and any errors.
|
// PostHook records processing completion and any errors.
|
||||||
func (mh *MetricsHook) PostHook(ctx context.Context, notificationType string, notification []interface{}, result error) {
|
func (mh *MetricsHook) PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error) {
|
||||||
// Calculate processing duration
|
// Calculate processing duration
|
||||||
if startTime, ok := ctx.Value(startTimeKey).(time.Time); ok {
|
if startTime, ok := ctx.Value(startTimeKey).(time.Time); ok {
|
||||||
duration := time.Since(startTime)
|
duration := time.Since(startTime)
|
||||||
@@ -50,6 +59,11 @@ func (mh *MetricsHook) PostHook(ctx context.Context, notificationType string, no
|
|||||||
// Record errors
|
// Record errors
|
||||||
if result != nil {
|
if result != nil {
|
||||||
mh.ErrorCounts[notificationType]++
|
mh.ErrorCounts[notificationType]++
|
||||||
|
|
||||||
|
// Log error details with connection information
|
||||||
|
if conn, ok := notificationCtx.Conn.(*pool.Conn); ok {
|
||||||
|
internal.Logger.Printf(ctx, "hitless: metrics hook recorded error for %s notification on connection %d: %v", notificationType, conn.GetID(), result)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -11,6 +11,7 @@ import (
|
|||||||
"github.com/redis/go-redis/v9/internal"
|
"github.com/redis/go-redis/v9/internal"
|
||||||
"github.com/redis/go-redis/v9/internal/interfaces"
|
"github.com/redis/go-redis/v9/internal/interfaces"
|
||||||
"github.com/redis/go-redis/v9/internal/pool"
|
"github.com/redis/go-redis/v9/internal/pool"
|
||||||
|
"github.com/redis/go-redis/v9/push"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Push notification type constants for hitless upgrades
|
// Push notification type constants for hitless upgrades
|
||||||
@@ -35,8 +36,8 @@ var hitlessNotificationTypes = []string{
|
|||||||
// PreHook can modify the notification and return false to skip processing
|
// PreHook can modify the notification and return false to skip processing
|
||||||
// PostHook is called after successful processing
|
// PostHook is called after successful processing
|
||||||
type NotificationHook interface {
|
type NotificationHook interface {
|
||||||
PreHook(ctx context.Context, notificationType string, notification []interface{}) ([]interface{}, bool)
|
PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool)
|
||||||
PostHook(ctx context.Context, notificationType string, notification []interface{}, result error)
|
PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MovingOperationKey provides a unique key for tracking MOVING operations
|
// MovingOperationKey provides a unique key for tracking MOVING operations
|
||||||
@@ -252,14 +253,14 @@ func (hm *HitlessManager) GetState() State {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// processPreHooks calls all pre-hooks and returns the modified notification and whether to continue processing.
|
// processPreHooks calls all pre-hooks and returns the modified notification and whether to continue processing.
|
||||||
func (hm *HitlessManager) processPreHooks(ctx context.Context, notificationType string, notification []interface{}) ([]interface{}, bool) {
|
func (hm *HitlessManager) processPreHooks(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool) {
|
||||||
hm.hooksMu.RLock()
|
hm.hooksMu.RLock()
|
||||||
defer hm.hooksMu.RUnlock()
|
defer hm.hooksMu.RUnlock()
|
||||||
|
|
||||||
currentNotification := notification
|
currentNotification := notification
|
||||||
|
|
||||||
for _, hook := range hm.hooks {
|
for _, hook := range hm.hooks {
|
||||||
modifiedNotification, shouldContinue := hook.PreHook(ctx, notificationType, currentNotification)
|
modifiedNotification, shouldContinue := hook.PreHook(ctx, notificationCtx, notificationType, currentNotification)
|
||||||
if !shouldContinue {
|
if !shouldContinue {
|
||||||
return modifiedNotification, false
|
return modifiedNotification, false
|
||||||
}
|
}
|
||||||
@@ -270,12 +271,12 @@ func (hm *HitlessManager) processPreHooks(ctx context.Context, notificationType
|
|||||||
}
|
}
|
||||||
|
|
||||||
// processPostHooks calls all post-hooks with the processing result.
|
// processPostHooks calls all post-hooks with the processing result.
|
||||||
func (hm *HitlessManager) processPostHooks(ctx context.Context, notificationType string, notification []interface{}, result error) {
|
func (hm *HitlessManager) processPostHooks(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error) {
|
||||||
hm.hooksMu.RLock()
|
hm.hooksMu.RLock()
|
||||||
defer hm.hooksMu.RUnlock()
|
defer hm.hooksMu.RUnlock()
|
||||||
|
|
||||||
for _, hook := range hm.hooks {
|
for _, hook := range hm.hooks {
|
||||||
hook.PostHook(ctx, notificationType, notification, result)
|
hook.PostHook(ctx, notificationCtx, notificationType, notification, result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -4,6 +4,8 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9/internal"
|
"github.com/redis/go-redis/v9/internal"
|
||||||
|
"github.com/redis/go-redis/v9/internal/pool"
|
||||||
|
"github.com/redis/go-redis/v9/push"
|
||||||
)
|
)
|
||||||
|
|
||||||
// LoggingHook is an example hook implementation that logs all notifications.
|
// LoggingHook is an example hook implementation that logs all notifications.
|
||||||
@@ -12,19 +14,28 @@ type LoggingHook struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// PreHook logs the notification before processing and allows modification.
|
// PreHook logs the notification before processing and allows modification.
|
||||||
func (lh *LoggingHook) PreHook(ctx context.Context, notificationType string, notification []interface{}) ([]interface{}, bool) {
|
func (lh *LoggingHook) PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool) {
|
||||||
if lh.LogLevel >= 2 { // Info level
|
if lh.LogLevel >= 2 { // Info level
|
||||||
internal.Logger.Printf(ctx, "hitless: processing %s notification: %v", notificationType, notification)
|
// Log the notification type and content
|
||||||
|
connID := uint64(0)
|
||||||
|
if conn, ok := notificationCtx.Conn.(*pool.Conn); ok {
|
||||||
|
connID = conn.GetID()
|
||||||
|
}
|
||||||
|
internal.Logger.Printf(ctx, "hitless: conn[%d] processing %s notification: %v", connID, notificationType, notification)
|
||||||
}
|
}
|
||||||
return notification, true // Continue processing with unmodified notification
|
return notification, true // Continue processing with unmodified notification
|
||||||
}
|
}
|
||||||
|
|
||||||
// PostHook logs the result after processing.
|
// PostHook logs the result after processing.
|
||||||
func (lh *LoggingHook) PostHook(ctx context.Context, notificationType string, notification []interface{}, result error) {
|
func (lh *LoggingHook) PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error) {
|
||||||
|
connID := uint64(0)
|
||||||
|
if conn, ok := notificationCtx.Conn.(*pool.Conn); ok {
|
||||||
|
connID = conn.GetID()
|
||||||
|
}
|
||||||
if result != nil && lh.LogLevel >= 1 { // Warning level
|
if result != nil && lh.LogLevel >= 1 { // Warning level
|
||||||
internal.Logger.Printf(ctx, "hitless: %s notification processing failed: %v - %v", notificationType, result, notification)
|
internal.Logger.Printf(ctx, "hitless: conn[%d] %s notification processing failed: %v - %v", connID, notificationType, result, notification)
|
||||||
} else if lh.LogLevel >= 3 { // Debug level
|
} else if lh.LogLevel >= 3 { // Debug level
|
||||||
internal.Logger.Printf(ctx, "hitless: %s notification processed successfully", notificationType)
|
internal.Logger.Printf(ctx, "hitless: conn[%d] %s notification processed successfully", connID, notificationType)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -30,7 +30,7 @@ func (snh *NotificationHandler) HandlePushNotification(ctx context.Context, hand
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Process pre-hooks - they can modify the notification or skip processing
|
// Process pre-hooks - they can modify the notification or skip processing
|
||||||
modifiedNotification, shouldContinue := snh.manager.processPreHooks(ctx, notificationType, notification)
|
modifiedNotification, shouldContinue := snh.manager.processPreHooks(ctx, handlerCtx, notificationType, notification)
|
||||||
if !shouldContinue {
|
if !shouldContinue {
|
||||||
return nil // Hooks decided to skip processing
|
return nil // Hooks decided to skip processing
|
||||||
}
|
}
|
||||||
@@ -53,7 +53,7 @@ func (snh *NotificationHandler) HandlePushNotification(ctx context.Context, hand
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Process post-hooks with the result
|
// Process post-hooks with the result
|
||||||
snh.manager.processPostHooks(ctx, notificationType, modifiedNotification, err)
|
snh.manager.processPostHooks(ctx, handlerCtx, notificationType, modifiedNotification, err)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -97,9 +97,7 @@ func (snh *NotificationHandler) handleMoving(ctx context.Context, handlerCtx pus
|
|||||||
|
|
||||||
// Type assert to get the underlying pool connection
|
// Type assert to get the underlying pool connection
|
||||||
var poolConn *pool.Conn
|
var poolConn *pool.Conn
|
||||||
if connAdapter, ok := conn.(interface{ GetPoolConn() *pool.Conn }); ok {
|
if pc, ok := conn.(*pool.Conn); ok {
|
||||||
poolConn = connAdapter.GetPoolConn()
|
|
||||||
} else if pc, ok := conn.(*pool.Conn); ok {
|
|
||||||
poolConn = pc
|
poolConn = pc
|
||||||
} else {
|
} else {
|
||||||
internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for MOVING notification - %T %#v", conn, handlerCtx)
|
internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for MOVING notification - %T %#v", conn, handlerCtx)
|
||||||
@@ -157,13 +155,11 @@ func (snh *NotificationHandler) handleMigrating(ctx context.Context, handlerCtx
|
|||||||
return ErrInvalidNotification
|
return ErrInvalidNotification
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the connection from handler context and type assert to connectionAdapter
|
|
||||||
if handlerCtx.Conn == nil {
|
if handlerCtx.Conn == nil {
|
||||||
internal.Logger.Printf(ctx, "hitless: no connection in handler context for MIGRATING notification")
|
internal.Logger.Printf(ctx, "hitless: no connection in handler context for MIGRATING notification")
|
||||||
return ErrInvalidNotification
|
return ErrInvalidNotification
|
||||||
}
|
}
|
||||||
|
|
||||||
// Type assert to connectionAdapter which implements ConnectionWithRelaxedTimeout
|
|
||||||
connAdapter, ok := handlerCtx.Conn.(interfaces.ConnectionWithRelaxedTimeout)
|
connAdapter, ok := handlerCtx.Conn.(interfaces.ConnectionWithRelaxedTimeout)
|
||||||
if !ok {
|
if !ok {
|
||||||
internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for MIGRATING notification")
|
internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for MIGRATING notification")
|
||||||
@@ -184,13 +180,11 @@ func (snh *NotificationHandler) handleMigrated(ctx context.Context, handlerCtx p
|
|||||||
return ErrInvalidNotification
|
return ErrInvalidNotification
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the connection from handler context and type assert to connectionAdapter
|
|
||||||
if handlerCtx.Conn == nil {
|
if handlerCtx.Conn == nil {
|
||||||
internal.Logger.Printf(ctx, "hitless: no connection in handler context for MIGRATED notification")
|
internal.Logger.Printf(ctx, "hitless: no connection in handler context for MIGRATED notification")
|
||||||
return ErrInvalidNotification
|
return ErrInvalidNotification
|
||||||
}
|
}
|
||||||
|
|
||||||
// Type assert to connectionAdapter which implements ConnectionWithRelaxedTimeout
|
|
||||||
connAdapter, ok := handlerCtx.Conn.(interfaces.ConnectionWithRelaxedTimeout)
|
connAdapter, ok := handlerCtx.Conn.(interfaces.ConnectionWithRelaxedTimeout)
|
||||||
if !ok {
|
if !ok {
|
||||||
internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for MIGRATED notification")
|
internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for MIGRATED notification")
|
||||||
@@ -211,13 +205,11 @@ func (snh *NotificationHandler) handleFailingOver(ctx context.Context, handlerCt
|
|||||||
return ErrInvalidNotification
|
return ErrInvalidNotification
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the connection from handler context and type assert to connectionAdapter
|
|
||||||
if handlerCtx.Conn == nil {
|
if handlerCtx.Conn == nil {
|
||||||
internal.Logger.Printf(ctx, "hitless: no connection in handler context for FAILING_OVER notification")
|
internal.Logger.Printf(ctx, "hitless: no connection in handler context for FAILING_OVER notification")
|
||||||
return ErrInvalidNotification
|
return ErrInvalidNotification
|
||||||
}
|
}
|
||||||
|
|
||||||
// Type assert to connectionAdapter which implements ConnectionWithRelaxedTimeout
|
|
||||||
connAdapter, ok := handlerCtx.Conn.(interfaces.ConnectionWithRelaxedTimeout)
|
connAdapter, ok := handlerCtx.Conn.(interfaces.ConnectionWithRelaxedTimeout)
|
||||||
if !ok {
|
if !ok {
|
||||||
internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for FAILING_OVER notification")
|
internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for FAILING_OVER notification")
|
||||||
@@ -238,13 +230,11 @@ func (snh *NotificationHandler) handleFailedOver(ctx context.Context, handlerCtx
|
|||||||
return ErrInvalidNotification
|
return ErrInvalidNotification
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the connection from handler context and type assert to connectionAdapter
|
|
||||||
if handlerCtx.Conn == nil {
|
if handlerCtx.Conn == nil {
|
||||||
internal.Logger.Printf(ctx, "hitless: no connection in handler context for FAILED_OVER notification")
|
internal.Logger.Printf(ctx, "hitless: no connection in handler context for FAILED_OVER notification")
|
||||||
return ErrInvalidNotification
|
return ErrInvalidNotification
|
||||||
}
|
}
|
||||||
|
|
||||||
// Type assert to connectionAdapter which implements ConnectionWithRelaxedTimeout
|
|
||||||
connAdapter, ok := handlerCtx.Conn.(interfaces.ConnectionWithRelaxedTimeout)
|
connAdapter, ok := handlerCtx.Conn.(interfaces.ConnectionWithRelaxedTimeout)
|
||||||
if !ok {
|
if !ok {
|
||||||
internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for FAILED_OVER notification")
|
internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for FAILED_OVER notification")
|
||||||
|
@@ -117,7 +117,7 @@ func (ph *PoolHook) IsHandoffPending(conn *pool.Conn) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// OnGet is called when a connection is retrieved from the pool
|
// OnGet is called when a connection is retrieved from the pool
|
||||||
func (ph *PoolHook) OnGet(ctx context.Context, conn *pool.Conn, isNewConn bool) error {
|
func (ph *PoolHook) OnGet(ctx context.Context, conn *pool.Conn, _ bool) error {
|
||||||
// NOTE: There are two conditions to make sure we don't return a connection that should be handed off or is
|
// NOTE: There are two conditions to make sure we don't return a connection that should be handed off or is
|
||||||
// in a handoff state at the moment.
|
// in a handoff state at the moment.
|
||||||
|
|
||||||
@@ -234,6 +234,7 @@ func (ph *PoolHook) onDemandWorker() {
|
|||||||
func (ph *PoolHook) processHandoffRequest(request HandoffRequest) {
|
func (ph *PoolHook) processHandoffRequest(request HandoffRequest) {
|
||||||
// Remove from pending map
|
// Remove from pending map
|
||||||
defer ph.pending.Delete(request.Conn.GetID())
|
defer ph.pending.Delete(request.Conn.GetID())
|
||||||
|
internal.Logger.Printf(context.Background(), "hitless: conn[%d] Processing handoff request start", request.Conn.GetID())
|
||||||
|
|
||||||
// Create a context with handoff timeout from config
|
// Create a context with handoff timeout from config
|
||||||
handoffTimeout := 30 * time.Second // Default fallback
|
handoffTimeout := 30 * time.Second // Default fallback
|
||||||
@@ -366,6 +367,7 @@ func (ph *PoolHook) performConnectionHandoffWithPool(ctx context.Context, conn *
|
|||||||
}
|
}
|
||||||
|
|
||||||
retries := conn.IncrementAndGetHandoffRetries(1)
|
retries := conn.IncrementAndGetHandoffRetries(1)
|
||||||
|
internal.Logger.Printf(ctx, "hitless: conn[%d] Retry %d: Performing handoff to %s(was %s)", conn.GetID(), retries, newEndpoint, conn.RemoteAddr().String())
|
||||||
maxRetries := 3 // Default fallback
|
maxRetries := 3 // Default fallback
|
||||||
if ph.config != nil {
|
if ph.config != nil {
|
||||||
maxRetries = ph.config.MaxHandoffRetries
|
maxRetries = ph.config.MaxHandoffRetries
|
||||||
@@ -387,6 +389,7 @@ func (ph *PoolHook) performConnectionHandoffWithPool(ctx context.Context, conn *
|
|||||||
// Create new connection to the new endpoint
|
// Create new connection to the new endpoint
|
||||||
newNetConn, err := endpointDialer(ctx)
|
newNetConn, err := endpointDialer(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
internal.Logger.Printf(ctx, "hitless: conn[%d] Failed to dial new endpoint %s: %v", conn.GetID(), newEndpoint, err)
|
||||||
// hitless: will retry
|
// hitless: will retry
|
||||||
// Maybe a network error - retry after a delay
|
// Maybe a network error - retry after a delay
|
||||||
return true, err
|
return true, err
|
||||||
@@ -409,6 +412,7 @@ func (ph *PoolHook) performConnectionHandoffWithPool(ctx context.Context, conn *
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
conn.ClearHandoffState()
|
conn.ClearHandoffState()
|
||||||
|
internal.Logger.Printf(ctx, "hitless: conn[%d] Handoff to %s successful", conn.GetID(), newEndpoint)
|
||||||
|
|
||||||
// Apply relaxed timeout to the new connection for the configured post-handoff duration
|
// Apply relaxed timeout to the new connection for the configured post-handoff duration
|
||||||
// This gives the new connection more time to handle operations during cluster transition
|
// This gives the new connection more time to handle operations during cluster transition
|
||||||
|
@@ -5,6 +5,8 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
|
"github.com/redis/go-redis/v9/internal"
|
||||||
)
|
)
|
||||||
|
|
||||||
type PubSubStats struct {
|
type PubSubStats struct {
|
||||||
@@ -52,6 +54,9 @@ func (p *PubSubPool) TrackConn(cn *Conn) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *PubSubPool) UntrackConn(cn *Conn) {
|
func (p *PubSubPool) UntrackConn(cn *Conn) {
|
||||||
|
if !cn.IsUsable() || cn.ShouldHandoff() {
|
||||||
|
internal.Logger.Printf(context.Background(), "pubsub: untracking connection %d [usable, handoff] = [%v, %v]", cn.GetID(), cn.IsUsable(), cn.ShouldHandoff())
|
||||||
|
}
|
||||||
atomic.AddUint32(&p.stats.Active, ^uint32(0))
|
atomic.AddUint32(&p.stats.Active, ^uint32(0))
|
||||||
atomic.AddUint32(&p.stats.Untracked, 1)
|
atomic.AddUint32(&p.stats.Untracked, 1)
|
||||||
p.activeConns.Delete(cn.GetID())
|
p.activeConns.Delete(cn.GetID())
|
||||||
|
11
pubsub.go
11
pubsub.go
@@ -170,10 +170,16 @@ func (c *PubSub) releaseConn(ctx context.Context, cn *pool.Conn, err error, allo
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !cn.IsUsable() || cn.ShouldHandoff() {
|
if !cn.IsUsable() || cn.ShouldHandoff() {
|
||||||
|
if cn.ShouldHandoff() {
|
||||||
|
internal.Logger.Printf(ctx, "pubsub: connection[%d] is marked for handoff, reconnecting", cn.GetID())
|
||||||
|
} else {
|
||||||
|
internal.Logger.Printf(ctx, "pubsub: connection[%d] is not usable, reconnecting", cn.GetID())
|
||||||
|
}
|
||||||
c.reconnect(ctx, fmt.Errorf("pubsub: connection is not usable"))
|
c.reconnect(ctx, fmt.Errorf("pubsub: connection is not usable"))
|
||||||
}
|
}
|
||||||
|
|
||||||
if isBadConn(err, allowTimeout, c.opt.Addr) {
|
if isBadConn(err, allowTimeout, c.opt.Addr) {
|
||||||
|
internal.Logger.Printf(ctx, "pubsub: releasing connection[%d]: %v", cn.GetID(), err)
|
||||||
c.reconnect(ctx, err)
|
c.reconnect(ctx, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -187,7 +193,10 @@ func (c *PubSub) reconnect(ctx context.Context, reason error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if newEndpoint != "" {
|
if newEndpoint != "" {
|
||||||
|
// Update the address in the options
|
||||||
|
oldAddr := c.cn.RemoteAddr().String()
|
||||||
c.opt.Addr = newEndpoint
|
c.opt.Addr = newEndpoint
|
||||||
|
internal.Logger.Printf(ctx, "pubsub: reconnecting to new endpoint %s (was %s)", newEndpoint, oldAddr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ = c.closeTheCn(reason)
|
_ = c.closeTheCn(reason)
|
||||||
@@ -199,7 +208,7 @@ func (c *PubSub) closeTheCn(reason error) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if !c.closed {
|
if !c.closed {
|
||||||
internal.Logger.Printf(c.getContext(), "redis: discarding bad PubSub connection: %s", reason)
|
internal.Logger.Printf(c.getContext(), "redis: discarding bad PubSub connection[%d]: %s, %v", c.cn.GetID(), reason, c.cn.RemoteAddr())
|
||||||
}
|
}
|
||||||
err := c.closeConn(c.cn)
|
err := c.closeConn(c.cn)
|
||||||
c.cn = nil
|
c.cn = nil
|
||||||
|
@@ -37,7 +37,6 @@ type NotificationHandlerContext struct {
|
|||||||
// circular dependencies. The developer is responsible for type assertion.
|
// circular dependencies. The developer is responsible for type assertion.
|
||||||
// It can be one of the following types:
|
// It can be one of the following types:
|
||||||
// - *pool.Conn
|
// - *pool.Conn
|
||||||
// - *connectionAdapter (for hitless upgrades)
|
|
||||||
Conn interface{}
|
Conn interface{}
|
||||||
|
|
||||||
// IsBlocking indicates if the notification was received on a blocking connection.
|
// IsBlocking indicates if the notification was received on a blocking connection.
|
||||||
|
3
redis.go
3
redis.go
@@ -1077,6 +1077,7 @@ func (c *Client) pubSub() *PubSub {
|
|||||||
// will return nil if already initialized
|
// will return nil if already initialized
|
||||||
err = c.initConn(ctx, cn)
|
err = c.initConn(ctx, cn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
internal.Logger.Printf(ctx, "pubsub: conn[%d] to ADDR %s [usable, handoff] = [%v, %v] after initConn returned %v", cn.GetID(), addr, cn.IsUsable(), cn.ShouldHandoff(), err)
|
||||||
_ = cn.Close()
|
_ = cn.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -1277,7 +1278,7 @@ func (c *baseClient) pushNotificationHandlerContext(cn *pool.Conn) push.Notifica
|
|||||||
return push.NotificationHandlerContext{
|
return push.NotificationHandlerContext{
|
||||||
Client: c,
|
Client: c,
|
||||||
ConnPool: c.connPool,
|
ConnPool: c.connPool,
|
||||||
Conn: &connectionAdapter{conn: cn}, // Wrap in adapter for easier interface access
|
Conn: cn, // Wrap in adapter for easier interface access
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user