You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-17 19:41:06 +03:00
fix PubSub
This commit is contained in:
@@ -201,6 +201,10 @@ export default class RedisCommandsQueue {
|
||||
channelsToSubscribe.push(channel);
|
||||
}
|
||||
|
||||
if (!channelsToSubscribe.length) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
return this.#pushPubSubCommand(command, channelsToSubscribe);
|
||||
}
|
||||
|
||||
@@ -228,28 +232,31 @@ export default class RedisCommandsQueue {
|
||||
}
|
||||
}
|
||||
|
||||
if (!channelsToUnsubscribe.length) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
return this.#pushPubSubCommand(command, channelsToUnsubscribe);
|
||||
}
|
||||
|
||||
#pushPubSubCommand(command: PubSubSubscribeCommands | PubSubUnsubscribeCommands, channels?: Array<string>): Promise<void> {
|
||||
if (channels?.length === 0) return Promise.resolve();
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const isSubscribe = command === PubSubSubscribeCommands.SUBSCRIBE || command === PubSubSubscribeCommands.PSUBSCRIBE,
|
||||
inProgressKey = isSubscribe ? 'subscribing' : 'unsubscribing',
|
||||
commandArgs: Array<string> = [command];
|
||||
let channelsCounter: number;
|
||||
if (channels) {
|
||||
if (channels?.length) {
|
||||
commandArgs.push(...channels);
|
||||
channelsCounter = channels.length;
|
||||
} else {
|
||||
// unsubscribe only
|
||||
channelsCounter = (
|
||||
command[0] === 'P' ?
|
||||
this.#pubSubListeners.patterns :
|
||||
this.#pubSubListeners.channels
|
||||
).size;
|
||||
}
|
||||
|
||||
|
||||
this.#pubSubState[inProgressKey] += channelsCounter;
|
||||
this.#waitingToBeSent.push({
|
||||
encodedCommand: RedisCommandsQueue.encodeCommand(commandArgs),
|
||||
|
Reference in New Issue
Block a user