1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-09 00:22:08 +03:00

use asap fro pubsub resubscribe

This commit is contained in:
Leibale
2023-12-05 11:44:00 -05:00
parent d8cb5de8b1
commit f804b0949d
2 changed files with 9 additions and 14 deletions

View File

@@ -223,7 +223,7 @@ export default class RedisCommandsQueue {
listener: PubSubListener<T>, listener: PubSubListener<T>,
returnBuffers?: T returnBuffers?: T
) { ) {
return this._pushPubSubCommand( return this._addPubSubCommand(
this._pubSub.subscribe(type, channels, listener, returnBuffers) this._pubSub.subscribe(type, channels, listener, returnBuffers)
); );
} }
@@ -234,7 +234,7 @@ export default class RedisCommandsQueue {
listener?: PubSubListener<T>, listener?: PubSubListener<T>,
returnBuffers?: T returnBuffers?: T
) { ) {
return this._pushPubSubCommand( return this._addPubSubCommand(
this._pubSub.unsubscribe(type, channels, listener, returnBuffers) this._pubSub.unsubscribe(type, channels, listener, returnBuffers)
); );
} }
@@ -244,7 +244,7 @@ export default class RedisCommandsQueue {
if (!commands.length) return; if (!commands.length) return;
return Promise.all( return Promise.all(
commands.map(command => this._pushPubSubCommand(command)) commands.map(command => this._addPubSubCommand(command, true))
); );
} }
@@ -253,13 +253,13 @@ export default class RedisCommandsQueue {
channel: string, channel: string,
listeners: ChannelListeners listeners: ChannelListeners
) { ) {
return this._pushPubSubCommand( return this._addPubSubCommand(
this._pubSub.extendChannelListeners(type, channel, listeners) this._pubSub.extendChannelListeners(type, channel, listeners)
); );
} }
extendPubSubListeners(type: PubSubType, listeners: PubSubTypeListeners) { extendPubSubListeners(type: PubSubType, listeners: PubSubTypeListeners) {
return this._pushPubSubCommand( return this._addPubSubCommand(
this._pubSub.extendTypeListeners(type, listeners) this._pubSub.extendTypeListeners(type, listeners)
); );
} }
@@ -268,11 +268,11 @@ export default class RedisCommandsQueue {
return this._pubSub.getTypeListeners(type); return this._pubSub.getTypeListeners(type);
} }
private _pushPubSubCommand(command: PubSubCommand) { private _addPubSubCommand(command: PubSubCommand, asap = false) {
if (command === undefined) return; if (command === undefined) return;
return new Promise<void>((resolve, reject) => { return new Promise<void>((resolve, reject) => {
this._toWrite.push({ (asap ? this._toWrite.unshift : this._toWrite.push)({
args: command.args, args: command.args,
chainId: undefined, chainId: undefined,
abort: undefined, abort: undefined,

View File

@@ -335,7 +335,7 @@ export default class RedisClient<
private _initiateSocket(): RedisSocket { private _initiateSocket(): RedisSocket {
const socketInitiator = async (): Promise<void> => { const socketInitiator = async (): Promise<void> => {
const promises = []; const promises = [this._queue.resubscribe()];
if (this._monitorCallback) { if (this._monitorCallback) {
promises.push( promises.push(
@@ -408,11 +408,6 @@ export default class RedisClient<
} }
} }
const resubscribePromise = this._queue.resubscribe();
if (resubscribePromise) {
promises.push(resubscribePromise);
}
if (promises.length) { if (promises.length) {
this._write(); this._write();
await Promise.all(promises); await Promise.all(promises);