import COMMANDS from './commands';
import { RedisCommand, RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply, RedisCommandReply, RedisFunctions, RedisModules, RedisExtensions, RedisScript, RedisScripts, RedisCommandSignature, RedisFunction } from '../commands';
import { ClientCommandOptions, RedisClientOptions, RedisClientType, WithFunctions, WithModules, WithScripts } from '../client';
import RedisClusterSlots, { ClusterNode, NodeAddressMap } from './cluster-slots';
import { attachExtensions, transformCommandReply, attachCommands, transformCommandArguments } from '../commander';
import { EventEmitter } from 'events';
import RedisClusterMultiCommand, { InstantiableRedisClusterMultiCommandType, RedisClusterMultiCommandType } from './multi-command';
import { RedisMultiQueuedCommand } from '../multi-command';

/**
 * Per-node client options: the regular client options with the
 * `modules`, `functions`, `scripts` and `database` keys removed.
 */
export type RedisClusterClientOptions = Omit<
    RedisClientOptions,
    'modules' | 'functions' | 'scripts' | 'database'
>;

/**
 * Options accepted by {@link RedisCluster}.
 *
 * The whole object is handed to `RedisClusterSlots` and to
 * `RedisClusterMultiCommand.extend`; this file itself only reads
 * `maxCommandRedirections`, `modules`, `functions` and `scripts`.
 */
export interface RedisClusterOptions<
    M extends RedisModules = Record<string, never>,
    F extends RedisFunctions = Record<string, never>,
    S extends RedisScripts = Record<string, never>
> extends RedisExtensions<M, F, S> {
    // NOTE(review): rootNodes, defaults, useReplicas and nodeAddressMap are consumed by
    // RedisClusterSlots, which is not visible here — see that module for their semantics.
    rootNodes: Array<RedisClusterClientOptions>;
    defaults?: Partial<RedisClusterClientOptions>;
    useReplicas?: boolean;
    /**
     * Maximum number of ASK/MOVED redirections followed for a single command
     * before the last error is re-thrown. Defaults to 16 when omitted.
     */
    maxCommandRedirections?: number;
    nodeAddressMap?: NodeAddressMap;
}

// One method signature per entry of the cluster COMMANDS table.
type WithCommands = {
    [P in keyof typeof COMMANDS]: RedisCommandSignature<(typeof COMMANDS)[P]>;
};

/**
 * A cluster instance together with the command, module, function and script
 * methods that are attached to it.
 */
export type RedisClusterType<
    M extends RedisModules = Record<string, never>,
    F extends RedisFunctions = Record<string, never>,
    S extends RedisScripts = Record<string, never>
> = RedisCluster<M, F, S> & WithCommands & WithModules<M> & WithFunctions<F> & WithScripts<S>;

/**
 * Cluster client: picks a node client for each command through
 * `RedisClusterSlots` and follows `ASK` / `MOVED` redirections.
 * Errors reported by the slots object are re-emitted as `'error'` events.
 */
export default class RedisCluster<
    M extends RedisModules,
    F extends RedisFunctions,
    S extends RedisScripts
