From 0cd69156985a24e1bc1dcaf7245d8cfa7c6dbf9d Mon Sep 17 00:00:00 2001 From: Leibale Date: Wed, 31 Jan 2024 15:07:01 -0500 Subject: [PATCH] fix client.reset --- packages/client/lib/client/commands-queue.ts | 21 ++-- packages/client/lib/client/index.spec.ts | 3 + packages/client/lib/client/index.ts | 103 ++++++++++--------- 3 files changed, 70 insertions(+), 57 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 0c81deb784..034c1e46bd 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -1,7 +1,7 @@ import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list'; import encodeCommand from '../RESP/encoder'; import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder'; -import { CommandArguments, TypeMapping, ReplyUnion, RespVersions, SimpleStringReply, ReplyWithTypeMapping } from '../RESP/types'; +import { CommandArguments, TypeMapping, ReplyUnion, RespVersions } from '../RESP/types'; import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub'; import { AbortError, ErrorReply } from '../errors'; import { MonitorCallback } from '.'; @@ -154,11 +154,11 @@ export default class RedisCommandsQueue { }); } - #addPubSubCommand(command: PubSubCommand, asap = false) { + #addPubSubCommand(command: PubSubCommand, asap = false, chainId?: symbol) { return new Promise((resolve, reject) => { this.#toWrite.add({ args: command.args, - chainId: undefined, + chainId, abort: undefined, resolve() { command.resolve(); @@ -237,13 +237,13 @@ export default class RedisCommandsQueue { return this.#addPubSubCommand(command); } - resubscribe() { + resubscribe(chainId?: symbol) { const commands = this.#pubSub.resubscribe(); if (!commands.length) return; this.#setupPubSubHandler(); return Promise.all( - commands.map(command => this.#addPubSubCommand(command, true)) + commands.map(command => this.#addPubSubCommand(command, true, chainId)) ); } @@ -271,11 +271,12 @@ export default class RedisCommandsQueue { return this.#pubSub.getTypeListeners(type); } - monitor(callback: MonitorCallback, typeMapping: TypeMapping = {}, asap = false) { + monitor(callback: MonitorCallback, options?: CommandOptions) { return new Promise((resolve, reject) => { + const typeMapping = options?.typeMapping ?? {}; this.#toWrite.add({ args: ['MONITOR'], - chainId: undefined, + chainId: options?.chainId, abort: undefined, // using `resolve` instead of using `.then`/`await` to make sure it'll be called before processing the next reply resolve: () => { @@ -295,7 +296,7 @@ export default class RedisCommandsQueue { reject, channelsCounter: undefined, typeMapping - }, asap); + }, options?.asap); }); } @@ -306,7 +307,7 @@ export default class RedisCommandsQueue { #resetFallbackOnReply?: Decoder['onReply']; - async reset(typeMapping?: T) { + async reset(chainId: symbol, typeMapping?: T) { return new Promise((resolve, reject) => { // overriding onReply to handle `RESET` while in `MONITOR` or PubSub mode this.#resetFallbackOnReply = this.decoder.onReply; @@ -328,7 +329,7 @@ export default class RedisCommandsQueue { this.#toWrite.push({ args: ['RESET'], - chainId: undefined, + chainId, abort: undefined, resolve, reject, diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index a893191c4e..2fd689b9d7 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -732,6 +732,9 @@ describe('Client', () => { skipMe: true }) ]); + + await once(duplicate, 'ready'); + await Promise.all([ waitTillBeenCalled(listener), client.ping() diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 29516ee549..d6b9e3714d 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -332,24 +332,8 @@ export default class RedisClient< ); } - #handshake(asap = false, promises: Array> = []) { - if (this.#selectedDB !== 0) { - promises.push( - this.#queue.addCommand( - ['SELECT', this.#selectedDB.toString()], - { asap } - ) - ); - } - - if (this.#options?.readonly) { - promises.push( - this.#queue.addCommand( - COMMANDS.READONLY.transformArguments(), - { asap } - ) - ); - } + #handshake(selectedDB: number) { + const commands = []; if (this.#options?.RESP) { const hello: HelloOptions = {}; @@ -365,43 +349,45 @@ export default class RedisClient< hello.SETNAME = this.#options.name; } - promises.push( - this.#queue.addCommand( - HELLO.transformArguments(this.#options.RESP, hello), - { asap } - ) + commands.push( + HELLO.transformArguments(this.#options.RESP, hello) ); } else { - if (this.#options?.name) { - promises.push( - this.#queue.addCommand( - COMMANDS.CLIENT_SETNAME.transformArguments(this.#options.name), - { asap } - ) + if (this.#options?.username || this.#options?.password) { + commands.push( + COMMANDS.AUTH.transformArguments({ + username: this.#options.username, + password: this.#options.password ?? '' + }) ); } - if (this.#options?.username || this.#options?.password) { - promises.push( - this.#queue.addCommand( - COMMANDS.AUTH.transformArguments({ - username: this.#options.username, - password: this.#options.password ?? '' - }), - { asap } - ) + if (this.#options?.name) { + commands.push( + COMMANDS.CLIENT_SETNAME.transformArguments(this.#options.name) ); } } - return promises; + if (selectedDB !== 0) { + commands.push(['SELECT', this.#selectedDB.toString()]); + } + + if (this.#options?.readonly) { + commands.push( + COMMANDS.READONLY.transformArguments() + ); + } + + return commands; } #initiateSocket(): RedisSocket { const socketInitiator = () => { - const promises: Array> = []; + const promises = [], + chainId = Symbol('Socket Initiator'); - const resubscribePromise = this.#queue.resubscribe(); + const resubscribePromise = this.#queue.resubscribe(chainId); if (resubscribePromise) { promises.push(resubscribePromise); } @@ -410,13 +396,24 @@ export default class RedisClient< promises.push( this.#queue.monitor( this.#monitorCallback, - this._commandOptions?.typeMapping, - true + { + typeMapping: this._commandOptions?.typeMapping, + chainId, + asap: true + } ) ); } - this.#handshake(true, promises); + const commands = this.#handshake(this.#selectedDB); + for (let i = commands.length - 1; i >= 0; --i) { + promises.push( + this.#queue.addCommand(commands[i], { + chainId, + asap: true + }) + ); + } if (promises.length) { this.#write(); @@ -885,7 +882,9 @@ export default class RedisClient< } async MONITOR(callback: MonitorCallback) { - const promise = this._self.#queue.monitor(callback, this._commandOptions?.typeMapping); + const promise = this._self.#queue.monitor(callback, { + typeMapping: this._commandOptions?.typeMapping + }); this._self.#scheduleWrite(); await promise; this._self.#monitorCallback = callback; @@ -897,10 +896,20 @@ export default class RedisClient< * Reset the client to its default state (i.e. stop PubSub, stop monitoring, select default DB, etc.) */ async reset() { - const promises = [this._self.#queue.reset()]; - this._self.#handshake(false, promises); + const chainId = Symbol('Reset Chain'), + promises = [this._self.#queue.reset(chainId)], + selectedDB = this._self.#options?.database ?? 0; + for (const command of this._self.#handshake(selectedDB)) { + promises.push( + this._self.#queue.addCommand(command, { + chainId + }) + ); + } this._self.#scheduleWrite(); await Promise.all(promises); + this._self.#selectedDB = selectedDB; + this._self.#monitorCallback = undefined; } /**