From fcb3a011b5ce16de22003af67f2711e25b0c40ff Mon Sep 17 00:00:00 2001 From: Leibale Date: Tue, 30 May 2023 19:23:14 +0300 Subject: [PATCH] cluster multi --- docs/v4-to-v5.md | 7 + packages/client/lib/client/index.ts | 4 +- packages/client/lib/client/multi-command.ts | 97 +++--- packages/client/lib/cluster/index.ts | 115 +++--- packages/client/lib/cluster/multi-command.ts | 346 ++++++++++++------- packages/client/lib/multi-command.ts | 13 +- 6 files changed, 352 insertions(+), 230 deletions(-) diff --git a/docs/v4-to-v5.md b/docs/v4-to-v5.md index c81e2b29e3..332bd12dcf 100644 --- a/docs/v4-to-v5.md +++ b/docs/v4-to-v5.md @@ -71,6 +71,13 @@ legacyClient.set('key', 'value', (err, reply) => { TODO The `isolationPool` has been moved to it's on class `ClientPool`. You can create pool from a client using `client.createPool()`. +## Cluster MULTI + +Cluster MULTI supports readonly/replicas +`cluster.multi.addCommand` now requires `isReadonly` as the second argument, to match `cluster.sendCommand` + +TODO + ## Commands Some command arguments/replies have changed to align more closely to data types returned by Redis: diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index debfcec163..e9b6c4e2bf 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -3,7 +3,7 @@ import RedisSocket, { RedisSocketOptions, RedisTlsSocketOptions } from './socket import RedisCommandsQueue, { QueueCommandOptions } from './commands-queue'; import { EventEmitter } from 'events'; import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; -import { ClientClosedError, ClientOfflineError, DisconnectsClientError } from '../errors'; +import { ClientClosedError, ClientOfflineError, DisconnectsClientError, WatchError } from '../errors'; import { URL } from 'url'; import { TcpSocketConnectOpts } from 'net'; import { PubSubType, PubSubListener, PubSubTypeListeners, ChannelListeners } from './pub-sub'; @@ -692,7 +692,7 @@ export default class RedisClient< /** * @internal */ - async executePipeline(commands: Array) { + executePipeline(commands: Array) { if (!this._socket.isOpen) { return Promise.reject(new ClientClosedError()); } diff --git a/packages/client/lib/client/multi-command.ts b/packages/client/lib/client/multi-command.ts index 90a5c7ccf9..b011a08916 100644 --- a/packages/client/lib/client/multi-command.ts +++ b/packages/client/lib/client/multi-command.ts @@ -1,5 +1,5 @@ import COMMANDS from '../commands'; -import RedisMultiCommand, { RedisMultiQueuedCommand } from '../multi-command'; +import RedisMultiCommand, { MULTI_REPLY, MultiReply, MultiReplyType } from '../multi-command'; import { ReplyWithFlags, CommandReply, Command, CommandArguments, CommanderConfig, RedisFunctions, RedisModules, RedisScripts, RespVersions, TransformReply, RedisScript, RedisFunction, Flags, ReplyUnion } from '../RESP/types'; import { attachConfig, functionArgumentsPrefix, getTransformReply } from '../commander'; import { RedisClientType } from '.'; @@ -84,64 +84,50 @@ export type RedisClientMultiCommandType< WithScripts ); -type MULTI_REPLY = { - GENERIC: 'generic'; - TYPED: 'typed'; -}; - -type MultiReply = MULTI_REPLY[keyof MULTI_REPLY]; - -type ReplyType = T extends MULTI_REPLY['TYPED'] ? REPLIES : Array; - -export type RedisClientMultiExecutor = ( - queue: Array, - selectedDB?: number, - chainId?: symbol -) => Promise>; - -export default class RedisClientMultiCommand extends RedisMultiCommand { - static #createCommand(command: Command, resp: RespVersions) { +export default class RedisClientMultiCommand { + private static _createCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); - return function (this: RedisClientMultiCommand) { - return this.addCommand( - command.transformArguments.apply(undefined, arguments as any), + return function (this: RedisClientMultiCommand, ...args: Array) { + return this._multi.addCommand( + command.transformArguments(...args), transformReply ); }; } - static #createModuleCommand(command: Command, resp: RespVersions) { + private static _createModuleCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); - return function (this: { self: RedisClientMultiCommand }) { - return this.self.addCommand( - command.transformArguments.apply(undefined, arguments as any), + return function (this: { self: RedisClientMultiCommand }, ...args: Array) { + return this.self._multi.addCommand( + command.transformArguments(...args), transformReply ); }; } - static #createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) { + private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) { const prefix = functionArgumentsPrefix(name, fn), transformReply = getTransformReply(fn, resp); - return function (this: { self: RedisClientMultiCommand }) { - const fnArgs = fn.transformArguments.apply(undefined, arguments as any), - args: CommandArguments = prefix.concat(fnArgs); - args.preserve = fnArgs.preserve; - return this.self.addCommand( - args, + return function (this: { self: RedisClientMultiCommand }, ...args: Array) { + const fnArgs = fn.transformArguments(...args), + redisArgs: CommandArguments = prefix.concat(fnArgs); + redisArgs.preserve = fnArgs.preserve; + return this.self._multi.addCommand( + redisArgs, transformReply ); }; } - static #createScriptCommand(script: RedisScript, resp: RespVersions) { + private static _createScriptCommand(script: RedisScript, resp: RespVersions) { const transformReply = getTransformReply(script, resp); - return function (this: RedisClientMultiCommand) { - return this.addScript( + return function (this: RedisClientMultiCommand, ...args: Array) { + this._multi.addScript( script, - script.transformArguments.apply(undefined, arguments as any), + script.transformArguments(...args), transformReply ); + return this; }; } @@ -154,35 +140,36 @@ export default class RedisClientMultiCommand extends RedisMultiCom return attachConfig({ BaseClass: RedisClientMultiCommand, commands: COMMANDS, - createCommand: RedisClientMultiCommand.#createCommand, - createModuleCommand: RedisClientMultiCommand.#createModuleCommand, - createFunctionCommand: RedisClientMultiCommand.#createFunctionCommand, - createScriptCommand: RedisClientMultiCommand.#createScriptCommand, + createCommand: RedisClientMultiCommand._createCommand, + createModuleCommand: RedisClientMultiCommand._createModuleCommand, + createFunctionCommand: RedisClientMultiCommand._createFunctionCommand, + createScriptCommand: RedisClientMultiCommand._createScriptCommand, config }); } - readonly #client: RedisClientType; - #selectedDB?: number; + private readonly _multi = new RedisMultiCommand(); + private readonly _client: RedisClientType; + private _selectedDB?: number; constructor(client: RedisClientType) { - super(); - this.#client = client; + this._client = client; } SELECT(db: number, transformReply?: TransformReply): this { - this.#selectedDB = db; - return this.addCommand(['SELECT', db.toString()], transformReply); + this._selectedDB = db; + this._multi.addCommand(['SELECT', db.toString()], transformReply); + return this; } select = this.SELECT; - async exec(execAsPipeline = false): Promise> { + async exec(execAsPipeline = false): Promise> { if (execAsPipeline) return this.execAsPipeline(); - return this.transformReplies( - await this.#client.executeMulti(this.queue, this.#selectedDB) - ) as ReplyType; + return this._multi.transformReplies( + await this._client.executeMulti(this._multi.queue, this._selectedDB) + ) as MultiReplyType; } EXEC = this.exec; @@ -191,12 +178,12 @@ export default class RedisClientMultiCommand extends RedisMultiCom return this.exec(execAsPipeline); } - async execAsPipeline(): Promise> { - if (this.queue.length === 0) return [] as ReplyType; + async execAsPipeline(): Promise> { + if (this._multi.queue.length === 0) return [] as MultiReplyType; - return this.transformReplies( - await this.#client.executePipeline(this.queue) - ) as ReplyType; + return this._multi.transformReplies( + await this._client.executePipeline(this._multi.queue) + ) as MultiReplyType; } execAsPipelineTyped() { diff --git a/packages/client/lib/cluster/index.ts b/packages/client/lib/cluster/index.ts index abec4cb7f3..8fb5564cd0 100644 --- a/packages/client/lib/cluster/index.ts +++ b/packages/client/lib/cluster/index.ts @@ -1,11 +1,11 @@ -import { ClientCommandOptions, RedisClientOptions, RedisClientType } from '../client'; -import { Command, CommandArguments, CommanderConfig, CommandPolicies, CommandSignature, CommandWithPoliciesSignature, Flags, RedisArgument, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions, TransformReply } from '../RESP/types'; +import { ClientCommandOptions, RedisClientOptions } from '../client'; +import { Command, CommandArguments, CommanderConfig, CommandPolicies, CommandWithPoliciesSignature, Flags, RedisArgument, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions } from '../RESP/types'; import COMMANDS from '../commands'; import { EventEmitter } from 'events'; import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; import RedisClusterSlots, { NodeAddressMap, ShardNode } from './cluster-slots'; -// import RedisClusterMultiCommand, { InstantiableRedisClusterMultiCommandType, RedisClusterMultiCommandType } from './multi-command'; -// import { RedisMultiQueuedCommand } from '../multi-command'; +import RedisClusterMultiCommand, { RedisClusterMultiCommandType } from './multi-command'; +import { RedisMultiQueuedCommand } from '../multi-command'; import { PubSubListener } from '../client/pub-sub'; import { ErrorReply } from '../errors'; @@ -85,7 +85,7 @@ export default class RedisCluster< FLAGS extends Flags, POLICIES extends CommandPolicies > extends EventEmitter { - private static _extractFirstKey( + static extractFirstKey( command: C, args: Parameters, redisArgs: Array @@ -101,46 +101,46 @@ export default class RedisCluster< private static _createCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); - return async function (this: ProxyCluster) { - const args = command.transformArguments.apply(undefined, arguments as any), - firstKey = RedisCluster._extractFirstKey( + return async function (this: ProxyCluster, ...args: Array) { + const redisArgs = command.transformArguments(...args), + firstKey = RedisCluster.extractFirstKey( command, - arguments as any, - args + args, + redisArgs ), reply = await this.sendCommand( firstKey, command.IS_READ_ONLY, - args, + redisArgs, this.commandOptions, command.POLICIES ); return transformReply ? - transformReply(reply, args.preserve) : + transformReply(reply, redisArgs.preserve) : reply; }; } private static _createModuleCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); - return async function (this: NamespaceProxyCluster) { - const args = command.transformArguments.apply(undefined, arguments as any), - firstKey = RedisCluster._extractFirstKey( + return async function (this: NamespaceProxyCluster, ...args: Array) { + const redisArgs = command.transformArguments(...args), + firstKey = RedisCluster.extractFirstKey( command, - arguments as any, - args + args, + redisArgs ), reply = await this.self.sendCommand( firstKey, command.IS_READ_ONLY, - args, + redisArgs, this.self.commandOptions, command.POLICIES ); return transformReply ? - transformReply(reply, args.preserve) : + transformReply(reply, redisArgs.preserve) : reply; }; } @@ -148,18 +148,18 @@ export default class RedisCluster< private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) { const prefix = functionArgumentsPrefix(name, fn), transformReply = getTransformReply(fn, resp); - return async function (this: NamespaceProxyCluster) { - const fnArgs = fn.transformArguments.apply(undefined, arguments as any), - args = prefix.concat(fnArgs), - firstKey = RedisCluster._extractFirstKey( + return async function (this: NamespaceProxyCluster, ...args: Array) { + const fnArgs = fn.transformArguments(...args), + redisArgs = prefix.concat(fnArgs), + firstKey = RedisCluster.extractFirstKey( fn, - arguments as any, - args + fnArgs, + redisArgs ), reply = await this.self.sendCommand( firstKey, fn.IS_READ_ONLY, - args, + redisArgs, this.self.commandOptions, fn.POLICIES ); @@ -173,18 +173,18 @@ export default class RedisCluster< private static _createScriptCommand(script: RedisScript, resp: RespVersions) { const prefix = scriptArgumentsPrefix(script), transformReply = getTransformReply(script, resp); - return async function (this: ProxyCluster) { - const scriptArgs = script.transformArguments.apply(undefined, arguments as any), - args = prefix.concat(scriptArgs), - firstKey = RedisCluster._extractFirstKey( + return async function (this: ProxyCluster, ...args: Array) { + const scriptArgs = script.transformArguments(...args), + redisArgs = prefix.concat(scriptArgs), + firstKey = RedisCluster.extractFirstKey( script, - arguments as any, - args + scriptArgs, + redisArgs ), reply = await this.sendCommand( firstKey, script.IS_READ_ONLY, - args, + redisArgs, this.commandOptions, script.POLICIES ); @@ -211,7 +211,7 @@ export default class RedisCluster< config }); - // Client.prototype.Multi = RedisClientMultiCommand.extend(config); + Cluster.prototype.Multi = RedisClusterMultiCommand.extend(config); return (options?: Omit>) => { // returning a proxy of the client to prevent the namespaces.self to leak between proxies @@ -280,8 +280,6 @@ export default class RedisCluster< return this._slots.pubSubNode; } - // readonly #Multi: InstantiableRedisClusterMultiCommandType; - get isOpen() { return this._slots.isOpen; } @@ -291,7 +289,6 @@ export default class RedisCluster< this._options = options; this._slots = new RedisClusterSlots(options, this.emit.bind(this)); - // this.#Multi = RedisClusterMultiCommand.extend(options); } duplicate(overrides?: Partial>): RedisClusterType { @@ -400,20 +397,38 @@ export default class RedisCluster< } } - // MULTI(routing?: RedisCommandArgument): RedisClusterMultiCommandType { - // return new this.#Multi( - // (commands: Array, firstKey?: RedisCommandArgument, chainId?: symbol) => { - // return this.#execute( - // firstKey, - // false, - // client => client.multiExecutor(commands, undefined, chainId) - // ); - // }, - // routing - // ); - // } + /** + * @internal + */ + async executePipeline( + firstKey: RedisArgument | undefined, + isReadonly: boolean | undefined, + commands: Array + ) { + const client = await this._slots.getClient(firstKey, isReadonly); + return client.executePipeline(commands); + } - // multi = this.MULTI; + /** + * @internal + */ + async executeMulti( + firstKey: RedisArgument | undefined, + isReadonly: boolean | undefined, + commands: Array + ) { + const client = await this._slots.getClient(firstKey, isReadonly); + return client.executeMulti(commands); + } + + MULTI(routing?: RedisArgument): RedisClusterMultiCommandType<[], M, F, S, RESP, FLAGS> { + return new (this as any).Multi( + this, + routing + ); + } + + multi = this.MULTI; async SUBSCRIBE( channels: string | Array, diff --git a/packages/client/lib/cluster/multi-command.ts b/packages/client/lib/cluster/multi-command.ts index 379af544af..4ec8e0996d 100644 --- a/packages/client/lib/cluster/multi-command.ts +++ b/packages/client/lib/cluster/multi-command.ts @@ -1,141 +1,245 @@ -// import COMMANDS from './commands'; -// import { RedisCommand, RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply, RedisFunctions, RedisModules, RedisExtensions, RedisScript, RedisScripts, ExcludeMappedString, RedisFunction } from '../commands'; -// import RedisMultiCommand, { RedisMultiQueuedCommand } from '../multi-command'; -// import { attachCommands, attachExtensions } from '../commander'; -// import RedisCluster from '.'; +import COMMANDS from '../commands'; +import RedisMultiCommand, { MULTI_REPLY, MultiReply, MultiReplyType } from '../multi-command'; +import { ReplyWithFlags, CommandReply, Command, CommandArguments, CommanderConfig, RedisFunctions, RedisModules, RedisScripts, RespVersions, TransformReply, RedisScript, RedisFunction, Flags, ReplyUnion, RedisArgument } from '../RESP/types'; +import { attachConfig, functionArgumentsPrefix, getTransformReply } from '../commander'; +import RedisCluster, { RedisClusterType } from '.'; -// type RedisClusterMultiCommandSignature< -// C extends RedisCommand, -// M extends RedisModules, -// F extends RedisFunctions, -// S extends RedisScripts -// > = (...args: Parameters) => RedisClusterMultiCommandType; +type CommandSignature< + REPLIES extends Array, + C extends Command, + M extends RedisModules, + F extends RedisFunctions, + S extends RedisScripts, + RESP extends RespVersions, + FLAGS extends Flags +> = (...args: Parameters) => RedisClusterMultiCommandType< + [...REPLIES, ReplyWithFlags, FLAGS>], + M, + F, + S, + RESP, + FLAGS +>; -// type WithCommands< -// M extends RedisModules, -// F extends RedisFunctions, -// S extends RedisScripts -// > = { -// [P in keyof typeof COMMANDS]: RedisClusterMultiCommandSignature<(typeof COMMANDS)[P], M, F, S>; -// }; +type WithCommands< + REPLIES extends Array, + M extends RedisModules, + F extends RedisFunctions, + S extends RedisScripts, + RESP extends RespVersions, + FLAGS extends Flags +> = { + [P in keyof typeof COMMANDS]: CommandSignature; +}; -// type WithModules< -// M extends RedisModules, -// F extends RedisFunctions, -// S extends RedisScripts -// > = { -// [P in keyof M as ExcludeMappedString

]: { -// [C in keyof M[P] as ExcludeMappedString]: RedisClusterMultiCommandSignature; -// }; -// }; +type WithModules< + REPLIES extends Array, + M extends RedisModules, + F extends RedisFunctions, + S extends RedisScripts, + RESP extends RespVersions, + FLAGS extends Flags +> = { + [P in keyof M]: { + [C in keyof M[P]]: CommandSignature; + }; +}; -// type WithFunctions< -// M extends RedisModules, -// F extends RedisFunctions, -// S extends RedisScripts -// > = { -// [P in keyof F as ExcludeMappedString

]: { -// [FF in keyof F[P] as ExcludeMappedString]: RedisClusterMultiCommandSignature; -// }; -// }; +type WithFunctions< + REPLIES extends Array, + M extends RedisModules, + F extends RedisFunctions, + S extends RedisScripts, + RESP extends RespVersions, + FLAGS extends Flags +> = { + [L in keyof F]: { + [C in keyof F[L]]: CommandSignature; + }; +}; -// type WithScripts< -// M extends RedisModules, -// F extends RedisFunctions, -// S extends RedisScripts -// > = { -// [P in keyof S as ExcludeMappedString

]: RedisClusterMultiCommandSignature; -// }; +type WithScripts< + REPLIES extends Array, + M extends RedisModules, + F extends RedisFunctions, + S extends RedisScripts, + RESP extends RespVersions, + FLAGS extends Flags +> = { + [P in keyof S]: CommandSignature; +}; -// export type RedisClusterMultiCommandType< -// M extends RedisModules, -// F extends RedisFunctions, -// S extends RedisScripts -// > = RedisClusterMultiCommand & WithCommands & WithModules & WithFunctions & WithScripts; +export type RedisClusterMultiCommandType< + REPLIES extends Array, + M extends RedisModules, + F extends RedisFunctions, + S extends RedisScripts, + RESP extends RespVersions, + FLAGS extends Flags +> = ( + RedisClusterMultiCommand & + WithCommands & + WithModules & + WithFunctions & + WithScripts +); -// export type InstantiableRedisClusterMultiCommandType< -// M extends RedisModules, -// F extends RedisFunctions, -// S extends RedisScripts -// > = new (...args: ConstructorParameters) => RedisClusterMultiCommandType; +export default class RedisClusterMultiCommand { + private static _createCommand(command: Command, resp: RespVersions) { + const transformReply = getTransformReply(command, resp); + return function (this: RedisClusterMultiCommand, ...args: Array) { + const redisArgs = command.transformArguments(...args), + firstKey = RedisCluster.extractFirstKey( + command, + args, + redisArgs + ); + return this.addCommand( + firstKey, + command.IS_READ_ONLY, + redisArgs, + transformReply + ); + }; + } -// export type RedisClusterMultiExecutor = (queue: Array, firstKey?: RedisCommandArgument, chainId?: symbol) => Promise>; + private static _createModuleCommand(command: Command, resp: RespVersions) { + const transformReply = getTransformReply(command, resp); + return function (this: { self: RedisClusterMultiCommand }, ...args: Array) { + const redisArgs = command.transformArguments(...args), + firstKey = RedisCluster.extractFirstKey( + command, + args, + redisArgs + ); + return this.self.addCommand( + firstKey, + command.IS_READ_ONLY, + redisArgs, + transformReply + ); + }; + } -// export default class RedisClusterMultiCommand { -// readonly #multi = new RedisMultiCommand(); -// readonly #executor: RedisClusterMultiExecutor; -// #firstKey: RedisCommandArgument | undefined; + private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) { + const prefix = functionArgumentsPrefix(name, fn), + transformReply = getTransformReply(fn, resp); + return function (this: { self: RedisClusterMultiCommand }, ...args: Array) { + const fnArgs = fn.transformArguments(...args), + redisArgs: CommandArguments = prefix.concat(fnArgs), + firstKey = RedisCluster.extractFirstKey( + fn, + args, + fnArgs + ); + redisArgs.preserve = fnArgs.preserve; + return this.self.addCommand( + firstKey, + fn.IS_READ_ONLY, + redisArgs, + transformReply + ); + }; + } -// static extend< -// M extends RedisModules, -// F extends RedisFunctions, -// S extends RedisScripts -// >(extensions?: RedisExtensions): InstantiableRedisClusterMultiCommandType { -// return attachExtensions({ -// BaseClass: RedisClusterMultiCommand, -// modulesExecutor: RedisClusterMultiCommand.prototype.commandsExecutor, -// modules: extensions?.modules, -// functionsExecutor: RedisClusterMultiCommand.prototype.functionsExecutor, -// functions: extensions?.functions, -// scriptsExecutor: RedisClusterMultiCommand.prototype.scriptsExecutor, -// scripts: extensions?.scripts -// }); -// } + private static _createScriptCommand(script: RedisScript, resp: RespVersions) { + const transformReply = getTransformReply(script, resp); + return function (this: RedisClusterMultiCommand, ...args: Array) { + const scriptArgs = script.transformArguments(...args); + this._setState( + RedisCluster.extractFirstKey( + script, + args, + scriptArgs + ), + script.IS_READ_ONLY + ); + this._multi.addScript( + script, + scriptArgs, + transformReply + ); + return this; + }; + } -// constructor(executor: RedisClusterMultiExecutor, firstKey?: RedisCommandArgument) { -// this.#executor = executor; -// this.#firstKey = firstKey; -// } + static extend< + M extends RedisModules = Record, + F extends RedisFunctions = Record, + S extends RedisScripts = Record, + RESP extends RespVersions = 2 + >(config?: CommanderConfig) { + return attachConfig({ + BaseClass: RedisClusterMultiCommand, + commands: COMMANDS, + createCommand: RedisClusterMultiCommand._createCommand, + createModuleCommand: RedisClusterMultiCommand._createModuleCommand, + createFunctionCommand: RedisClusterMultiCommand._createFunctionCommand, + createScriptCommand: RedisClusterMultiCommand._createScriptCommand, + config + }); + } -// commandsExecutor(command: RedisCommand, args: Array): this { -// const transformedArguments = command.transformArguments(...args); -// this.#firstKey ??= RedisCluster.extractFirstKey(command, args, transformedArguments); -// return this.addCommand(undefined, transformedArguments, command.transformReply); -// } + private readonly _multi = new RedisMultiCommand(); + private readonly _cluster: RedisClusterType; + private _firstKey: RedisArgument | undefined; + private _isReadonly: boolean | undefined = true; -// addCommand( -// firstKey: RedisCommandArgument | undefined, -// args: RedisCommandArguments, -// transformReply?: RedisCommand['transformReply'] -// ): this { -// this.#firstKey ??= firstKey; -// this.#multi.addCommand(args, transformReply); -// return this; -// } + constructor(cluster: RedisClusterType, routing: RedisArgument | undefined) { + this._cluster = cluster; + this._firstKey = routing; + } -// functionsExecutor(fn: RedisFunction, args: Array, name: string): this { -// const transformedArguments = this.#multi.addFunction(name, fn, args); -// this.#firstKey ??= RedisCluster.extractFirstKey(fn, args, transformedArguments); -// return this; -// } + private _setState( + firstKey: RedisArgument | undefined, + isReadonly: boolean | undefined, + ) { + this._firstKey ??= firstKey; + this._isReadonly &&= isReadonly; + } -// scriptsExecutor(script: RedisScript, args: Array): this { -// const transformedArguments = this.#multi.addScript(script, args); -// this.#firstKey ??= RedisCluster.extractFirstKey(script, args, transformedArguments); -// return this; -// } + addCommand( + firstKey: RedisArgument | undefined, + isReadonly: boolean | undefined, + args: CommandArguments, + transformReply?: TransformReply + ) { + this._setState(firstKey, isReadonly); + this._multi.addCommand(args, transformReply); + return this; + } -// async exec(execAsPipeline = false): Promise> { -// if (execAsPipeline) { -// return this.execAsPipeline(); -// } + async exec(execAsPipeline = false) { + if (execAsPipeline) return this.execAsPipeline(); -// return this.#multi.handleExecReplies( -// await this.#executor(this.#multi.queue, this.#firstKey, RedisMultiCommand.generateChainId()) -// ); -// } + return this._multi.transformReplies( + await this._cluster.executeMulti( + this._firstKey, + this._isReadonly, + this._multi.queue + ) + ) as MultiReplyType; + } -// EXEC = this.exec; + EXEC = this.exec; -// async execAsPipeline(): Promise> { -// return this.#multi.transformReplies( -// await this.#executor(this.#multi.queue, this.#firstKey) -// ); -// } -// } + execTyped(execAsPipeline = false) { + return this.exec(execAsPipeline); + } -// attachCommands({ -// BaseClass: RedisClusterMultiCommand, -// commands: COMMANDS, -// executor: RedisClusterMultiCommand.prototype.commandsExecutor -// }); + async execAsPipeline() { + if (this._multi.queue.length === 0) return [] as MultiReplyType; + + return this._multi.transformReplies( + await this._cluster.executePipeline( + this._firstKey, + this._isReadonly, + this._multi.queue + ) + ) as MultiReplyType; + } + + execAsPipelineTyped() { + return this.execAsPipeline(); + } +} diff --git a/packages/client/lib/multi-command.ts b/packages/client/lib/multi-command.ts index 65570ff33a..19bb680d68 100644 --- a/packages/client/lib/multi-command.ts +++ b/packages/client/lib/multi-command.ts @@ -1,5 +1,15 @@ import { CommandArguments, RedisScript, TransformReply } from './RESP/types'; +// TODO: enum? +export type MULTI_REPLY = { + GENERIC: 'generic'; + TYPED: 'typed'; +}; + +export type MultiReply = MULTI_REPLY[keyof MULTI_REPLY]; + +export type MultiReplyType = T extends MULTI_REPLY['TYPED'] ? REPLIES : Array; + export interface RedisMultiQueuedCommand { args: CommandArguments; transformReply?: TransformReply; @@ -15,7 +25,6 @@ export default class RedisMultiCommand { args, transformReply }); - return this; } addScript(script: RedisScript, args: CommandArguments, transformReply?: TransformReply) { @@ -34,7 +43,7 @@ export default class RedisMultiCommand { redisArgs.push(...args); redisArgs.preserve = args.preserve; - return this.addCommand(redisArgs, transformReply); + this.addCommand(redisArgs, transformReply); } transformReplies(rawReplies: Array): Array {