diff --git a/lib/client.ts b/lib/client.ts index 69d2b9c23e..fb857bf4ff 100644 --- a/lib/client.ts +++ b/lib/client.ts @@ -444,20 +444,29 @@ export default class RedisClient this.#tick()); + this.#isTickQueued = true; + return; + } const isBuffering = this.#queue.executeChunk(chunkRecommendedSize); if (isBuffering === true) { this.#socket.once('drain', () => this.#tick()); } else if (isBuffering === false) { this.#tick(); + return; } + + this.#isTickQueued = false; } } diff --git a/lib/commands-queue.ts b/lib/commands-queue.ts index 4cfd4e6739..1e3919e7ae 100644 --- a/lib/commands-queue.ts +++ b/lib/commands-queue.ts @@ -74,6 +74,12 @@ export default class RedisCommandsQueue { readonly #waitingToBeSent = new LinkedList(); + #waitingToBeSentCommandsLength = 0; + + get waitingToBeSentCommandsLength() { + return this.#waitingToBeSentCommandsLength; + } + readonly #waitingForReply = new LinkedList(); readonly #pubSubState = { @@ -97,7 +103,7 @@ export default class RedisCommandsQueue { reply[2], reply[1] ); - + case 'pmessage': return RedisCommandsQueue.#emitPubSubMessage( this.#pubSubListeners.patterns.get(reply[1])!, @@ -108,7 +114,7 @@ export default class RedisCommandsQueue { case 'subscribe': case 'psubscribe': if (--this.#waitingForReply.head!.value.channelsCounter! === 0) { - this.#shiftWaitingForReply().resolve(); + this.#shiftWaitingForReply().resolve(); } return; } @@ -185,6 +191,8 @@ export default class RedisCommandsQueue { } else { this.#waitingToBeSent.pushNode(node); } + + this.#waitingToBeSentCommandsLength += encodedCommand.length; }); } @@ -325,6 +333,7 @@ export default class RedisCommandsQueue { } this.#chainInExecution = lastCommandChainId; + this.#waitingToBeSentCommandsLength -= size; } parseResponse(data: Buffer): void { diff --git a/lib/socket.ts b/lib/socket.ts index 804bc766d6..18d56abcd5 100644 --- a/lib/socket.ts +++ b/lib/socket.ts @@ -159,6 +159,7 @@ export default class RedisSocket extends EventEmitter { this.#createNetSocket(); socket + .setNoDelay() .once('error', (err) => reject(err)) .once(connectEvent, () => { socket