From 91be6ac9d424a566ae95afc82b08474e4895ae86 Mon Sep 17 00:00:00 2001 From: Leibale Date: Wed, 14 Jun 2023 15:04:36 -0400 Subject: [PATCH] wip --- packages/client/lib/RESP/decoder.ts | 2 +- packages/client/lib/client/commands-queue.ts | 50 ++++--- packages/client/lib/client/index.ts | 149 ++++++++++--------- packages/client/lib/client/multi-command.ts | 11 +- packages/client/lib/client/socket.ts | 42 ++---- packages/client/lib/cluster/index.ts | 5 +- 6 files changed, 131 insertions(+), 128 deletions(-) diff --git a/packages/client/lib/RESP/decoder.ts b/packages/client/lib/RESP/decoder.ts index af312788b9..c4e1296fc2 100644 --- a/packages/client/lib/RESP/decoder.ts +++ b/packages/client/lib/RESP/decoder.ts @@ -49,7 +49,7 @@ interface DecoderOptions { } export class Decoder { - private _config; + private readonly _config; private _cursor = 0; diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 57b7999dfa..9628097c28 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -6,7 +6,7 @@ import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, Pu import { AbortError, ErrorReply } from '../errors'; import { EventEmitter } from 'stream'; -export interface QueueCommandOptions { +export interface CommandOptions { chainId?: symbol; asap?: boolean; abortSignal?: AbortSignal; @@ -149,7 +149,7 @@ export default class RedisCommandsQueue { }); } - addCommand(args: CommandArguments, options?: QueueCommandOptions): Promise { + addCommand(args: CommandArguments, options?: CommandOptions): Promise { if (this._maxLength && this._waitingToBeSent.length + this._waitingForReply.length >= this._maxLength) { return Promise.reject(new Error('The queue is full')); } else if (options?.abortSignal?.aborted) { @@ -256,30 +256,32 @@ export default class RedisCommandsQueue { }); } - getCommandToSend(): CommandArguments | undefined { - const toSend = this._waitingToBeSent.shift(); - if (!toSend) return; + *waitingToBeSent() { + let toSend = this._waitingToBeSent.shift(); + while (toSend) { + let encoded: CommandArguments; + try { + encoded = encodeCommand(toSend.args); + } catch (err) { + toSend.reject(err); + toSend = this._waitingToBeSent.shift(); + continue; + } - let encoded: CommandArguments; - try { - encoded = encodeCommand(toSend.args); - } catch (err) { - toSend.reject(err); - return; + if (toSend.abort) { + RedisCommandsQueue._removeAbortListener(toSend); + toSend.abort = undefined; + } + + // TODO reuse `toSend` or create new object? + (toSend as any).args = undefined; + (toSend as any).chainId = undefined; + + this._waitingForReply.push(toSend); + this._chainInExecution = toSend.chainId; + yield encoded; + toSend = this._waitingToBeSent.shift(); } - - if (toSend.abort) { - RedisCommandsQueue._removeAbortListener(toSend); - toSend.abort = undefined; - } - - // TODO reuse `toSend` or create new object? - (toSend as any).args = undefined; - (toSend as any).chainId = undefined; - - this._waitingForReply.push(toSend); - this._chainInExecution = toSend.chainId; - return encoded; } private _flushWaitingForReply(err: Error): void { diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 2b256e41e0..4e6700d3fb 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -1,6 +1,6 @@ import COMMANDS from '../commands'; import RedisSocket, { RedisSocketOptions, RedisTlsSocketOptions } from './socket'; -import RedisCommandsQueue, { QueueCommandOptions } from './commands-queue'; +import RedisCommandsQueue, { CommandOptions } from './commands-queue'; import { EventEmitter } from 'events'; import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; import { ClientClosedError, ClientOfflineError, DisconnectsClientError, WatchError } from '../errors'; @@ -12,7 +12,7 @@ import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-co import { RedisMultiQueuedCommand } from '../multi-command'; import HELLO, { HelloOptions } from '../commands/HELLO'; import { ReplyWithTypeMapping, CommandReply } from '../RESP/types'; -import SCAN, { ScanOptions, ScanCommonOptions } from '../commands/SCAN'; +import { ScanOptions, ScanCommonOptions } from '../commands/SCAN'; import { RedisLegacyClient, RedisLegacyClientType } from './legacy-mode'; // import { RedisClientPool } from './pool'; @@ -20,8 +20,9 @@ export interface RedisClientOptions< M extends RedisModules = RedisModules, F extends RedisFunctions = RedisFunctions, S extends RedisScripts = RedisScripts, - RESP extends RespVersions = RespVersions -> extends CommanderConfig { + RESP extends RespVersions = RespVersions, + TYPE_MAPPING extends TypeMapping = TypeMapping +> extends CommanderConfig, TypeMappingOption { /** * `redis[s]://[[username][:password]@][host][:port][/db-number]` * See [`redis`](https://www.iana.org/assignments/uri-schemes/prov/redis) and [`rediss`](https://www.iana.org/assignments/uri-schemes/prov/rediss) IANA registration for more details @@ -67,40 +68,60 @@ export interface RedisClientOptions< pingInterval?: number; } +interface TypeMappingOption { + /** + * Maps bettwen RESP types to JavaScript types + */ + typeMapping?: TYPE_MAPPING; +} + type WithCommands< RESP extends RespVersions, TYPE_MAPPING extends TypeMapping > = { - [P in keyof typeof COMMANDS]: CommandSignature<(typeof COMMANDS)[P], RESP, TYPE_MAPPING>; - }; + [P in keyof typeof COMMANDS]: CommandSignature<(typeof COMMANDS)[P], RESP, TYPE_MAPPING>; +}; type WithModules< M extends RedisModules, RESP extends RespVersions, TYPE_MAPPING extends TypeMapping > = { - [P in keyof M]: { - [C in keyof M[P]]: CommandSignature; - }; + [P in keyof M]: { + [C in keyof M[P]]: CommandSignature; }; +}; type WithFunctions< F extends RedisFunctions, RESP extends RespVersions, TYPE_MAPPING extends TypeMapping > = { - [L in keyof F]: { - [C in keyof F[L]]: CommandSignature; - }; + [L in keyof F]: { + [C in keyof F[L]]: CommandSignature; }; +}; type WithScripts< S extends RedisScripts, RESP extends RespVersions, TYPE_MAPPING extends TypeMapping > = { - [P in keyof S]: CommandSignature; - }; + [P in keyof S]: CommandSignature; +}; + +export type RedisClientExtensions< + M extends RedisModules = {}, + F extends RedisFunctions = {}, + S extends RedisScripts = {}, + RESP extends RespVersions = 2, + TYPE_MAPPING extends TypeMapping = {} +> = ( + WithCommands & + WithModules & + WithFunctions & + WithScripts +); export type RedisClientType< M extends RedisModules = {}, @@ -109,18 +130,11 @@ export type RedisClientType< RESP extends RespVersions = 2, TYPE_MAPPING extends TypeMapping = {} > = ( - RedisClient & - WithCommands & - WithModules & - WithFunctions & - WithScripts - ); + RedisClient & + RedisClientExtensions +); -export interface ClientCommandOptions extends QueueCommandOptions { - // isolated?: boolean; -} - -type ProxyClient = RedisClient<{}, {}, {}, RespVersions, TypeMapping> & { commandOptions?: ClientCommandOptions }; +type ProxyClient = RedisClient & { commandOptions?: CommandOptions }; type NamespaceProxyClient = { self: ProxyClient }; @@ -181,8 +195,8 @@ export default class RedisClient< reply = await this.sendCommand(redisArgs, this.commandOptions).catch((err: unknown) => { if (!(err as Error)?.message?.startsWith?.('NOSCRIPT')) throw err; - args[0] = 'EVAL'; - args[1] = script.SCRIPT; + redisArgs[0] = 'EVAL'; + redisArgs[1] = script.SCRIPT; return this.sendCommand(redisArgs, this.commandOptions); }); return transformReply ? @@ -195,8 +209,9 @@ export default class RedisClient< M extends RedisModules = {}, F extends RedisFunctions = {}, S extends RedisScripts = {}, - RESP extends RespVersions = 2 - >(config?: CommanderConfig) { + RESP extends RespVersions = 2, + TYPE_MAPPING extends TypeMapping = {} + >(config?: CommanderConfig & TypeMappingOption) { const Client = attachConfig({ BaseClass: RedisClient, commands: COMMANDS, @@ -212,7 +227,7 @@ export default class RedisClient< return (options?: Omit>) => { // returning a proxy of the client to prevent the namespaces.self to leak between proxies // namespaces will be bootstraped on first access per proxy - return Object.create(new Client(options)) as RedisClientType; + return Object.create(new Client(options)) as RedisClientType; }; } @@ -220,8 +235,9 @@ export default class RedisClient< M extends RedisModules = {}, F extends RedisFunctions = {}, S extends RedisScripts = {}, - RESP extends RespVersions = 2 - >(this: void, options?: RedisClientOptions) { + RESP extends RespVersions = 2, + TYPE_MAPPING extends TypeMapping = {} + >(this: void, options?: RedisClientOptions) { return RedisClient.factory(options)(options); } @@ -308,6 +324,12 @@ export default class RedisClient< this._selectedDB = options.database; } + if (options?.typeMapping) { + (this as unknown as ProxyClient).commandOptions = { + typeMapping: options.typeMapping + }; + } + return options; } @@ -390,7 +412,7 @@ export default class RedisClient< } if (promises.length) { - this._tick(true); + this._write(); await Promise.all(promises); } }; @@ -417,10 +439,10 @@ export default class RedisClient< .on('ready', () => { this.emit('ready'); this._setPingTimer(); - this._tick(); + this._scheduleWrite(); }) .on('reconnecting', () => this.emit('reconnecting')) - .on('drain', () => this._tick()) + .on('drain', () => this._scheduleWrite()) .on('end', () => this.emit('end')); } @@ -440,7 +462,7 @@ export default class RedisClient< }, this._options.pingInterval); } - withCommandOptions(options: T) { + withCommandOptions(options: T) { const proxy = Object.create(this.self); proxy.commandOptions = options; return proxy as RedisClientType< @@ -453,8 +475,8 @@ export default class RedisClient< } private _commandOptionsProxy< - K extends keyof ClientCommandOptions, - V extends ClientCommandOptions[K] + K extends keyof CommandOptions, + V extends CommandOptions[K] >( key: K, value: V @@ -523,7 +545,7 @@ export default class RedisClient< sendCommand( args: CommandArguments, - options?: ClientCommandOptions + options?: CommandOptions ): Promise { if (!this._socket.isOpen) { return Promise.reject(new ClientClosedError()); @@ -532,7 +554,7 @@ export default class RedisClient< } const promise = this._queue.addCommand(args, options); - this._tick(); + this._scheduleWrite(); return promise; } @@ -546,7 +568,7 @@ export default class RedisClient< private _pubSubCommand(promise: Promise | undefined) { if (promise === undefined) return Promise.resolve(); - this._tick(); + this._scheduleWrite(); return promise; } @@ -672,32 +694,19 @@ export default class RedisClient< ); } - private _tick(force = false): void { - if (this._socket.writableNeedDrain || (!force && !this._socket.isReady)) { - return; - } - - this._socket.cork(); - - do { - const args = this._queue.getCommandToSend(); - if (args === undefined) break; - - this._socket.writeCommand(args); - } while (!this._socket.writableNeedDrain); + private _write() { + this._socket.write(this._queue.waitingToBeSent()); } - private _addMultiCommands( - commands: Array, - chainId?: symbol, - typeMapping?: TypeMapping - ) { - return Promise.all( - commands.map(({ args }) => this._queue.addCommand(args, { - chainId, - typeMapping - })) - ); + private _scheduledWrite?: NodeJS.Immediate; + + private _scheduleWrite() { + if (!this.isReady || this._scheduledWrite) return; + + this._scheduledWrite = setImmediate(() => { + this._write(); + this._scheduledWrite = undefined; + }); } /** @@ -713,7 +722,7 @@ export default class RedisClient< typeMapping: (this as ProxyClient).commandOptions?.typeMapping })) ); - this._tick(); + this._scheduleWrite(); return promise; } @@ -747,7 +756,7 @@ export default class RedisClient< this._queue.addCommand(['EXEC'], { chainId }) ); - this._tick(); + this._scheduleWrite(); const results = await Promise.all(promises), execResult = results[results.length - 1]; @@ -772,7 +781,7 @@ export default class RedisClient< async* scanIterator( this: RedisClientType, options?: ScanOptions & ScanIteratorOptions - ): AsyncIterable['keys'], TYPE_MAPPING>> { + ) { let cursor = options?.cursor ?? 0; do { const reply = await this.scan(cursor, options); @@ -827,7 +836,7 @@ export default class RedisClient< return this._socket.quit(async () => { clearTimeout(this._pingTimer); const quitPromise = this._queue.addCommand(['QUIT']); - this._tick(); + this._scheduleWrite(); return quitPromise; }); } @@ -842,7 +851,7 @@ export default class RedisClient< } /** - * Close the client. Wait for pending replies. + * Close the client. Wait for pending commands. */ close() { return new Promise(resolve => { diff --git a/packages/client/lib/client/multi-command.ts b/packages/client/lib/client/multi-command.ts index 625e449da5..cacb5429a5 100644 --- a/packages/client/lib/client/multi-command.ts +++ b/packages/client/lib/client/multi-command.ts @@ -88,7 +88,7 @@ export default class RedisClientMultiCommand { private static _createCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); return function (this: RedisClientMultiCommand, ...args: Array) { - return this._multi.addCommand( + return this.addCommand( command.transformArguments(...args), transformReply ); @@ -98,7 +98,7 @@ export default class RedisClientMultiCommand { private static _createModuleCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); return function (this: { self: RedisClientMultiCommand }, ...args: Array) { - return this.self._multi.addCommand( + return this.self.addCommand( command.transformArguments(...args), transformReply ); @@ -112,7 +112,7 @@ export default class RedisClientMultiCommand { const fnArgs = fn.transformArguments(...args), redisArgs: CommandArguments = prefix.concat(fnArgs); redisArgs.preserve = fnArgs.preserve; - return this.self._multi.addCommand( + return this.self.addCommand( redisArgs, transformReply ); @@ -164,6 +164,11 @@ export default class RedisClientMultiCommand { select = this.SELECT; + addCommand(args: CommandArguments, transformReply?: TransformReply) { + this._multi.addCommand(args, transformReply); + return this; + } + async exec(execAsPipeline = false): Promise> { if (execAsPipeline) return this.execAsPipeline(); diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index 47c4616c19..e179613062 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -82,14 +82,6 @@ export default class RedisSocket extends EventEmitter { return this._isReady; } - // `writable.writableNeedDrain` was added in v15.2.0 and therefore can't be used - // https://nodejs.org/api/stream.html#stream_writable_writableneeddrain - private _writableNeedDrain = false; - - get writableNeedDrain(): boolean { - return this._writableNeedDrain; - } - private _isSocketUnrefed = false; constructor(initiator: RedisSocketInitiator, options?: RedisSocketOptions) { @@ -149,7 +141,6 @@ export default class RedisSocket extends EventEmitter { do { try { this._socket = await this._createSocket(); - this._writableNeedDrain = false; this.emit('connect'); try { @@ -203,10 +194,7 @@ export default class RedisSocket extends EventEmitter { this._onSocketError(new SocketClosedUnexpectedlyError()); } }) - .on('drain', () => { - this._writableNeedDrain = false; - this.emit('drain'); - }) + .on('drain', () => this.emit('drain')) .on('data', data => this.emit('data', data)); resolve(socket); @@ -240,14 +228,20 @@ export default class RedisSocket extends EventEmitter { }); } - writeCommand(args: Array): void { + write(iterator: IterableIterator>): void { if (!this._socket) { throw new ClientClosedError(); } + + this._socket.cork(); + for (const args of iterator) { + for (const toWrite of args) { + this._socket.write(toWrite); + } - for (const toWrite of args) { - this._writableNeedDrain = !this._socket.write(toWrite); + if (this._socket.writableNeedDrain) break; } + this._socket.uncork(); } async quit(fn: () => Promise): Promise { @@ -289,20 +283,12 @@ export default class RedisSocket extends EventEmitter { this.emit('end'); } - private _isCorked = false; - cork(): void { - if (!this._socket || this._isCorked) { - return; - } + this._socket?.cork(); + } - this._socket.cork(); - this._isCorked = true; - - setImmediate(() => { - this._socket?.uncork(); - this._isCorked = false; - }); + uncork(): void { + this._socket?.uncork(); } ref(): void { diff --git a/packages/client/lib/cluster/index.ts b/packages/client/lib/cluster/index.ts index 4448e70a1a..17a3dc0904 100644 --- a/packages/client/lib/cluster/index.ts +++ b/packages/client/lib/cluster/index.ts @@ -1,4 +1,5 @@ -import { ClientCommandOptions, RedisClientOptions } from '../client'; +import { RedisClientOptions } from '../client'; +import { CommandOptions } from '../client/commands-queue'; import { Command, CommandArguments, CommanderConfig, CommandPolicies, CommandWithPoliciesSignature, TypeMapping, RedisArgument, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions } from '../RESP/types'; import COMMANDS from '../commands'; import { EventEmitter } from 'events'; @@ -69,7 +70,7 @@ export type RedisClusterType< > = RedisCluster & WithCommands; // & WithModules & WithFunctions & WithScripts -export interface ClusterCommandOptions extends ClientCommandOptions { +export interface ClusterCommandOptions extends CommandOptions { policies?: CommandPolicies; }