import COMMANDS, { TransformArgumentsReply } from './commands'; import { RedisCommand, RedisModules, RedisReply } from './commands'; import { RedisLuaScript, RedisLuaScripts } from './lua-script'; import { RedisClientOptions } from './client'; import { extendWithModulesAndScripts, extendWithDefaultCommands } from './commander'; import { WatchError } from './errors'; type RedisMultiCommandSignature = (...args: Parameters) => RedisMultiCommandType; type WithCommands = { [P in keyof typeof COMMANDS]: RedisMultiCommandSignature<(typeof COMMANDS)[P], M, S> }; type WithModules = { [P in keyof M]: { [C in keyof M[P]]: RedisMultiCommandSignature; }; }; type WithScripts = { [P in keyof S]: RedisMultiCommandSignature }; export type RedisMultiCommandType = RedisMultiCommand & WithCommands & WithModules & WithScripts; export interface MultiQueuedCommand { args: TransformArgumentsReply; preservedArguments?: unknown; transformReply?: RedisCommand['transformReply']; } export type RedisMultiExecutor = (queue: Array, chainId?: symbol) => Promise>; export default class RedisMultiCommand { static extend( clientOptions?: RedisClientOptions ): new (...args: ConstructorParameters) => RedisMultiCommandType { return extendWithModulesAndScripts({ BaseClass: RedisMultiCommand, modules: clientOptions?.modules, modulesCommandsExecutor: RedisMultiCommand.prototype.commandsExecutor, scripts: clientOptions?.scripts, scriptsExecutor: RedisMultiCommand.prototype.scriptsExecutor }); } static create( executor: RedisMultiExecutor, clientOptions?: RedisClientOptions ): RedisMultiCommandType { return new this(executor, clientOptions); } readonly #executor: RedisMultiExecutor; readonly #clientOptions: RedisClientOptions | undefined; readonly #queue: Array = []; readonly #scriptsInUse = new Set(); readonly #v4: Record = {}; get v4(): Record { if (!this.#clientOptions?.legacyMode) { throw new Error('client is not in "legacy mode"'); } return this.#v4; } constructor(executor: RedisMultiExecutor, clientOptions?: RedisClientOptions) { this.#executor = executor; this.#clientOptions = clientOptions; this.#legacyMode(); } #legacyMode(): void { if (!this.#clientOptions?.legacyMode) return; this.#v4.addCommand = this.addCommand.bind(this); (this as any).addCommand = (...args: Array): this => { this.#queue.push({ args: args.flat() as Array }); return this; } this.#v4.exec = this.exec.bind(this); (this as any).exec = (callback?: (err: Error | null, replies?: Array) => unknown): void => { this.#v4.exec() .then((reply: Array) => { if (!callback) return; callback(null, reply); }) .catch((err: Error) => { if (!callback) { // this.emit('error', err); return; } callback(err); }); }; for (const name of Object.keys(COMMANDS)) { this.#defineLegacyCommand(name); } } #defineLegacyCommand(name: string): void { (this as any).#v4[name] = (this as any)[name].bind(this.#v4); (this as any)[name] = (...args: Array): void => (this as any).addCommand(name, args); } commandsExecutor(command: RedisCommand, args: Array): this { return this.addCommand( command.transformArguments(...args), command.transformReply ); } scriptsExecutor(script: RedisLuaScript, args: Array): this { const transformedArguments: TransformArgumentsReply = []; if (this.#scriptsInUse.has(script.SHA1)) { transformedArguments.push( 'EVALSHA', script.SHA1 ); } else { this.#scriptsInUse.add(script.SHA1); transformedArguments.push( 'EVAL', script.SCRIPT ); } transformedArguments.push(script.NUMBER_OF_KEYS.toString()); const scriptArguments = script.transformArguments(...args); transformedArguments.push(...scriptArguments); if (scriptArguments.preserve) { transformedArguments.preserve = scriptArguments.preserve; } return this.addCommand( transformedArguments, script.transformReply ); } addCommand(args: TransformArgumentsReply, transformReply?: RedisCommand['transformReply']): this { this.#queue.push({ args, preservedArguments: args.preserve, transformReply }); return this; } async exec(execAsPipeline = false): Promise> { if (execAsPipeline) { return this.execAsPipeline(); } else if (!this.#queue.length) { return []; } const queue = this.#queue.splice(0), rawReplies = await this.#executor([ { args: ['MULTI'] }, ...queue, { args: ['EXEC'] } ], Symbol('[RedisMultiCommand] Chain ID')), execReply = rawReplies[rawReplies.length - 1] as (null | Array); if (execReply === null) { throw new WatchError(); } return this.#transformReplies(execReply, queue); } async execAsPipeline(): Promise> { if (!this.#queue.length) { return []; } const queue = this.#queue.splice(0); return this.#transformReplies( await this.#executor(queue), queue ); } #transformReplies(rawReplies: Array, queue: Array): Array { return rawReplies.map((reply, i) => { const { transformReply, preservedArguments } = queue[i]; return transformReply ? transformReply(reply, preservedArguments) : reply; }); } } extendWithDefaultCommands(RedisMultiCommand, RedisMultiCommand.prototype.commandsExecutor);