You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-17 19:41:06 +03:00
fix PubSub UNSUBSCRIBE/PUNSUBSCRIBE without channel and/or listener
This commit is contained in:
@@ -207,9 +207,8 @@ export default class RedisCommandsQueue {
|
||||
unsubscribe(command: PubSubUnsubscribeCommands, channels?: string | Array<string>, listener?: PubSubListener) {
|
||||
const listeners = command === PubSubUnsubscribeCommands.UNSUBSCRIBE ? this.#pubSubListeners.channels : this.#pubSubListeners.patterns;
|
||||
if (!channels) {
|
||||
const keys = [...listeners.keys()];
|
||||
listeners.clear();
|
||||
return this.#pushPubSubCommand(command, keys);
|
||||
return this.#pushPubSubCommand(command);
|
||||
}
|
||||
|
||||
const channelsToUnsubscribe = [];
|
||||
@@ -232,24 +231,36 @@ export default class RedisCommandsQueue {
|
||||
return this.#pushPubSubCommand(command, channelsToUnsubscribe);
|
||||
}
|
||||
|
||||
#pushPubSubCommand(command: PubSubSubscribeCommands | PubSubUnsubscribeCommands, channels: Array<string>): Promise<void> {
|
||||
if (!channels.length) return Promise.resolve();
|
||||
#pushPubSubCommand(command: PubSubSubscribeCommands | PubSubUnsubscribeCommands, channels?: Array<string>): Promise<void> {
|
||||
if (channels?.length === 0) return Promise.resolve();
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const isSubscribe = command === PubSubSubscribeCommands.SUBSCRIBE || command === PubSubSubscribeCommands.PSUBSCRIBE,
|
||||
inProgressKey = isSubscribe ? 'subscribing' : 'unsubscribing';
|
||||
inProgressKey = isSubscribe ? 'subscribing' : 'unsubscribing',
|
||||
commandArgs: Array<string> = [command];
|
||||
let channelsCounter: number;
|
||||
if (channels) {
|
||||
commandArgs.push(...channels);
|
||||
channelsCounter = channels.length;
|
||||
} else {
|
||||
channelsCounter = (
|
||||
command[0] === 'P' ?
|
||||
this.#pubSubListeners.patterns :
|
||||
this.#pubSubListeners.channels
|
||||
).size;
|
||||
}
|
||||
|
||||
this.#pubSubState[inProgressKey] += channels.length;
|
||||
this.#pubSubState[inProgressKey] += channelsCounter;
|
||||
this.#waitingToBeSent.push({
|
||||
encodedCommand: RedisCommandsQueue.encodeCommand([command, ...channels]),
|
||||
channelsCounter: channels.length,
|
||||
encodedCommand: RedisCommandsQueue.encodeCommand(commandArgs),
|
||||
channelsCounter,
|
||||
resolve: () => {
|
||||
this.#pubSubState[inProgressKey] -= channels.length;
|
||||
this.#pubSubState.subscribed += channels.length * (isSubscribe ? 1 : -1);
|
||||
this.#pubSubState[inProgressKey] -= channelsCounter;
|
||||
this.#pubSubState.subscribed += channelsCounter * (isSubscribe ? 1 : -1);
|
||||
resolve();
|
||||
},
|
||||
reject: () => {
|
||||
this.#pubSubState[inProgressKey] -= channels.length;
|
||||
this.#pubSubState[inProgressKey] -= channelsCounter;
|
||||
reject();
|
||||
}
|
||||
});
|
||||
|
Reference in New Issue
Block a user