> extends EventEmitter {
    /**
     * Determines the key used to route a command.
     *
     * @param command - Command definition; its `FIRST_KEY_INDEX` drives the lookup.
     * @param originalArgs - Arguments as passed by the caller (before transformation).
     * @param redisArgs - Transformed arguments.
     * @returns `undefined` when `FIRST_KEY_INDEX` is undefined; `redisArgs[FIRST_KEY_INDEX]`
     *          when it is a number; otherwise the result of calling `FIRST_KEY_INDEX`
     *          with the original arguments.
     */
    static extractFirstKey(
        command: RedisCommand,
        originalArgs: Array<unknown>,
        redisArgs: RedisCommandArguments
    ): RedisCommandArgument | undefined {
        if (command.FIRST_KEY_INDEX === undefined) {
            return undefined;
        } else if (typeof command.FIRST_KEY_INDEX === 'number') {
            return redisArgs[command.FIRST_KEY_INDEX];
        }

        return command.FIRST_KEY_INDEX(...originalArgs);
    }

    /**
     * Builds a cluster class extended (via `attachExtensions`) with the modules,
     * functions and scripts found in `options`, and instantiates it with `options`.
     */
    static create<
        M extends RedisModules,
        F extends RedisFunctions,
        S extends RedisScripts
    >(options?: RedisClusterOptions<M, F, S>): RedisClusterType<M, F, S> {
        return new (attachExtensions({
            BaseClass: RedisCluster,
            modulesExecutor: RedisCluster.prototype.commandsExecutor,
            modules: options?.modules,
            functionsExecutor: RedisCluster.prototype.functionsExecutor,
            functions: options?.functions,
            scriptsExecutor: RedisCluster.prototype.scriptsExecutor,
            scripts: options?.scripts
        }))(options);
    }

    readonly #options: RedisClusterOptions<M, F, S>;

    readonly #slots: RedisClusterSlots<M, F, S>;

    readonly #Multi: InstantiableRedisClusterMultiCommandType<M, F, S>;

    /**
     * Stores the options, creates the slots object (forwarding its errors as
     * `'error'` events on this instance) and prepares the MULTI command class.
     */
    constructor(options: RedisClusterOptions<M, F, S>) {
        super();

        this.#options = options;
        this.#slots = new RedisClusterSlots(options, err => this.emit('error', err));
        this.#Multi = RedisClusterMultiCommand.extend(options);
    }

    /**
     * Creates a new instance using the constructor found on this instance's
     * prototype, with this instance's options shallow-merged with `overrides`.
     */
    duplicate(overrides?: Partial<RedisClusterOptions<M, F, S>>): RedisClusterType<M, F, S> {
        return new (Object.getPrototypeOf(this).constructor)({
            ...this.#options,
            ...overrides
        });
    }

    /** Delegates to the slots object's `connect()`. */
    async connect(): Promise<void> {
        return this.#slots.connect();
    }

    /**
     * Executor used for attached commands and modules: transforms the arguments,
     * sends the command routed by its first key, then transforms the raw reply.
     */
    async commandsExecutor<C extends RedisCommand>(
        command: C,
        args: Array<unknown>
    ): Promise<RedisCommandReply<C>> {
        const { args: redisArgs, options } = transformCommandArguments(command, args);
        return transformCommandReply(
            command,
            await this.sendCommand(
                RedisCluster.extractFirstKey(command, args, redisArgs),
                command.IS_READ_ONLY,
                redisArgs,
                options
            ),
            redisArgs.preserve
        );
    }

    /**
     * Sends already-transformed arguments through the client selected for
     * `firstKey` / `isReadonly`, following redirections.
     */
    async sendCommand<T = RedisCommandRawReply>(
        firstKey: RedisCommandArgument | undefined,
        isReadonly: boolean | undefined,
        args: RedisCommandArguments,
        options?: ClientCommandOptions
    ): Promise<T> {
        return this.#execute(
            firstKey,
            isReadonly,
            client => client.sendCommand<T>(args, options)
        );
    }

    /**
     * Executor used for attached functions: transforms the arguments, runs the
     * function and transforms the raw reply.
     */
    async functionsExecutor<F extends RedisFunction>(
        fn: F,
        args: Array<unknown>,
        name: string
    ): Promise<RedisCommandReply<F>> {
        const { args: redisArgs, options } = transformCommandArguments(fn, args);
        return transformCommandReply(
            fn,
            await this.executeFunction(
                name,
                fn,
                args,
                redisArgs,
                options
            ),
            redisArgs.preserve
        );
    }

    /**
     * Runs `client.executeFunction` on the client selected by the function's
     * first key and `IS_READ_ONLY` flag, following redirections.
     */
    async executeFunction(
        name: string,
        fn: RedisFunction,
        originalArgs: Array<unknown>,
        redisArgs: RedisCommandArguments,
        options?: ClientCommandOptions
    ): Promise<RedisCommandRawReply> {
        return this.#execute(
            RedisCluster.extractFirstKey(fn, originalArgs, redisArgs),
            fn.IS_READ_ONLY,
            client => client.executeFunction(name, fn, redisArgs, options)
        );
    }

    /**
     * Executor used for attached scripts: transforms the arguments, runs the
     * script and transforms the raw reply.
     */
    async scriptsExecutor<S extends RedisScript>(script: S, args: Array<unknown>): Promise<RedisCommandReply<S>> {
        const { args: redisArgs, options } = transformCommandArguments(script, args);
        return transformCommandReply(
            script,
            await this.executeScript(
                script,
                args,
                redisArgs,
                options
            ),
            redisArgs.preserve
        );
    }

    /**
     * Runs `client.executeScript` on the client selected by the script's
     * first key and `IS_READ_ONLY` flag, following redirections.
     */
    async executeScript(
        script: RedisScript,
        originalArgs: Array<unknown>,
        redisArgs: RedisCommandArguments,
        options?: ClientCommandOptions
    ): Promise<RedisCommandRawReply> {
        return this.#execute(
            RedisCluster.extractFirstKey(script, originalArgs, redisArgs),
            script.IS_READ_ONLY,
            client => client.executeScript(script, redisArgs, options)
        );
    }

    /**
     * Runs `executor` against the client chosen for `firstKey` / `isReadonly`
     * and retries it after `ASK` / `MOVED` errors.
     *
     * At most `maxCommandRedirections` (default 16) redirections are followed;
     * the error of the next failed attempt is re-thrown. Thrown values that are
     * not `Error` instances, and errors whose message starts with neither `ASK`
     * nor `MOVED`, are re-thrown immediately.
     *
     * @throws Error `Cannot find node <address>` when an `ASK` target is still
     *         unknown after rediscovery.
     */
    async #execute<Reply>(
        firstKey: RedisCommandArgument | undefined,
        isReadonly: boolean | undefined,
        executor: (client: RedisClientType<M, F, S>) => Promise<Reply>
    ): Promise<Reply> {
        const maxCommandRedirections = this.#options.maxCommandRedirections ?? 16;
        let client = this.#slots.getClient(firstKey, isReadonly);
        // Counted in exactly one place (the catch below). The loop itself must not
        // increment it as well, otherwise every redirection is counted twice and
        // only about half of the configured limit is honoured.
        let failedAttempts = 0;
        for (;;) {
            try {
                return await executor(client);
            } catch (err) {
                if (++failedAttempts > maxCommandRedirections || !(err instanceof Error)) {
                    throw err;
                }

                if (err.message.startsWith('ASK')) {
                    // The target address is the last space-separated token of the message.
                    const address = err.message.substring(err.message.lastIndexOf(' ') + 1);
                    if (this.#slots.getNodeByAddress(address)?.client === client) {
                        // Redirected to the client already in use: send ASKING and retry.
                        await client.asking();
                        continue;
                    }

                    await this.#slots.rediscover(client);
                    const redirectTo = this.#slots.getNodeByAddress(address);
                    if (!redirectTo) {
                        throw new Error(`Cannot find node ${address}`);
                    }

                    await redirectTo.client.asking();
                    client = redirectTo.client;
                    continue;
                } else if (err.message.startsWith('MOVED')) {
                    // Refresh the slots via rediscover, then pick the client again.
                    await this.#slots.rediscover(client);
                    client = this.#slots.getClient(firstKey, isReadonly);
                    continue;
                }

                throw err;
            }
        }
    }

    /**
     * Creates a MULTI command object whose queued commands are run with
     * `client.multiExecutor` on the client selected for the first key
     * (with `isReadonly` set to `false`), following redirections.
     *
     * @param routing - Forwarded as the second constructor argument of the
     *        multi-command class.
     */
    MULTI(routing?: RedisCommandArgument): RedisClusterMultiCommandType<M, F, S> {
        return new this.#Multi(
            (commands: Array<RedisMultiQueuedCommand>, firstKey?: RedisCommandArgument, chainId?: symbol) => {
                return this.#execute(
                    firstKey,
                    false,
                    client => client.multiExecutor(commands, undefined, chainId)
                );
            },
            routing
        );
    }

    /** Alias of {@link RedisCluster.MULTI}. */
    multi = this.MULTI;

    /** Returns the master nodes reported by the slots object. */
    getMasters(): Array<ClusterNode<M, F, S>> {
        return this.#slots.getMasters();
    }

    /** Returns the master node the slots object reports for `slot`. */
    getSlotMaster(slot: number): ClusterNode<M, F, S> {
        return this.#slots.getSlotMaster(slot);
    }

    /** Delegates to the slots object's `quit()`. */
    quit(): Promise<void> {
        return this.#slots.quit();
    }

    /** Delegates to the slots object's `disconnect()`. */
    disconnect(): Promise<void> {
        return this.#slots.disconnect();
    }
}

// Adds one method per entry of COMMANDS to the class, each routed through commandsExecutor.
attachCommands({
    BaseClass: RedisCluster,
    commands: COMMANDS,
    executor: RedisCluster.prototype.commandsExecutor
});