diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index dcfbd335fe..8806a50a9d 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -223,7 +223,7 @@ export default class RedisCommandsQueue { listener: PubSubListener, returnBuffers?: T ) { - return this._pushPubSubCommand( + return this._addPubSubCommand( this._pubSub.subscribe(type, channels, listener, returnBuffers) ); } @@ -234,7 +234,7 @@ export default class RedisCommandsQueue { listener?: PubSubListener, returnBuffers?: T ) { - return this._pushPubSubCommand( + return this._addPubSubCommand( this._pubSub.unsubscribe(type, channels, listener, returnBuffers) ); } @@ -244,7 +244,7 @@ export default class RedisCommandsQueue { if (!commands.length) return; return Promise.all( - commands.map(command => this._pushPubSubCommand(command)) + commands.map(command => this._addPubSubCommand(command, true)) ); } @@ -253,13 +253,13 @@ export default class RedisCommandsQueue { channel: string, listeners: ChannelListeners ) { - return this._pushPubSubCommand( + return this._addPubSubCommand( this._pubSub.extendChannelListeners(type, channel, listeners) ); } extendPubSubListeners(type: PubSubType, listeners: PubSubTypeListeners) { - return this._pushPubSubCommand( + return this._addPubSubCommand( this._pubSub.extendTypeListeners(type, listeners) ); } @@ -268,11 +268,11 @@ export default class RedisCommandsQueue { return this._pubSub.getTypeListeners(type); } - private _pushPubSubCommand(command: PubSubCommand) { + private _addPubSubCommand(command: PubSubCommand, asap = false) { if (command === undefined) return; return new Promise((resolve, reject) => { - this._toWrite.push({ + (asap ? this._toWrite.unshift : this._toWrite.push)({ args: command.args, chainId: undefined, abort: undefined, @@ -287,7 +287,7 @@ export default class RedisCommandsQueue { }, channelsCounter: command.channelsCounter, typeMapping: PUSH_TYPE_MAPPING - }); + }); }); } diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 69b5ec6054..e47a500420 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -335,7 +335,7 @@ export default class RedisClient< private _initiateSocket(): RedisSocket { const socketInitiator = async (): Promise => { - const promises = []; + const promises = [this._queue.resubscribe()]; if (this._monitorCallback) { promises.push( @@ -408,11 +408,6 @@ export default class RedisClient< } } - const resubscribePromise = this._queue.resubscribe(); - if (resubscribePromise) { - promises.push(resubscribePromise); - } - if (promises.length) { this._write(); await Promise.all(promises);