diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 58699174cd..8caa2f178a 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -16,7 +16,7 @@ export interface CommandOptions { typeMapping?: T; } -export interface CommandWaitingToBeSent extends CommandWaitingForReply { +export interface CommandToWrite extends CommandWaitingForReply { args: CommandArguments; chainId?: symbol; abort?: { @@ -43,7 +43,7 @@ const RESP2_PUSH_TYPE_MAPPING = { export default class RedisCommandsQueue { private readonly _maxLength: number | null | undefined; - private readonly _waitingToBeSent = new DoublyLinkedList(); + private readonly _toWrite = new DoublyLinkedList(); private readonly _waitingForReply = new SinglyLinkedList(); private readonly _onShardedChannelMoved: OnShardedChannelMoved; @@ -153,15 +153,15 @@ export default class RedisCommandsQueue { } addCommand(args: CommandArguments, options?: CommandOptions): Promise { - if (this._maxLength && this._waitingToBeSent.length + this._waitingForReply.length >= this._maxLength) { + if (this._maxLength && this._toWrite.length + this._waitingForReply.length >= this._maxLength) { return Promise.reject(new Error('The queue is full')); } else if (options?.abortSignal?.aborted) { return Promise.reject(new AbortError()); } return new Promise((resolve, reject) => { - let node: DoublyLinkedNode; - const value: CommandWaitingToBeSent = { + let node: DoublyLinkedNode; + const value: CommandToWrite = { args, chainId: options?.chainId, typeMapping: options?.typeMapping, @@ -175,7 +175,7 @@ export default class RedisCommandsQueue { value.abort = { signal, listener: () => { - this._waitingToBeSent.remove(node); + this._toWrite.remove(node); value.reject(new AbortError()); } }; @@ -183,8 +183,8 @@ export default class RedisCommandsQueue { } node = options?.asap ? - this._waitingToBeSent.unshift(value) : - this._waitingToBeSent.push(value); + this._toWrite.unshift(value) : + this._toWrite.push(value); }); } @@ -243,7 +243,7 @@ export default class RedisCommandsQueue { if (command === undefined) return; return new Promise((resolve, reject) => { - this._waitingToBeSent.push({ + this._toWrite.push({ args: command.args, channelsCounter: command.channelsCounter, typeMapping: PUSH_TYPE_MAPPING, @@ -259,15 +259,19 @@ export default class RedisCommandsQueue { }); } - *waitingToBeSent() { - let toSend = this._waitingToBeSent.shift(); + isWaitingToWrite() { + return this._toWrite.length > 0; + } + + *commandsToWrite() { + let toSend = this._toWrite.shift(); while (toSend) { let encoded: CommandArguments; try { encoded = encodeCommand(toSend.args); } catch (err) { toSend.reject(err); - toSend = this._waitingToBeSent.shift(); + toSend = this._toWrite.shift(); continue; } @@ -283,7 +287,7 @@ export default class RedisCommandsQueue { this._waitingForReply.push(toSend); this._chainInExecution = toSend.chainId; yield encoded; - toSend = this._waitingToBeSent.shift(); + toSend = this._toWrite.shift(); } } @@ -294,11 +298,11 @@ export default class RedisCommandsQueue { this._waitingForReply.reset(); } - private static _removeAbortListener(command: CommandWaitingToBeSent) { + private static _removeAbortListener(command: CommandToWrite) { command.abort!.signal.removeEventListener('abort', command.abort!.listener); } - private static _flushWaitingToBeSent(toBeSent: CommandWaitingToBeSent, err: Error) { + private static _flushToWrite(toBeSent: CommandToWrite, err: Error) { if (toBeSent.abort) { RedisCommandsQueue._removeAbortListener(toBeSent); } @@ -314,9 +318,9 @@ export default class RedisCommandsQueue { if (!this._chainInExecution) return; - while (this._waitingToBeSent.head?.value.chainId === this._chainInExecution) { - RedisCommandsQueue._flushWaitingToBeSent( - this._waitingToBeSent.shift()!, + while (this._toWrite.head?.value.chainId === this._chainInExecution) { + RedisCommandsQueue._flushToWrite( + this._toWrite.shift()!, err ); } @@ -328,15 +332,15 @@ export default class RedisCommandsQueue { this.decoder.reset(); this._pubSub.reset(); this._flushWaitingForReply(err); - for (const node of this._waitingToBeSent) { - RedisCommandsQueue._flushWaitingToBeSent(node, err); + for (const node of this._toWrite) { + RedisCommandsQueue._flushToWrite(node, err); } - this._waitingToBeSent.reset(); + this._toWrite.reset(); } isEmpty() { return ( - this._waitingToBeSent.length === 0 && + this._toWrite.length === 0 && this._waitingForReply.length === 0 ); } diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 3d43ae55b8..d53f8bec60 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -440,10 +440,10 @@ export default class RedisClient< .on('ready', () => { this.emit('ready'); this._setPingTimer(); - this._scheduleWrite(); + this._maybeScheduleWrite(); }) .on('reconnecting', () => this.emit('reconnecting')) - .on('drain', () => this._scheduleWrite()) + .on('drain', () => this._maybeScheduleWrite()) .on('end', () => this.emit('end')); } @@ -706,7 +706,7 @@ export default class RedisClient< } private _write() { - this._socket.write(this._queue.waitingToBeSent()); + this._socket.write(this._queue.commandsToWrite()); } private _scheduledWrite?: NodeJS.Immediate; @@ -720,6 +720,12 @@ export default class RedisClient< }); } + private _maybeScheduleWrite() { + if (!this._queue.isWaitingToWrite()) return; + + this._scheduleWrite(); + } + /** * @internal */ diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index a841828c3a..86c9923393 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -229,9 +229,7 @@ export default class RedisSocket extends EventEmitter { } write(iterator: IterableIterator>): void { - if (!this._socket) { - throw new ClientClosedError(); - } + if (!this._socket) return; this._socket.cork(); for (const args of iterator) {