1
0
mirror of https://github.com/redis/node-redis.git synced 2025-12-17 11:52:32 +03:00
Files
node-redis/packages/client/lib/tests/test-scenario/sharded-pubsub/utils/command-runner.ts
Nikolay Karadzhov 96d6445d66 fix(ssubscribe): properly resubscribe in case of shard failover (#3098)
* fix(ssubscribe): properly resubscribe in case of shard failover

1) when RE failover happens, there is a disconnect
2) affected Client reconnects and tries to resubscribe all existing listeners
ISSUE #1: CROSSSLOT Error - client was doing ssubscribe ch1 ch2.. chN which, after the failover could result in CROSSSLOT ( naturally, becasuse now some slots could be owned by other shards )
FIX: send one ssubscribe command per channel instead of one ssubscribe for all channels
ISSUE #2: MOVED Error - some/all of the channels might be moved somewhere else
FIX: 1: Propagate the error to the Cluster. 2: Cluster rediscovers topology.
3: Cluster resubscribes all listeners of the failed client ( possibly some/all of those will end up in a different client after the rediscovery ) 

fixes: #2902
2025-10-13 11:59:08 +03:00

91 lines
2.9 KiB
TypeScript

import type { MessageTracker } from "./message-tracker";
import { Cluster } from "./test.util";
import { setTimeout } from "timers/promises";
/**
* Options for the `publishMessagesUntilAbortSignal` method
*/
interface PublishMessagesUntilAbortSignalOptions {
/**
* Number of messages to publish in each batch
*/
batchSize: number;
/**
* Timeout between batches in milliseconds
*/
timeoutMs: number;
/**
* Function that generates the message content to be published
*/
createMessage: () => string;
}
/**
* Utility class for running test commands until a stop signal is received
*/
export class TestCommandRunner {
private static readonly defaultPublishOptions: PublishMessagesUntilAbortSignalOptions =
{
batchSize: 10,
timeoutMs: 10,
createMessage: () => Date.now().toString(),
};
/**
* Continuously publishes messages to the given Redis channels until aborted.
*
* @param {Redis|Cluster} client - Redis client or cluster instance used to publish messages.
* @param {string[]} channels - List of channel names to publish messages to.
* @param {MessageTracker} messageTracker - Tracks sent and failed message counts per channel.
* @param {Partial<PublishMessagesUntilAbortSignalOptions>} [options] - Optional overrides for batch size, timeout, and message factory.
* @param {AbortController} [externalAbortController] - Optional external abort controller to control publishing lifecycle.
* @returns {{ controller: AbortController, result: Promise<void> }}
* An object containing the abort controller and a promise that resolves when publishing stops.
*/
static publishMessagesUntilAbortSignal(
client: Cluster,
channels: string[],
messageTracker: MessageTracker,
options?: Partial<PublishMessagesUntilAbortSignalOptions>,
externalAbortController?: AbortController,
) {
const publishOptions = {
...TestCommandRunner.defaultPublishOptions,
...options,
};
const abortController = externalAbortController ?? new AbortController();
const result = async () => {
while (!abortController.signal.aborted) {
const batchPromises: Promise<void>[] = [];
for (let i = 0; i < publishOptions.batchSize; i++) {
for (const channel of channels) {
const message = publishOptions.createMessage();
const publishPromise = client
.sPublish(channel, message)
.then(() => {
messageTracker.incrementSent(channel);
})
.catch(() => {
messageTracker.incrementFailed(channel);
});
batchPromises.push(publishPromise);
}
}
await Promise.all(batchPromises);
await setTimeout(publishOptions.timeoutMs);
}
};
return {
controller: abortController,
result: result(),
};
}
}