diff --git a/lib/commands-queue.ts b/lib/commands-queue.ts index fc3eab67ed..f2dd0d270a 100644 --- a/lib/commands-queue.ts +++ b/lib/commands-queue.ts @@ -207,9 +207,8 @@ export default class RedisCommandsQueue { unsubscribe(command: PubSubUnsubscribeCommands, channels?: string | Array, listener?: PubSubListener) { const listeners = command === PubSubUnsubscribeCommands.UNSUBSCRIBE ? this.#pubSubListeners.channels : this.#pubSubListeners.patterns; if (!channels) { - const keys = [...listeners.keys()]; listeners.clear(); - return this.#pushPubSubCommand(command, keys); + return this.#pushPubSubCommand(command); } const channelsToUnsubscribe = []; @@ -232,24 +231,36 @@ export default class RedisCommandsQueue { return this.#pushPubSubCommand(command, channelsToUnsubscribe); } - #pushPubSubCommand(command: PubSubSubscribeCommands | PubSubUnsubscribeCommands, channels: Array): Promise { - if (!channels.length) return Promise.resolve(); + #pushPubSubCommand(command: PubSubSubscribeCommands | PubSubUnsubscribeCommands, channels?: Array): Promise { + if (channels?.length === 0) return Promise.resolve(); return new Promise((resolve, reject) => { const isSubscribe = command === PubSubSubscribeCommands.SUBSCRIBE || command === PubSubSubscribeCommands.PSUBSCRIBE, - inProgressKey = isSubscribe ? 'subscribing' : 'unsubscribing'; + inProgressKey = isSubscribe ? 'subscribing' : 'unsubscribing', + commandArgs: Array = [command]; + let channelsCounter: number; + if (channels) { + commandArgs.push(...channels); + channelsCounter = channels.length; + } else { + channelsCounter = ( + command[0] === 'P' ? + this.#pubSubListeners.patterns : + this.#pubSubListeners.channels + ).size; + } - this.#pubSubState[inProgressKey] += channels.length; + this.#pubSubState[inProgressKey] += channelsCounter; this.#waitingToBeSent.push({ - encodedCommand: RedisCommandsQueue.encodeCommand([command, ...channels]), - channelsCounter: channels.length, + encodedCommand: RedisCommandsQueue.encodeCommand(commandArgs), + channelsCounter, resolve: () => { - this.#pubSubState[inProgressKey] -= channels.length; - this.#pubSubState.subscribed += channels.length * (isSubscribe ? 1 : -1); + this.#pubSubState[inProgressKey] -= channelsCounter; + this.#pubSubState.subscribed += channelsCounter * (isSubscribe ? 1 : -1); resolve(); }, reject: () => { - this.#pubSubState[inProgressKey] -= channels.length; + this.#pubSubState[inProgressKey] -= channelsCounter; reject(); } });