diff --git a/lib/client/index.spec.ts b/lib/client/index.spec.ts index e98814d058..4d30e9be60 100644 --- a/lib/client/index.spec.ts +++ b/lib/client/index.spec.ts @@ -119,6 +119,18 @@ describe('Client', () => { assert.equal(client.isOpen, false); }); + + itWithClient(TestRedisServers.PASSWORD, 'should execute AUTH before SELECT', async client => { + assert.equal( + (await client.clientInfo()).db, + 2 + ); + }, { + minimumRedisVersion: [6, 2], + clientOptions: { + database: 2 + } + }); }); describe('legacyMode', () => { diff --git a/lib/client/index.ts b/lib/client/index.ts index 7c094e154f..8850574e71 100644 --- a/lib/client/index.ts +++ b/lib/client/index.ts @@ -177,24 +177,44 @@ export default class RedisClient #initiateSocket(): RedisSocket { const socketInitiator = async (): Promise => { - const v4Commands = this.#options?.legacyMode ? this.#v4 : this, - promises = []; + const promises = []; if (this.#selectedDB !== 0) { - promises.push(v4Commands.select(RedisClient.commandOptions({ asap: true }), this.#selectedDB)); + promises.push( + this.#queue.addCommand( + ['SELECT', this.#selectedDB.toString()], + { asap: true } + ) + ); } if (this.#options?.readonly) { - promises.push(v4Commands.readonly(RedisClient.commandOptions({ asap: true }))); + promises.push( + this.#queue.addCommand( + COMMANDS.READONLY.transformArguments(), + { asap: true } + ) + ); } if (this.#options?.username || this.#options?.password) { - promises.push(v4Commands.auth(RedisClient.commandOptions({ asap: true }), this.#options)); + promises.push( + this.#queue.addCommand( + COMMANDS.AUTH.transformArguments({ + username: this.#options.username, + password: this.#options.password ?? '' + }), + { asap: true } + ) + ); } const resubscribePromise = this.#queue.resubscribe(); if (resubscribePromise) { promises.push(resubscribePromise); + } + + if (promises.length) { this.#tick(); } @@ -410,7 +430,7 @@ export default class RedisClient quit = this.QUIT; #tick(): void { - if (!this.#socket.isSocketExists) { + if (!this.#socket.isSocketExists || this.#socket.writableNeedDrain) { return; } diff --git a/lib/client/socket.ts b/lib/client/socket.ts index ca48ad4d54..88ae03003a 100644 --- a/lib/client/socket.ts +++ b/lib/client/socket.ts @@ -76,6 +76,14 @@ export default class RedisSocket extends EventEmitter { return !!this.#socket; } + // `writable.writableNeedDrain` was added in v15.2.0 and therefore can't be used + // https://nodejs.org/api/stream.html#stream_writable_writableneeddrain + #writableNeedDrain = false; + + get writableNeedDrain(): boolean { + return this.#writableNeedDrain; + } + constructor(initiator?: RedisSocketInitiator, options?: RedisSocketOptions) { super(); @@ -163,7 +171,10 @@ export default class RedisSocket extends EventEmitter { this.#onSocketError(new Error('Socket closed unexpectedly')); } }) - .on('drain', () => this.emit('drain')) + .on('drain', () => { + this.#writableNeedDrain = false; + this.emit('drain'); + }) .on('data', (data: Buffer) => this.emit('data', data)); resolve(socket); @@ -198,7 +209,9 @@ export default class RedisSocket extends EventEmitter { throw new ClientClosedError(); } - return this.#socket.write(toWrite); + const wasFullyWritten = this.#socket.write(toWrite); + this.#writableNeedDrain = !wasFullyWritten; + return wasFullyWritten; } async disconnect(ignoreIsOpen = false): Promise { diff --git a/lib/test-utils.ts b/lib/test-utils.ts index 978940ff93..3b823ac6ee 100644 --- a/lib/test-utils.ts +++ b/lib/test-utils.ts @@ -284,16 +284,23 @@ export function describeHandleMinimumRedisVersion(minimumVersion: PartialRedisVe }); } +interface RedisClientTestOptions extends RedisTestOptions { + clientOptions?: RedisClientOptions<{}, {}>; +} + export function itWithClient( type: TestRedisServers, title: string, fn: (client: RedisClientType) => Promise, - options?: RedisTestOptions + options?: RedisClientTestOptions ): void { it(title, async function () { if (handleMinimumRedisVersion(this, options?.minimumRedisVersion)) return; - const client = RedisClient.create(TEST_REDIS_SERVERS[type]); + const client = RedisClient.create({ + ...TEST_REDIS_SERVERS[type], + ...options?.clientOptions + }); await client.connect();