From a6c0d1dbb30f4db84c47a1260baa9db0b615f512 Mon Sep 17 00:00:00 2001 From: Leibale Date: Tue, 9 May 2023 12:47:19 +0300 Subject: [PATCH] legacy mode and isolation pool --- packages/client/lib/client/index.ts | 251 +++++------------- packages/client/lib/client/legacy-mode.ts | 107 ++++++++ packages/client/lib/client/pool.ts | 84 ++++++ packages/client/lib/commands/READONLY.spec.ts | 12 +- packages/client/lib/commands/SCAN.ts | 2 +- packages/client/lib/commands/index.ts | 3 + 6 files changed, 272 insertions(+), 187 deletions(-) create mode 100644 packages/client/lib/client/legacy-mode.ts create mode 100644 packages/client/lib/client/pool.ts diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 493d32de9d..df736f3f60 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -11,9 +11,10 @@ import { Command, CommandArguments, CommandSignature, Flags, CommanderConfig, Re import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command'; import { RedisMultiQueuedCommand } from '../multi-command'; import HELLO, { HelloOptions } from '../commands/HELLO'; -import { Pool, Options as PoolOptions, createPool } from 'generic-pool'; import { ReplyWithFlags, CommandReply } from '../RESP/types'; import SCAN, { ScanOptions, ScanCommonOptions } from '../commands/SCAN'; +import { RedisLegacyClient, RedisLegacyClientType } from './legacy-mode'; +import { RedisClientPool } from './pool'; export interface RedisClientOptions< M extends RedisModules = RedisModules, @@ -59,14 +60,6 @@ export interface RedisClientOptions< * Connect in [`READONLY`](https://redis.io/commands/readonly) mode */ readonly?: boolean; - /** - * TODO - */ - legacyMode?: boolean; - /** - * TODO - */ - isolationPoolOptions?: PoolOptions; /** * Send `PING` command at interval (in ms). * Useful with Redis deployments that do not use TCP Keep-Alive. @@ -78,36 +71,36 @@ type WithCommands< RESP extends RespVersions, FLAGS extends Flags > = { - [P in keyof typeof COMMANDS]: CommandSignature<(typeof COMMANDS)[P], RESP, FLAGS>; -}; + [P in keyof typeof COMMANDS]: CommandSignature<(typeof COMMANDS)[P], RESP, FLAGS>; + }; type WithModules< M extends RedisModules, RESP extends RespVersions, FLAGS extends Flags > = { - [P in keyof M]: { - [C in keyof M[P]]: CommandSignature; + [P in keyof M]: { + [C in keyof M[P]]: CommandSignature; + }; }; -}; type WithFunctions< F extends RedisFunctions, RESP extends RespVersions, FLAGS extends Flags > = { - [L in keyof F]: { - [C in keyof F[L]]: CommandSignature; + [L in keyof F]: { + [C in keyof F[L]]: CommandSignature; + }; }; -}; type WithScripts< S extends RedisScripts, RESP extends RespVersions, FLAGS extends Flags > = { - [P in keyof S]: CommandSignature; -}; + [P in keyof S]: CommandSignature; + }; export type RedisClientType< M extends RedisModules = {}, @@ -116,19 +109,17 @@ export type RedisClientType< RESP extends RespVersions = 2, FLAGS extends Flags = {} > = ( - RedisClient & - WithCommands & - WithModules & - WithFunctions & - WithScripts -); + RedisClient & + WithCommands & + WithModules & + WithFunctions & + WithScripts + ); export interface ClientCommandOptions extends QueueCommandOptions { - isolated?: boolean; + // isolated?: boolean; } -// type ClientLegacyCallback = (err: Error | null, reply?: RedisCommandRawReply) => void; - type ProxyClient = RedisClient<{}, {}, {}, RespVersions, Flags> & { commandOptions?: ClientCommandOptions }; type NamespaceProxyClient = { self: ProxyClient }; @@ -148,7 +139,7 @@ export default class RedisClient< const transformReply = getTransformReply(command, resp); return async function (this: ProxyClient) { const args = command.transformArguments.apply(undefined, arguments as any), - reply = await this._sendCommand(args, this.commandOptions); + reply = await this.sendCommand(args, this.commandOptions); return transformReply ? transformReply(reply, args.preserve) : reply; @@ -159,7 +150,7 @@ export default class RedisClient< const transformReply = getTransformReply(command, resp); return async function (this: NamespaceProxyClient) { const args = command.transformArguments.apply(undefined, arguments as any), - reply = await this.self._sendCommand(args, this.self.commandOptions); + reply = await this.self.sendCommand(args, this.self.commandOptions); return transformReply ? transformReply(reply, args.preserve) : reply; @@ -171,7 +162,7 @@ export default class RedisClient< transformReply = getTransformReply(fn, resp); return async function (this: NamespaceProxyClient) { const fnArgs = fn.transformArguments.apply(undefined, arguments as any), - reply = await this.self._sendCommand( + reply = await this.self.sendCommand( prefix.concat(fnArgs), this.self.commandOptions ); @@ -187,12 +178,12 @@ export default class RedisClient< return async function (this: ProxyClient) { const scriptArgs = script.transformArguments.apply(undefined, arguments as any), args = prefix.concat(scriptArgs), - reply = await this._sendCommand(args, this.commandOptions).catch((err: unknown) => { + reply = await this.sendCommand(args, this.commandOptions).catch((err: unknown) => { if (!(err as Error)?.message?.startsWith?.('NOSCRIPT')) throw err; args[0] = 'EVAL'; args[1] = script.SCRIPT; - return this._sendCommand(args, this.commandOptions); + return this.sendCommand(args, this.commandOptions); }); return transformReply ? transformReply(reply, scriptArgs.preserve) : @@ -230,7 +221,7 @@ export default class RedisClient< F extends RedisFunctions = {}, S extends RedisScripts = {}, RESP extends RespVersions = 2 - >(options?: RedisClientOptions) { + >(this: void, options?: RedisClientOptions) { return RedisClient.factory(options)(options); } @@ -278,8 +269,6 @@ export default class RedisClient< private readonly _options?: RedisClientOptions; private readonly _socket: RedisSocket; private readonly _queue: RedisCommandsQueue; - private _isolationPool?: Pool>; - // readonly #v4: Record = {}; private _selectedDB = 0; get options(): RedisClientOptions | undefined { @@ -298,20 +287,11 @@ export default class RedisClient< return this._queue.isPubSubActive; } - // get v4(): Record { - // if (!this.client.#options?.legacyMode) { - // throw new Error('the client is not in "legacy mode"'); - // } - - // return this.client.#v4; - // } - constructor(options?: RedisClientOptions) { super(); this._options = this._initiateOptions(options); this._queue = this._initiateQueue(); this._socket = this._initiateSocket(); - // this.#legacyMode(); } private _initiateOptions(options?: RedisClientOptions): RedisClientOptions | undefined { @@ -353,12 +333,12 @@ export default class RedisClient< } if (this._options?.readonly) { - // promises.push( - // this.#queue.addCommand( - // COMMANDS.READONLY.transformArguments(), - // { asap: true } - // ) - // ); + promises.push( + this._queue.addCommand( + COMMANDS.READONLY.transformArguments(), + { asap: true } + ) + ); } if (this._options?.RESP) { @@ -383,24 +363,24 @@ export default class RedisClient< ); } else { if (this._options?.name) { - // promises.push( - // this.#queue.addCommand( - // COMMANDS.CLIENT_SETNAME.transformArguments(this.#options.name), - // { asap: true } - // ) - // ); + promises.push( + this._queue.addCommand( + COMMANDS.CLIENT_SETNAME.transformArguments(this._options.name), + { asap: true } + ) + ); } if (this._options?.username || this._options?.password) { - // promises.push( - // this.#queue.addCommand( - // COMMANDS.AUTH.transformArguments({ - // username: this.#options.username, - // password: this.#options.password ?? '' - // }), - // { asap: true } - // ) - // ); + promises.push( + this._queue.addCommand( + COMMANDS.AUTH.transformArguments({ + username: this._options.username, + password: this._options.password ?? '' + }), + { asap: true } + ) + ); } } @@ -436,66 +416,6 @@ export default class RedisClient< .on('end', () => this.emit('end')); } - // #legacyMode(): void { - // if (!this.#options?.legacyMode) return; - - // (this as any).#v4.sendCommand = this.#sendCommand.bind(this); - // (this as any).sendCommand = (...args: Array): void => { - // const result = this.#legacySendCommand(...args); - // if (result) { - // result.promise - // .then(reply => result.callback(null, reply)) - // .catch(err => result.callback(err)); - // } - // }; - - // for (const [name, command] of Object.entries(COMMANDS)) { - // this.#defineLegacyCommand(name, command); - // (this as any)[name.toLowerCase()] ??= (this as any)[name]; - // } - - // // hard coded commands - // this.#defineLegacyCommand('SELECT'); - // this.#defineLegacyCommand('select'); - // this.#defineLegacyCommand('SUBSCRIBE'); - // this.#defineLegacyCommand('subscribe'); - // this.#defineLegacyCommand('PSUBSCRIBE'); - // this.#defineLegacyCommand('pSubscribe'); - // this.#defineLegacyCommand('UNSUBSCRIBE'); - // this.#defineLegacyCommand('unsubscribe'); - // this.#defineLegacyCommand('PUNSUBSCRIBE'); - // this.#defineLegacyCommand('pUnsubscribe'); - // this.#defineLegacyCommand('QUIT'); - // this.#defineLegacyCommand('quit'); - // } - - // #legacySendCommand(...args: Array) { - // const callback = typeof args[args.length - 1] === 'function' ? - // args.pop() as ClientLegacyCallback : - // undefined; - - // const promise = this.#sendCommand(transformLegacyCommandArguments(args)); - // if (callback) return { - // promise, - // callback - // }; - // promise.catch(err => this.emit('error', err)); - // } - - // #defineLegacyCommand(name: string, command?: RedisCommand): void { - // this.#v4[name] = (this as any)[name].bind(this); - // (this as any)[name] = command && command.TRANSFORM_LEGACY_REPLY && command.transformReply ? - // (...args: Array) => { - // const result = this.#legacySendCommand(name, ...args); - // if (result) { - // result.promise - // .then(reply => result.callback(null, command.transformReply!(reply))) - // .catch(err => result.callback(err)); - // } - // } : - // (...args: Array) => (this as any).sendCommand(name, ...args); - // } - private _pingTimer?: NodeJS.Timer; private _setPingTimer(): void { @@ -505,8 +425,7 @@ export default class RedisClient< this._pingTimer = setTimeout(() => { if (!this._socket.isReady) return; - // using _sendCommand to support legacy mode - this._sendCommand(['PING']) + this.sendCommand(['PING']) .then(reply => this.emit('ping-interval', reply)) .catch(err => this.emit('error', err)) .finally(() => this._setPingTimer()); @@ -559,10 +478,21 @@ export default class RedisClient< } /** - * Override the `isolated` command option to `true` + * Get the "legacy" (v3/callback) interface */ - isolated() { - return this._commandOptionsProxy('isolated', true); + legacy(): RedisLegacyClientType { + return new RedisLegacyClient( + this as unknown as RedisClientType + ) as RedisLegacyClientType; + } + + /** + * Create `RedisClientPool` using this client as a prototype + */ + pool() { + return RedisClientPool.fromClient( + this as unknown as RedisClientType + ); } duplicate(overrides?: Partial>) { @@ -572,36 +502,16 @@ export default class RedisClient< }) as RedisClientType; } - async connect(): Promise { - await this._socket.connect(); - this.self._isolationPool = createPool({ - create: async () => { - const duplicate = this.duplicate({ - isolationPoolOptions: undefined - }).on('error', err => this.emit('error', err)); - await duplicate.connect(); - return duplicate; - }, - destroy: client => client.disconnect() - }, this._options?.isolationPoolOptions); + connect() { + return this._socket.connect(); } - sendCommand = this._sendCommand.bind(this); - - // using `_` to avoid conflicts with the legacy mode - _sendCommand( + sendCommand( args: CommandArguments, options?: ClientCommandOptions ): Promise { if (!this._socket.isOpen) { return Promise.reject(new ClientClosedError()); - } else if (options?.isolated) { - return this.executeIsolated(isolatedClient => - isolatedClient.sendCommand(args, { - ...options, - isolated: false - }) - ); } else if (!this._socket.isReady && this._options?.disableOfflineQueue) { return Promise.reject(new ClientOfflineError()); } @@ -612,7 +522,7 @@ export default class RedisClient< } async SELECT(db: number): Promise { - await this._sendCommand(['SELECT', db.toString()]); + await this.sendCommand(['SELECT', db.toString()]); this._selectedDB = db; } @@ -630,7 +540,6 @@ export default class RedisClient< listener: PubSubListener, bufferMode?: T ): Promise { - console.log('SUBSCRIBE', channels, listener, bufferMode, this._options?.RESP); return this._pubSubCommand( this._queue.subscribe( PubSubType.CHANNELS, @@ -658,7 +567,7 @@ export default class RedisClient< ); } - unsubscribe = this.UNSUBSCRIBE; + unsubscribe = this.UNSUBSCRIBE PSUBSCRIBE( patterns: string | Array, @@ -752,17 +661,13 @@ export default class RedisClient< return this._socket.quit(async () => { const quitPromise = this._queue.addCommand(['QUIT']); this._tick(); - const [reply] = await Promise.all([ - quitPromise, - this._destroyIsolationPool() - ]); - return reply; + return quitPromise; }); } quit = this.QUIT; - _tick(force = false): void { + private _tick(force = false): void { if (this._socket.writableNeedDrain || (!force && !this._socket.isReady)) { return; } @@ -777,12 +682,6 @@ export default class RedisClient< } } - executeIsolated(fn: (client: RedisClientType) => T | Promise): Promise { - return this._isolationPool ? - this._isolationPool.use(fn) : - Promise.reject(new ClientClosedError()); - } - private _addMultiCommands( commands: Array, chainId?: symbol, @@ -827,7 +726,6 @@ export default class RedisClient< return results; } - // self.#options?.legacyMode ); } @@ -884,23 +782,16 @@ export default class RedisClient< } while (cursor !== 0); } - async disconnect(): Promise { + disconnect() { this._queue.flushAll(new DisconnectsClientError()); this._socket.disconnect(); - await this._destroyIsolationPool(); } - private async _destroyIsolationPool(): Promise { - await this._isolationPool!.drain(); - await this._isolationPool!.clear(); - this.self._isolationPool = undefined; - } - - ref(): void { + ref() { this._socket.ref(); } - unref(): void { + unref() { this._socket.unref(); } } diff --git a/packages/client/lib/client/legacy-mode.ts b/packages/client/lib/client/legacy-mode.ts new file mode 100644 index 0000000000..c189c85342 --- /dev/null +++ b/packages/client/lib/client/legacy-mode.ts @@ -0,0 +1,107 @@ +import { RedisModules, RedisFunctions, RedisScripts, RespVersions, Flags, Command, CommandArguments, ReplyUnion } from '../RESP/types'; +import { RedisClientType } from '.'; +import { getTransformReply } from '../commander'; +import { ErrorReply } from '../errors'; +import COMMANDS from '../commands'; + +type LegacyArgument = string | Buffer | number | Date; + +type LegacyArguments = Array; + +type LegacyCallback = (err: ErrorReply | null, reply?: ReplyUnion) => unknown + +type LegacyCommandArguments = LegacyArguments | [ + ...args: LegacyArguments, + callback: LegacyCallback +]; + +export type CommandSignature = (...args: LegacyCommandArguments) => void; + +type WithCommands = { + [P in keyof typeof COMMANDS]: CommandSignature; +}; + +export type RedisLegacyClientType = RedisLegacyClient & WithCommands; + +export class RedisLegacyClient { + private static _transformArguments(redisArgs: CommandArguments, args: LegacyCommandArguments) { + let callback: LegacyCallback | undefined; + if (typeof args[args.length - 1] === 'function') { + callback = args.pop() as LegacyCallback; + } + + RedisLegacyClient._pushArguments(redisArgs, args as LegacyArguments); + + return callback; + } + + private static _pushArguments(redisArgs: CommandArguments, args: LegacyArguments) { + for (let i = 0; i < args.length; ++i) { + const arg = args[i]; + if (Array.isArray(arg)) { + RedisLegacyClient._pushArguments(redisArgs, arg); + } else { + redisArgs.push( + typeof arg === 'number' || arg instanceof Date ? + arg.toString() : + arg + ); + } + } + } + + private static _getTransformReply(command: Command, resp: RespVersions) { + return command.TRANSFORM_LEGACY_REPLY ? + getTransformReply(command, resp) : + undefined; + } + + private static _createCommand(name: string, command: Command, resp: RespVersions) { + const transformReply = RedisLegacyClient._getTransformReply(command, resp); + return async function (this: RedisLegacyClient, ...args: LegacyCommandArguments) { + const redisArgs = [name], + callback = RedisLegacyClient._transformArguments(redisArgs, args), + promise = this._client.sendCommand(redisArgs); + + if (!callback) { + promise.catch(err => this._client.emit('error', err)); + return; + } + + promise + .then(reply => callback(null, transformReply ? transformReply(reply) : reply)) + .catch(err => callback(err)); + }; + } + + constructor( + private _client: RedisClientType + ) { + const RESP = _client.options?.RESP ?? 2; + for (const [name, command] of Object.entries(COMMANDS)) { + // TODO: as any? + (this as any)[name] = RedisLegacyClient._createCommand( + name, + command, + RESP + ); + } + + // TODO: Multi + } + + sendCommand(...args: LegacyArguments) { + const redisArgs: CommandArguments = [], + callback = RedisLegacyClient._transformArguments(redisArgs, args), + promise = this._client.sendCommand(redisArgs); + + if (!callback) { + promise.catch(err => this._client.emit('error', err)); + return; + } + + promise + .then(reply => callback(null, reply)) + .catch(err => callback(err)); + } +} diff --git a/packages/client/lib/client/pool.ts b/packages/client/lib/client/pool.ts new file mode 100644 index 0000000000..36c4387a3e --- /dev/null +++ b/packages/client/lib/client/pool.ts @@ -0,0 +1,84 @@ +import { Pool, Options as PoolOptions, createPool } from 'generic-pool'; +import { RedisFunctions, RedisModules, RedisScripts, RespVersions } from '../RESP/types'; +import RedisClient, { RedisClientType, RedisClientOptions } from '.'; +import { EventEmitter } from 'events'; + +type RedisClientPoolOptions< + M extends RedisModules, + F extends RedisFunctions, + S extends RedisScripts, + RESP extends RespVersions +> = RedisClientOptions & PoolOptions; + +export class RedisClientPool< + M extends RedisModules, + F extends RedisFunctions, + S extends RedisScripts, + RESP extends RespVersions +> extends EventEmitter { + _pool: Pool>; + + static fromClient< + M extends RedisModules, + F extends RedisFunctions, + S extends RedisScripts, + RESP extends RespVersions + >( + client: RedisClientType, + poolOptions?: PoolOptions + ) { + return new RedisClientPool( + () => client.duplicate(), + poolOptions + ); + } + + static fromOptions< + M extends RedisModules, + F extends RedisFunctions, + S extends RedisScripts, + RESP extends RespVersions + >( + options: RedisClientPoolOptions, + poolOptions?: PoolOptions + ) { + return new RedisClientPool( + RedisClient.factory(options), + poolOptions + ); + } + + constructor( + clientFactory: () => RedisClientType, + options?: PoolOptions + ) { + super(); + + this._pool = createPool({ + create: async () => { + const client = clientFactory(); + + // TODO: more events? + client.on('error', (err: Error) => this.emit('error', err)); + + await client.connect(); + + return client; + }, + // TODO: destroy has to return a Promise?! + destroy: async client => client.disconnect() + }, options); + } + + execute(fn: () => T): Promise { + return this._pool.use(fn); + } + + close() { + // TODO + } + + disconnect() { + // TODO + } +} diff --git a/packages/client/lib/commands/READONLY.spec.ts b/packages/client/lib/commands/READONLY.spec.ts index aa4db47f81..e635d04009 100644 --- a/packages/client/lib/commands/READONLY.spec.ts +++ b/packages/client/lib/commands/READONLY.spec.ts @@ -2,10 +2,10 @@ import { strict as assert } from 'assert'; import { transformArguments } from './READONLY'; describe('READONLY', () => { - it('transformArguments', () => { - assert.deepEqual( - transformArguments(), - ['READONLY'] - ); - }); + it('transformArguments', () => { + assert.deepEqual( + transformArguments(), + ['READONLY'] + ); + }); }); diff --git a/packages/client/lib/commands/SCAN.ts b/packages/client/lib/commands/SCAN.ts index 690f31c435..da64a7969e 100644 --- a/packages/client/lib/commands/SCAN.ts +++ b/packages/client/lib/commands/SCAN.ts @@ -1,4 +1,4 @@ -import { CommandArguments, RedisArgument, BlobStringReply, ArrayReply, Command } from '../RESP/types'; +import { RedisArgument, CommandArguments, BlobStringReply, ArrayReply, Command } from '../RESP/types'; export interface ScanCommonOptions { MATCH?: string; diff --git a/packages/client/lib/commands/index.ts b/packages/client/lib/commands/index.ts index ec2c632915..68f29c3751 100644 --- a/packages/client/lib/commands/index.ts +++ b/packages/client/lib/commands/index.ts @@ -133,6 +133,7 @@ import PING from './PING'; import PSETEX from './PSETEX'; import PTTL from './PTTL'; import RANDOMKEY from './RANDOMKEY'; +import READONLY from './READONLY'; import RENAME from './RENAME'; import RENAMENX from './RENAMENX'; import RPOP_COUNT from './RPOP_COUNT'; @@ -487,6 +488,8 @@ export default { pTTL: PTTL, RANDOMKEY, randomKey: RANDOMKEY, + READONLY, + readonly: READONLY, RENAME, rename: RENAME, RENAMENX,