You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-12-17 11:52:32 +03:00
* fix(ssubscribe): properly resubscribe in case of shard failover 1) when RE failover happens, there is a disconnect 2) affected Client reconnects and tries to resubscribe all existing listeners ISSUE #1: CROSSSLOT Error - client was doing ssubscribe ch1 ch2.. chN which, after the failover, could result in CROSSSLOT (naturally, because now some slots could be owned by other shards) FIX: send one ssubscribe command per channel instead of one ssubscribe for all channels ISSUE #2: MOVED Error - some/all of the channels might be moved somewhere else FIX: 1: Propagate the error to the Cluster. 2: Cluster rediscovers topology. 3: Cluster resubscribes all listeners of the failed client (possibly some/all of those will end up in a different client after the rediscovery) fixes: #2902
91 lines
2.9 KiB
TypeScript
91 lines
2.9 KiB
TypeScript
import type { MessageTracker } from "./message-tracker";
|
|
import { Cluster } from "./test.util";
|
|
import { setTimeout } from "timers/promises";
|
|
|
|
/**
 * Options for the `publishMessagesUntilAbortSignal` method.
 *
 * Callers pass a `Partial` of this shape; `TestCommandRunner` fills in
 * anything that is left out from its `defaultPublishOptions`.
 */
interface PublishMessagesUntilAbortSignalOptions {
  /**
   * Number of messages to publish to each channel in one batch
   * (`TestCommandRunner` default: 10).
   */
  batchSize: number;
  /**
   * Pause between two consecutive batches, in milliseconds
   * (`TestCommandRunner` default: 10).
   */
  timeoutMs: number;
  /**
   * Factory invoked once per published message to produce its payload
   * (`TestCommandRunner` default: the current `Date.now()` as a string).
   */
  createMessage: () => string;
}
|
|
|
|
/**
 * Utility class for running test commands until a stop signal is received.
 */
export class TestCommandRunner {
  /** Values used for every publish option the caller omits or leaves `undefined`. */
  private static readonly defaultPublishOptions: PublishMessagesUntilAbortSignalOptions =
    {
      batchSize: 10,
      timeoutMs: 10,
      createMessage: () => Date.now().toString(),
    };

  /**
   * Continuously publishes messages to the given channels until aborted.
   *
   * Each loop iteration publishes `batchSize` messages to every channel via
   * `client.sPublish`, waits for the whole batch to settle, then pauses for
   * `timeoutMs` before starting the next batch. The pause is cut short as
   * soon as the abort signal fires, so `result` settles promptly after abort.
   *
   * @param client - Cluster instance used to publish messages.
   * @param channels - Channel names to publish messages to.
   * @param messageTracker - Receives `incrementSent(channel)` for every
   *   successful publish and `incrementFailed(channel)` for every failed one.
   * @param options - Optional overrides for batch size, timeout, and message
   *   factory; properties that are missing or `undefined` use the defaults.
   * @param externalAbortController - Optional controller that stops the loop;
   *   a fresh one is created when omitted.
   * @returns The abort controller in use and a promise that resolves once
   *   publishing has stopped. Publish failures never reject the promise; it
   *   rejects only if `createMessage` throws or the pause fails for a reason
   *   other than the abort.
   */
  static publishMessagesUntilAbortSignal(
    client: Cluster,
    channels: string[],
    messageTracker: MessageTracker,
    options?: Partial<PublishMessagesUntilAbortSignalOptions>,
    externalAbortController?: AbortController,
  ): { controller: AbortController; result: Promise<void> } {
    const defaults = TestCommandRunner.defaultPublishOptions;

    // Resolve field by field: a plain object spread would let an explicit
    // `undefined` in `options` overwrite the default value.
    const publishOptions: PublishMessagesUntilAbortSignalOptions = {
      batchSize: options?.batchSize ?? defaults.batchSize,
      timeoutMs: options?.timeoutMs ?? defaults.timeoutMs,
      createMessage: options?.createMessage ?? defaults.createMessage,
    };

    const abortController = externalAbortController ?? new AbortController();
    const { signal } = abortController;

    // Publishes one message and records the outcome; never rejects.
    const publishOne = async (
      channel: string,
      message: string,
    ): Promise<void> => {
      try {
        await client.sPublish(channel, message);
        messageTracker.incrementSent(channel);
      } catch {
        messageTracker.incrementFailed(channel);
      }
    };

    const run = async (): Promise<void> => {
      while (!signal.aborted) {
        const batch: Promise<void>[] = [];

        for (let i = 0; i < publishOptions.batchSize; i++) {
          for (const channel of channels) {
            batch.push(publishOne(channel, publishOptions.createMessage()));
          }
        }

        await Promise.all(batch);

        try {
          // Passing the signal makes the pause end as soon as we are aborted.
          await setTimeout(publishOptions.timeoutMs, undefined, { signal });
        } catch (error) {
          // An abort rejects the pause; that is the normal way to stop.
          if (!signal.aborted) {
            throw error;
          }
        }
      }
    };

    return {
      controller: abortController,
      result: run(),
    };
  }
}
|