From dded3de090ebed5c9a4e0666c38e00b402907bda Mon Sep 17 00:00:00 2001 From: Leibale Date: Wed, 24 Jan 2024 12:28:30 -0500 Subject: [PATCH] fix monitor, add client.reset & client.resetState, some fixes --- packages/client/lib/client/commands-queue.ts | 335 +++++++++++-------- packages/client/lib/client/index.spec.ts | 6 +- packages/client/lib/client/index.ts | 184 ++++++---- packages/client/lib/client/pool.ts | 1 + packages/client/lib/client/pub-sub.ts | 27 +- packages/client/lib/client/socket.ts | 2 +- 6 files changed, 321 insertions(+), 234 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 6261b5dd19..03851b4bee 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -1,10 +1,9 @@ import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list'; import encodeCommand from '../RESP/encoder'; import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder'; -import { CommandArguments, TypeMapping, ReplyUnion, RespVersions } from '../RESP/types'; +import { CommandArguments, TypeMapping, ReplyUnion, RespVersions, SimpleStringReply, ReplyWithTypeMapping } from '../RESP/types'; import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub'; import { AbortError, ErrorReply } from '../errors'; -import { EventEmitter } from 'node:stream'; import { MonitorCallback } from '.'; export interface CommandOptions { @@ -19,24 +18,24 @@ export interface CommandOptions { export interface CommandToWrite extends CommandWaitingForReply { args: CommandArguments; - chainId?: symbol; - abort?: { + chainId: symbol | undefined; + abort: { signal: AbortSignal; listener: () => unknown; - }; - resolveOnWrite?: boolean; + } | undefined; } interface CommandWaitingForReply { resolve(reply?: unknown): void; reject(err: unknown): void; - channelsCounter?: number; - typeMapping?: TypeMapping; + channelsCounter: number | undefined; + typeMapping: TypeMapping | undefined; } export type OnShardedChannelMoved = (channel: string, listeners: ChannelListeners) => void; -const PONG = Buffer.from('pong'); +const PONG = Buffer.from('pong'), + RESET = Buffer.from('RESET'); const RESP2_PUSH_TYPE_MAPPING = { ...PUSH_TYPE_MAPPING, @@ -44,35 +43,28 @@ const RESP2_PUSH_TYPE_MAPPING = { }; export default class RedisCommandsQueue { - readonly #maxLength: number | null | undefined; + readonly #respVersion; + readonly #maxLength; readonly #toWrite = new DoublyLinkedList(); readonly #waitingForReply = new SinglyLinkedList(); - readonly #onShardedChannelMoved: OnShardedChannelMoved; - + readonly #onShardedChannelMoved; + #chainInExecution: symbol | undefined; + readonly decoder; readonly #pubSub = new PubSub(); get isPubSubActive() { return this.#pubSub.isActive; } - #chainInExecution: symbol | undefined; - - decoder: Decoder; - constructor( - respVersion: RespVersions | null | undefined, + respVersion: RespVersions, maxLength: number | null | undefined, - onShardedChannelMoved: EventEmitter['emit'] + onShardedChannelMoved: OnShardedChannelMoved ) { - this.decoder = this.#initiateDecoder(respVersion); + this.#respVersion = respVersion; this.#maxLength = maxLength; this.#onShardedChannelMoved = onShardedChannelMoved; - } - - #initiateDecoder(respVersion: RespVersions | null | undefined) { - return respVersion === 3 ? - this.#initiateResp3Decoder() : - this.#initiateResp2Decoder(); + this.decoder = this.#initiateDecoder(); } #onReply(reply: ReplyUnion) { @@ -111,7 +103,7 @@ export default class RedisCommandsQueue { return this.#waitingForReply.head!.value.typeMapping ?? {}; } - #initiateResp3Decoder() { + #initiateDecoder() { return new Decoder({ onReply: reply => this.#onReply(reply), onErrorReply: err => this.#onErrorReply(err), @@ -124,61 +116,9 @@ export default class RedisCommandsQueue { }); } - #initiateResp2Decoder() { - return new Decoder({ - onReply: reply => { - if (this.#pubSub.isActive && Array.isArray(reply)) { - if (this.#onPush(reply)) return; - - if (PONG.equals(reply[0] as Buffer)) { - const { resolve, typeMapping } = this.#waitingForReply.shift()!, - buffer = ((reply[1] as Buffer).length === 0 ? reply[0] : reply[1]) as Buffer; - resolve(typeMapping?.[RESP_TYPES.SIMPLE_STRING] === Buffer ? buffer : buffer.toString()); - return; - } - } - - this.#onReply(reply); - }, - onErrorReply: err => this.#onErrorReply(err), - // PUSH type does not exist in RESP2 - // PubSub is handled in onReply - // @ts-expect-error - onPush: undefined, - getTypeMapping: () => { - // PubSub push is an Array in RESP2 - return this.#pubSub.isActive ? - RESP2_PUSH_TYPE_MAPPING : - this.#getTypeMapping(); - } - }); - } - - async monitor(callback: MonitorCallback, typeMapping: TypeMapping = {}, asap = false) { - await this.addCommand( - ['MONITOR'], - { asap }, - true - ); - - const { onReply, getTypeMapping } = this.decoder; - this.decoder.onReply = callback; - this.decoder.getTypeMapping = () => typeMapping; - return () => new Promise(async resolve => { - await this.addCommand(['RESET'], undefined, true); - this.decoder.onReply = (reply: string) => { - if (reply !== 'RESET') return callback(reply); - this.decoder.onReply = onReply; - this.decoder.getTypeMapping = getTypeMapping; - resolve(); - }; - }); - } - addCommand( args: CommandArguments, - options?: CommandOptions, - resolveOnWrite?: boolean + options?: CommandOptions ): Promise { if (this.#maxLength && this.#toWrite.length + this.#waitingForReply.length >= this.#maxLength) { return Promise.reject(new Error('The queue is full')); @@ -192,7 +132,6 @@ export default class RedisCommandsQueue { args, chainId: options?.chainId, abort: undefined, - resolveOnWrite, resolve, reject, channelsCounter: undefined, @@ -215,66 +154,12 @@ export default class RedisCommandsQueue { }); } - subscribe( - type: PubSubType, - channels: string | Array, - listener: PubSubListener, - returnBuffers?: T - ) { - return this.#addPubSubCommand( - this.#pubSub.subscribe(type, channels, listener, returnBuffers) - ); - } - - unsubscribe( - type: PubSubType, - channels?: string | Array, - listener?: PubSubListener, - returnBuffers?: T - ) { - return this.#addPubSubCommand( - this.#pubSub.unsubscribe(type, channels, listener, returnBuffers) - ); - } - - resubscribe(): Promise | undefined { - const commands = this.#pubSub.resubscribe(); - if (!commands.length) return; - - return Promise.all( - commands.map(command => this.#addPubSubCommand(command, true)) - ); - } - - extendPubSubChannelListeners( - type: PubSubType, - channel: string, - listeners: ChannelListeners - ) { - return this.#addPubSubCommand( - this.#pubSub.extendChannelListeners(type, channel, listeners) - ); - } - - extendPubSubListeners(type: PubSubType, listeners: PubSubTypeListeners) { - return this.#addPubSubCommand( - this.#pubSub.extendTypeListeners(type, listeners) - ); - } - - getPubSubListeners(type: PubSubType) { - return this.#pubSub.getTypeListeners(type); - } - #addPubSubCommand(command: PubSubCommand, asap = false) { - if (command === undefined) return; - return new Promise((resolve, reject) => { this.#toWrite.add({ args: command.args, chainId: undefined, abort: undefined, - resolveOnWrite: false, resolve() { command.resolve(); resolve(); @@ -289,6 +174,171 @@ export default class RedisCommandsQueue { }); } + #setupPubSubHandler(command: Exclude) { + // RESP3 uses `onPush` to handle PubSub, so no need to modify `onReply` + if (this.#respVersion !== 2) return; + + // overriding `resolve` instead of using `.then` to make sure it'll be called before processing the next reply + const { resolve } = command; + command.resolve = () => { + this.decoder.onReply = (reply => { + if (Array.isArray(reply)) { + if (this.#onPush(reply)) return; + + if (PONG.equals(reply[0] as Buffer)) { + const { resolve, typeMapping } = this.#waitingForReply.shift()!, + buffer = ((reply[1] as Buffer).length === 0 ? reply[0] : reply[1]) as Buffer; + resolve(typeMapping?.[RESP_TYPES.SIMPLE_STRING] === Buffer ? buffer : buffer.toString()); + return; + } + } + + return this.#onReply(reply); + }) as Decoder['onReply']; + this.decoder.getTypeMapping = () => RESP2_PUSH_TYPE_MAPPING; + resolve(); + }; + } + + subscribe( + type: PubSubType, + channels: string | Array, + listener: PubSubListener, + returnBuffers?: T + ) { + const command = this.#pubSub.subscribe(type, channels, listener, returnBuffers); + if (!command) return; + + this.#setupPubSubHandler(command); + return this.#addPubSubCommand(command); + } + + #resetDecoderCallbacks() { + this.decoder.onReply = (reply => this.#onReply(reply)) as Decoder['onReply']; + this.decoder.getTypeMapping = () => this.#getTypeMapping(); + } + + unsubscribe( + type: PubSubType, + channels?: string | Array, + listener?: PubSubListener, + returnBuffers?: T + ) { + const command = this.#pubSub.unsubscribe(type, channels, listener, returnBuffers); + if (!command) return; + + if (command && this.#respVersion === 2) { + // RESP2 modifies `onReply` to handle PubSub (see #setupPubSubHandler) + const { resolve } = command; + command.resolve = () => { + if (!this.#pubSub.isActive) { + this.#resetDecoderCallbacks(); + } + + resolve(); + }; + } + + return this.#addPubSubCommand(command); + } + + resubscribe() { + const commands = this.#pubSub.resubscribe(); + if (!commands.length) return; + + // using last command becasue of asap + this.#setupPubSubHandler(commands[commands.length - 1]); + return Promise.all( + commands.map(command => this.#addPubSubCommand(command, true)) + ); + } + + extendPubSubChannelListeners( + type: PubSubType, + channel: string, + listeners: ChannelListeners + ) { + const command = this.#pubSub.extendChannelListeners(type, channel, listeners); + if (!command) return; + + this.#setupPubSubHandler(command); + return this.#addPubSubCommand(command); + } + + extendPubSubListeners(type: PubSubType, listeners: PubSubTypeListeners) { + const command = this.#pubSub.extendTypeListeners(type, listeners); + if (!command) return; + + this.#setupPubSubHandler(command); + return this.#addPubSubCommand(command); + } + + getPubSubListeners(type: PubSubType) { + return this.#pubSub.getTypeListeners(type); + } + + monitor(callback: MonitorCallback, typeMapping: TypeMapping = {}, asap = false) { + return new Promise((resolve, reject) => { + this.#toWrite.add({ + args: ['MONITOR'], + chainId: undefined, + abort: undefined, + // using `resolve` instead of using `.then`/`await` to make sure it'll be called before processing the next reply + resolve: () => { + // after running `MONITOR` only `MONITOR` and `RESET` replies are expected + // any other command should cause an error + + // if `RESET` already overrides `onReply`, set monitor as it's fallback + if (this.#resetFallbackOnReply) { + this.#resetFallbackOnReply = callback; + } else { + this.decoder.onReply = callback; + } + + this.decoder.getTypeMapping = () => typeMapping; + resolve(); + }, + reject, + channelsCounter: undefined, + typeMapping + }, asap); + }); + } + + #resetFallbackOnReply?: Decoder['onReply']; + + async reset(typeMapping?: T) { + return new Promise((resolve, reject) => { + // overriding onReply to handle `RESET` while in `MONITOR` or PubSub mode + this.#resetFallbackOnReply = this.decoder.onReply; + this.decoder.onReply = (reply => { + if ( + (typeof reply === 'string' && reply === 'RESET') || + (reply instanceof Buffer && RESET.equals(reply)) + ) { + this.#resetDecoderCallbacks(); + this.#resetFallbackOnReply = undefined; + this.#pubSub.reset(); + + this.#waitingForReply.shift()!.resolve(reply); + return; + } + + this.#resetFallbackOnReply!(reply); + }) as Decoder['onReply']; + + this.#toWrite.push({ + args: ['RESET'], + chainId: undefined, + abort: undefined, + resolve, + reject, + channelsCounter: undefined, + typeMapping + }); + }); + } + isWaitingToWrite() { return this.#toWrite.length > 0; } @@ -305,22 +355,15 @@ export default class RedisCommandsQueue { continue; } + // TODO reuse `toSend` or create new object? + (toSend as any).args = undefined; if (toSend.abort) { RedisCommandsQueue.#removeAbortListener(toSend); toSend.abort = undefined; } - - if (toSend.resolveOnWrite) { - toSend.resolve(); - } else { - // TODO reuse `toSend` or create new object? - (toSend as any).args = undefined; - - this.#chainInExecution = toSend.chainId; - toSend.chainId = undefined; - - this.#waitingForReply.push(toSend); - } + this.#chainInExecution = toSend.chainId; + toSend.chainId = undefined; + this.#waitingForReply.push(toSend); yield encoded; toSend = this.#toWrite.shift(); diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index d8ac7487cc..a893191c4e 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -742,8 +742,10 @@ describe('Client', () => { }, GLOBAL.SERVERS.OPEN); testUtils.testWithClient('should be able to go back to "normal mode"', async client => { - const off = await client.monitor(() => {}); - await off(); + await Promise.all([ + client.monitor(() => {}), + client.reset() + ]); await assert.doesNotReject(client.ping()); }, GLOBAL.SERVERS.OPEN); diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 7d9cadde2e..5ff842049b 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -326,15 +326,85 @@ export default class RedisClient< #initiateQueue(): RedisCommandsQueue { return new RedisCommandsQueue( - this.#options?.RESP, + this.#options?.RESP ?? 2, this.#options?.commandsQueueMaxLength, (channel, listeners) => this.emit('sharded-channel-moved', channel, listeners) ); } + #handshake(asap = false, promises: Array> = []) { + if (this.#selectedDB !== 0) { + promises.push( + this.#queue.addCommand( + ['SELECT', this.#selectedDB.toString()], + { asap } + ) + ); + } + + if (this.#options?.readonly) { + promises.push( + this.#queue.addCommand( + COMMANDS.READONLY.transformArguments(), + { asap } + ) + ); + } + + if (this.#options?.RESP) { + const hello: HelloOptions = {}; + + if (this.#options.password) { + hello.AUTH = { + username: this.#options.username ?? 'default', + password: this.#options.password + }; + } + + if (this.#options.name) { + hello.SETNAME = this.#options.name; + } + + promises.push( + this.#queue.addCommand( + HELLO.transformArguments(this.#options.RESP, hello), + { asap } + ) + ); + } else { + if (this.#options?.name) { + promises.push( + this.#queue.addCommand( + COMMANDS.CLIENT_SETNAME.transformArguments(this.#options.name), + { asap } + ) + ); + } + + if (this.#options?.username || this.#options?.password) { + promises.push( + this.#queue.addCommand( + COMMANDS.AUTH.transformArguments({ + username: this.#options.username, + password: this.#options.password ?? '' + }), + { asap } + ) + ); + } + } + + return promises; + } + #initiateSocket(): RedisSocket { - const socketInitiator = async (): Promise => { - const promises = [this.#queue.resubscribe()]; + const socketInitiator = () => { + const promises: Array> = []; + + const resubscribePromise = this.#queue.resubscribe(); + if (resubscribePromise) { + promises.push(resubscribePromise); + } if (this.#monitorCallback) { promises.push( @@ -346,70 +416,11 @@ export default class RedisClient< ); } - if (this.#selectedDB !== 0) { - promises.push( - this.#queue.addCommand( - ['SELECT', this.#selectedDB.toString()], - { asap: true } - ) - ); - } - - if (this.#options?.readonly) { - promises.push( - this.#queue.addCommand( - COMMANDS.READONLY.transformArguments(), - { asap: true } - ) - ); - } - - if (this.#options?.RESP) { - const hello: HelloOptions = {}; - - if (this.#options.password) { - hello.AUTH = { - username: this.#options.username ?? 'default', - password: this.#options.password - }; - } - - if (this.#options.name) { - hello.SETNAME = this.#options.name; - } - - promises.push( - this.#queue.addCommand( - HELLO.transformArguments(this.#options.RESP, hello), - { asap: true } - ) - ); - } else { - if (this.#options?.name) { - promises.push( - this.#queue.addCommand( - COMMANDS.CLIENT_SETNAME.transformArguments(this.#options.name), - { asap: true } - ) - ); - } - - if (this.#options?.username || this.#options?.password) { - promises.push( - this.#queue.addCommand( - COMMANDS.AUTH.transformArguments({ - username: this.#options.username, - password: this.#options.password ?? '' - }), - { asap: true } - ) - ); - } - } + this.#handshake(true, promises); if (promises.length) { this.#write(); - await Promise.all(promises); + return Promise.all(promises); } }; @@ -876,19 +887,48 @@ export default class RedisClient< async MONITOR(callback: MonitorCallback) { const promise = this._self.#queue.monitor(callback, this._commandOptions?.typeMapping); this._self.#scheduleWrite(); - - const off = await promise; + await promise; this._self.#monitorCallback = callback; - return async () => { - const promise = off(); - this._self.#scheduleWrite(); - await promise; - this._self.#monitorCallback = undefined; - }; } monitor = this.MONITOR; + /** + * Reset the client to its default state (i.e. stop PubSub, stop monitoring, select default DB, etc.) + */ + async reset() { + const promises = [this._self.#queue.reset()]; + this._self.#handshake(false, promises); + await Promise.all(promises); + } + + /** + * If the client has state, reset it. + * An internal function to be used by wrapper class such as `RedisClientPool`. + * @internal + */ + resetIfDirty() { + let shouldReset = false; + if (this._self.#selectedDB !== this._self.#options?.database ?? 0) { + console.warn('Returning a client with a different selected DB'); + shouldReset = true; + } + + if (this._self.#monitorCallback) { + console.warn('Returning a client with active MONITOR'); + shouldReset = true; + } + + if (this._self.#queue.isPubSubActive) { + console.warn('Returning a client with active PubSub'); + shouldReset = true; + } + + if (shouldReset) { + return this.reset(); + } + } + /** * @deprecated use .close instead */ diff --git a/packages/client/lib/client/pool.ts b/packages/client/lib/client/pool.ts index 12d8f5fee5..fc996e0762 100644 --- a/packages/client/lib/client/pool.ts +++ b/packages/client/lib/client/pool.ts @@ -375,6 +375,7 @@ export class RedisClientPool< #returnClient(node: DoublyLinkedNode>) { const task = this.#tasksQueue.shift(); if (task) { + clearTimeout(task.timeout); this.#executeTask(node, task.resolve, task.reject, task.fn); return; } diff --git a/packages/client/lib/client/pub-sub.ts b/packages/client/lib/client/pub-sub.ts index 7c29099d71..aedbaf68d6 100644 --- a/packages/client/lib/client/pub-sub.ts +++ b/packages/client/lib/client/pub-sub.ts @@ -1,4 +1,5 @@ import { RedisArgument } from '../RESP/types'; +import { CommandToWrite } from './commands-queue'; export enum PubSubType { CHANNELS = 'CHANNELS', @@ -26,7 +27,7 @@ const COMMANDS = { export type PubSubListener< RETURN_BUFFERS extends boolean = false -> = (message: T, channel: T) => unknown; +> = (message: T, channel: T) => unknown; export interface ChannelListeners { unsubscribing: boolean; @@ -38,11 +39,11 @@ export type PubSubTypeListeners = Map; type Listeners = Record; -export type PubSubCommand = ReturnType< - typeof PubSub.prototype.subscribe | - typeof PubSub.prototype.unsubscribe | - typeof PubSub.prototype.extendTypeListeners ->; +export type PubSubCommand = ( + Required> & { + reject: undefined | (() => unknown); + } +); export class PubSub { static isStatusReply(reply: Array): boolean { @@ -135,7 +136,7 @@ export class PubSub { this.#subscribing--; this.#updateIsActive(); } - }; + } satisfies PubSubCommand; } extendChannelListeners( @@ -158,7 +159,7 @@ export class PubSub { this.#subscribing--; this.#updateIsActive(); } - }; + } satisfies PubSubCommand; } #extendChannelListeners( @@ -203,7 +204,7 @@ export class PubSub { this.#subscribing--; this.#updateIsActive(); } - }; + } satisfies PubSubCommand; } unsubscribe( @@ -299,8 +300,8 @@ export class PubSub { removeListeners(); this.#updateIsActive(); }, - reject: undefined // use the same structure as `subscribe` - }; + reject: undefined + } satisfies PubSubCommand; } #updateIsActive() { @@ -317,7 +318,7 @@ export class PubSub { this.#subscribing = 0; } - resubscribe(): Array { + resubscribe() { const commands = []; for (const [type, listeners] of Object.entries(this.#listeners)) { if (!listeners.size) continue; @@ -333,7 +334,7 @@ export class PubSub { channelsCounter: listeners.size, resolve: callback, reject: callback - }); + } satisfies PubSubCommand); } return commands; diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index 129ace9039..753ced6c5e 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -43,7 +43,7 @@ interface CreateSocketReturn { socket: T; } -export type RedisSocketInitiator = () => Promise; +export type RedisSocketInitiator = () => void | Promise; export default class RedisSocket extends EventEmitter { static #initiateOptions(options?: RedisSocketOptions): RedisSocketOptions {