diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index c098eb7208..debfcec163 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -689,38 +689,71 @@ export default class RedisClient< ); } - MULTI(): RedisClientMultiCommandType<[], M, F, S, RESP, FLAGS> { - return new (this as any).Multi( - async ( + /** + * @internal + */ + async executePipeline(commands: Array) { + if (!this._socket.isOpen) { + return Promise.reject(new ClientClosedError()); + } + + const promise = Promise.all( + commands.map(({ args }) => this._queue.addCommand(args, { + flags: (this as ProxyClient).commandOptions?.flags + })) + ); + this._tick(); + return promise; + } + + /** + * @internal + */ + async executeMulti( commands: Array, - selectedDB?: number, - chainId?: symbol - ) => { + selectedDB?: number + ) { if (!this._socket.isOpen) { return Promise.reject(new ClientClosedError()); } const flags = (this as ProxyClient).commandOptions?.flags, - promise = chainId ? - // if `chainId` has a value, it's a `MULTI` (and not "pipeline") - need to add the `MULTI` and `EXEC` commands - Promise.all([ + chainId = Symbol('MULTI Chain'), + promises = [ this._queue.addCommand(['MULTI'], { chainId }), - this._addMultiCommands(commands, chainId), - this._queue.addCommand(['EXEC'], { chainId, flags }) - ]) : - this._addMultiCommands(commands, undefined, flags); + ]; + + for (const { args } of commands) { + promises.push( + this._queue.addCommand(args, { + chainId, + flags + }) + ); + } + + promises.push( + this._queue.addCommand(['EXEC'], { chainId }) + ); this._tick(); - const results = await promise; + const results = await Promise.all(promises), + execResult = results[results.length - 1]; + + if (execResult === null) { + throw new WatchError(); + } if (selectedDB !== undefined) { this._selectedDB = selectedDB; } - return results; + return execResult as Array; } - ); + + MULTI(): RedisClientMultiCommandType<[], M, F, S, RESP, FLAGS> { + return new (this as any).Multi(this); } multi = this.MULTI; diff --git a/packages/client/lib/client/legacy-mode.ts b/packages/client/lib/client/legacy-mode.ts index c189c85342..87de34bd7e 100644 --- a/packages/client/lib/client/legacy-mode.ts +++ b/packages/client/lib/client/legacy-mode.ts @@ -3,6 +3,7 @@ import { RedisClientType } from '.'; import { getTransformReply } from '../commander'; import { ErrorReply } from '../errors'; import COMMANDS from '../commands'; +import RedisMultiCommand from '../multi-command'; type LegacyArgument = string | Buffer | number | Date; @@ -15,10 +16,8 @@ type LegacyCommandArguments = LegacyArguments | [ callback: LegacyCallback ]; -export type CommandSignature = (...args: LegacyCommandArguments) => void; - type WithCommands = { - [P in keyof typeof COMMANDS]: CommandSignature; + [P in keyof typeof COMMANDS]: (...args: LegacyCommandArguments) => void; }; export type RedisLegacyClientType = RedisLegacyClient & WithCommands; @@ -30,16 +29,16 @@ export class RedisLegacyClient { callback = args.pop() as LegacyCallback; } - RedisLegacyClient._pushArguments(redisArgs, args as LegacyArguments); + RedisLegacyClient.pushArguments(redisArgs, args as LegacyArguments); return callback; } - private static _pushArguments(redisArgs: CommandArguments, args: LegacyArguments) { + static pushArguments(redisArgs: CommandArguments, args: LegacyArguments) { for (let i = 0; i < args.length; ++i) { const arg = args[i]; if (Array.isArray(arg)) { - RedisLegacyClient._pushArguments(redisArgs, arg); + RedisLegacyClient.pushArguments(redisArgs, arg); } else { redisArgs.push( typeof arg === 'number' || arg instanceof Date ? @@ -50,14 +49,14 @@ export class RedisLegacyClient { } } - private static _getTransformReply(command: Command, resp: RespVersions) { + static getTransformReply(command: Command, resp: RespVersions) { return command.TRANSFORM_LEGACY_REPLY ? getTransformReply(command, resp) : undefined; } private static _createCommand(name: string, command: Command, resp: RespVersions) { - const transformReply = RedisLegacyClient._getTransformReply(command, resp); + const transformReply = RedisLegacyClient.getTransformReply(command, resp); return async function (this: RedisLegacyClient, ...args: LegacyCommandArguments) { const redisArgs = [name], callback = RedisLegacyClient._transformArguments(redisArgs, args), @@ -74,6 +73,8 @@ export class RedisLegacyClient { }; } + private _Multi: ReturnType; + constructor( private _client: RedisClientType ) { @@ -87,7 +88,7 @@ export class RedisLegacyClient { ); } - // TODO: Multi + this._Multi = LegacyMultiCommand.factory(RESP); } sendCommand(...args: LegacyArguments) { @@ -104,4 +105,68 @@ export class RedisLegacyClient { .then(reply => callback(null, reply)) .catch(err => callback(err)); } + + multi() { + return this._Multi(this._client); + } +} + +type MultiWithCommands = { + [P in keyof typeof COMMANDS]: (...args: LegacyCommandArguments) => RedisLegacyMultiType; +}; + +export type RedisLegacyMultiType = Omit & MultiWithCommands; + +class LegacyMultiCommand extends RedisMultiCommand { + private static _createCommand(name: string, command: Command, resp: RespVersions) { + const transformReply = RedisLegacyClient.getTransformReply(command, resp); + return function (this: LegacyMultiCommand, ...args: LegacyArguments) { + const redisArgs = [name]; + RedisLegacyClient.pushArguments(redisArgs, args); + return this.addCommand(redisArgs, transformReply); + }; + } + + static factory(resp: RespVersions) { + const Multi = class extends LegacyMultiCommand {}; + + for (const [name, command] of Object.entries(COMMANDS)) { + // TODO: as any? + (Multi as any).prototype[name] = LegacyMultiCommand._createCommand( + name, + command, + resp + ); + } + + return (client: RedisClientType) => { + return new Multi(client) as unknown as RedisLegacyMultiType; + }; + } + + private _client: RedisClientType; + + constructor(client: RedisClientType) { + super(); + this._client = client; + } + + sendCommand(...args: LegacyArguments) { + const redisArgs: CommandArguments = []; + RedisLegacyClient.pushArguments(redisArgs, args); + return this.addCommand(redisArgs); + } + + exec(cb?: (err: ErrorReply | null, replies?: Array) => unknown) { + const promise = this._client.executeMulti(this.queue); + + if (!cb) { + promise.catch(err => this._client.emit('error', err)); + return; + } + + promise + .then(results => cb(null, this.transformReplies(results))) + .catch(err => cb?.(err)); + } } diff --git a/packages/client/lib/client/linked-list.spec.ts b/packages/client/lib/client/linked-list.spec.ts index c5ab379b82..9547fb81c7 100644 --- a/packages/client/lib/client/linked-list.spec.ts +++ b/packages/client/lib/client/linked-list.spec.ts @@ -1,7 +1,7 @@ import { SinglyLinkedList, DoublyLinkedList } from './linked-list'; import { equal, deepEqual } from 'assert/strict'; -describe.only('DoublyLinkedList', () => { +describe('DoublyLinkedList', () => { const list = new DoublyLinkedList(); it('should start empty', () => { @@ -78,7 +78,7 @@ describe.only('DoublyLinkedList', () => { }); }); -describe.only('SinglyLinkedList', () => { +describe('SinglyLinkedList', () => { const list = new SinglyLinkedList(); it('should start empty', () => { diff --git a/packages/client/lib/client/multi-command.ts b/packages/client/lib/client/multi-command.ts index 3503a34af2..90a5c7ccf9 100644 --- a/packages/client/lib/client/multi-command.ts +++ b/packages/client/lib/client/multi-command.ts @@ -2,6 +2,7 @@ import COMMANDS from '../commands'; import RedisMultiCommand, { RedisMultiQueuedCommand } from '../multi-command'; import { ReplyWithFlags, CommandReply, Command, CommandArguments, CommanderConfig, RedisFunctions, RedisModules, RedisScripts, RespVersions, TransformReply, RedisScript, RedisFunction, Flags, ReplyUnion } from '../RESP/types'; import { attachConfig, functionArgumentsPrefix, getTransformReply } from '../commander'; +import { RedisClientType } from '.'; type CommandSignature< REPLIES extends Array, @@ -90,7 +91,7 @@ type MULTI_REPLY = { type MultiReply = MULTI_REPLY[keyof MULTI_REPLY]; -type ReplyType = T extends MULTI_REPLY['TYPED'] ? REPLIES : Array; +type ReplyType = T extends MULTI_REPLY['TYPED'] ? REPLIES : Array; export type RedisClientMultiExecutor = ( queue: Array, @@ -161,62 +162,14 @@ export default class RedisClientMultiCommand extends RedisMultiCom }); } - // readonly #multi = new RedisMultiCommand(); - readonly #executor: RedisClientMultiExecutor; - // readonly v4: Record = {}; + readonly #client: RedisClientType; #selectedDB?: number; - constructor(executor: RedisClientMultiExecutor, legacyMode = false) { + constructor(client: RedisClientType) { super(); - this.#executor = executor; - // if (legacyMode) { - // this.#legacyMode(); - // } + this.#client = client; } - // #legacyMode(): void { - // this.v4.addCommand = this.addCommand.bind(this); - // (this as any).addCommand = (...args: Array): this => { - // this.#multi.addCommand(transformLegacyCommandArguments(args)); - // return this; - // }; - // this.v4.exec = this.exec.bind(this); - // (this as any).exec = (callback?: (err: Error | null, replies?: Array) => unknown): void => { - // this.v4.exec() - // .then((reply: Array) => { - // if (!callback) return; - - // callback(null, reply); - // }) - // .catch((err: Error) => { - // if (!callback) { - // // this.emit('error', err); - // return; - // } - - // callback(err); - // }); - // }; - - // for (const [name, command] of Object.entries(COMMANDS as RedisCommands)) { - // this.#defineLegacyCommand(name, command); - // (this as any)[name.toLowerCase()] ??= (this as any)[name]; - // } - // } - - // #defineLegacyCommand(this: any, name: string, command?: RedisCommand): void { - // this.v4[name] = this[name].bind(this.v4); - // this[name] = command && command.TRANSFORM_LEGACY_REPLY && command.transformReply ? - // (...args: Array) => { - // this.#multi.addCommand( - // [name, ...transformLegacyCommandArguments(args)], - // command.transformReply - // ); - // return this; - // } : - // (...args: Array) => this.addCommand(name, ...args); - // } - SELECT(db: number, transformReply?: TransformReply): this { this.#selectedDB = db; return this.addCommand(['SELECT', db.toString()], transformReply); @@ -224,15 +177,11 @@ export default class RedisClientMultiCommand extends RedisMultiCom select = this.SELECT; - async exec(execAsPipeline = false) { + async exec(execAsPipeline = false): Promise> { if (execAsPipeline) return this.execAsPipeline(); - return this.handleExecReplies( - await this.#executor( - this.queue, - this.#selectedDB, - RedisMultiCommand.generateChainId() - ) + return this.transformReplies( + await this.#client.executeMulti(this.queue, this.#selectedDB) ) as ReplyType; } @@ -242,14 +191,11 @@ export default class RedisClientMultiCommand extends RedisMultiCom return this.exec(execAsPipeline); } - async execAsPipeline() { + async execAsPipeline(): Promise> { if (this.queue.length === 0) return [] as ReplyType; return this.transformReplies( - await this.#executor( - this.queue, - this.#selectedDB - ) + await this.#client.executePipeline(this.queue) ) as ReplyType; } diff --git a/packages/client/lib/multi-command.ts b/packages/client/lib/multi-command.ts index 2c3f7f7ee7..65570ff33a 100644 --- a/packages/client/lib/multi-command.ts +++ b/packages/client/lib/multi-command.ts @@ -1,5 +1,4 @@ -import { Command, CommandArguments, RedisScript, TransformReply } from './RESP/types'; -import { WatchError } from './errors'; +import { CommandArguments, RedisScript, TransformReply } from './RESP/types'; export interface RedisMultiQueuedCommand { args: CommandArguments; @@ -7,10 +6,6 @@ export interface RedisMultiQueuedCommand { } export default class RedisMultiCommand { - static generateChainId(): symbol { - return Symbol('RedisMultiCommand Chain Id'); - } - readonly queue: Array = []; readonly scriptsInUse = new Set(); @@ -42,15 +37,6 @@ export default class RedisMultiCommand { return this.addCommand(redisArgs, transformReply); } - handleExecReplies(rawReplies: Array): Array { - const execReply = rawReplies[rawReplies.length - 1] as (null | Array); - if (execReply === null) { - throw new WatchError(); - } - - return this.transformReplies(execReply); - } - transformReplies(rawReplies: Array): Array { return rawReplies.map((reply, i) => { const { transformReply, args } = this.queue[i];