diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 11ee791a79..0c81deb784 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -299,6 +299,11 @@ export default class RedisCommandsQueue { }); } + resetDecoder() { + this.#resetDecoderCallbacks(); + this.decoder.reset(); + } + #resetFallbackOnReply?: Decoder['onReply']; async reset(typeMapping?: T) { @@ -384,7 +389,7 @@ export default class RedisCommandsQueue { } flushWaitingForReply(err: Error): void { - this.decoder.reset(); + this.resetDecoder(); this.#pubSub.reset(); this.#flushWaitingForReply(err); @@ -402,7 +407,7 @@ export default class RedisCommandsQueue { } flushAll(err: Error): void { - this.decoder.reset(); + this.resetDecoder(); this.#pubSub.reset(); this.#flushWaitingForReply(err); for (const node of this.#toWrite) { diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 8ec2477cd1..29516ee549 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -429,7 +429,7 @@ export default class RedisClient< try { this.#queue.decoder.write(chunk); } catch (err) { - this.#queue.decoder.reset(); + this.#queue.resetDecoder(); this.emit('error', err); } }) @@ -899,6 +899,7 @@ export default class RedisClient< async reset() { const promises = [this._self.#queue.reset()]; this._self.#handshake(false, promises); + this._self.#scheduleWrite(); await Promise.all(promises); }