import COMMANDS from './commands'; import { RedisCommand, RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply, RedisCommandReply, RedisModules, RedisPlugins, RedisScript, RedisScripts } from '../commands'; import { ClientCommandOptions, RedisClientCommandSignature, RedisClientOptions, RedisClientType, WithModules, WithScripts } from '../client'; import RedisClusterSlots, { ClusterNode, NodeAddressMap } from './cluster-slots'; import { extendWithModulesAndScripts, transformCommandArguments, transformCommandReply, extendWithCommands } from '../commander'; import { EventEmitter } from 'events'; import RedisClusterMultiCommand, { RedisClusterMultiCommandType } from './multi-command'; import { RedisMultiQueuedCommand } from '../multi-command'; export type RedisClusterClientOptions = Omit; export interface RedisClusterOptions< M extends RedisModules = Record, S extends RedisScripts = Record > extends RedisPlugins { rootNodes: Array; defaults?: Partial; useReplicas?: boolean; maxCommandRedirections?: number; nodeAddressMap?: NodeAddressMap; } type WithCommands = { [P in keyof typeof COMMANDS]: RedisClientCommandSignature<(typeof COMMANDS)[P]>; }; export type RedisClusterType< M extends RedisModules = Record, S extends RedisScripts = Record > = RedisCluster & WithCommands & WithModules & WithScripts; export default class RedisCluster extends EventEmitter { static extractFirstKey(command: RedisCommand, originalArgs: Array, redisArgs: RedisCommandArguments): RedisCommandArgument | undefined { if (command.FIRST_KEY_INDEX === undefined) { return undefined; } else if (typeof command.FIRST_KEY_INDEX === 'number') { return redisArgs[command.FIRST_KEY_INDEX]; } return command.FIRST_KEY_INDEX(...originalArgs); } static create(options?: RedisClusterOptions): RedisClusterType { return new (extendWithModulesAndScripts({ BaseClass: RedisCluster, modules: options?.modules, modulesCommandsExecutor: RedisCluster.prototype.commandsExecutor, scripts: options?.scripts, scriptsExecutor: RedisCluster.prototype.scriptsExecutor }))(options); } readonly #options: RedisClusterOptions; readonly #slots: RedisClusterSlots; readonly #Multi: new (...args: ConstructorParameters) => RedisClusterMultiCommandType; constructor(options: RedisClusterOptions) { super(); this.#options = options; this.#slots = new RedisClusterSlots(options, err => this.emit('error', err)); this.#Multi = RedisClusterMultiCommand.extend(options); } duplicate(overrides?: Partial>): RedisClusterType { return new (Object.getPrototypeOf(this).constructor)({ ...this.#options, ...overrides }); } async connect(): Promise { return this.#slots.connect(); } async commandsExecutor(command: RedisCommand, args: Array): Promise> { const { args: redisArgs, options } = transformCommandArguments(command, args); return transformCommandReply( command, await this.sendCommand( RedisCluster.extractFirstKey(command, args, redisArgs), command.IS_READ_ONLY, redisArgs, options ), redisArgs.preserve ); } async sendCommand( firstKey: RedisCommandArgument | undefined, isReadonly: boolean | undefined, args: RedisCommandArguments, options?: ClientCommandOptions ): Promise { return this.#execute( firstKey, isReadonly, client => client.sendCommand(args, options) ); } async scriptsExecutor(script: RedisScript, args: Array): Promise> { const { args: redisArgs, options } = transformCommandArguments(script, args); return transformCommandReply( script, await this.executeScript( script, args, redisArgs, options ), redisArgs.preserve ); } async executeScript( script: RedisScript, originalArgs: Array, redisArgs: RedisCommandArguments, options?: ClientCommandOptions ): Promise> { return this.#execute( RedisCluster.extractFirstKey(script, originalArgs, redisArgs), script.IS_READ_ONLY, client => client.executeScript(script, redisArgs, options) ); } async #execute( firstKey: RedisCommandArgument | undefined, isReadonly: boolean | undefined, executor: (client: RedisClientType) => Promise ): Promise { const maxCommandRedirections = this.#options.maxCommandRedirections ?? 16; let client = this.#slots.getClient(firstKey, isReadonly); for (let i = 0;; i++) { try { return await executor(client); } catch (err) { if (++i > maxCommandRedirections || !(err instanceof Error)) { throw err; } if (err.message.startsWith('ASK')) { const address = err.message.substring(err.message.lastIndexOf(' ') + 1); if (this.#slots.getNodeByAddress(address)?.client === client) { await client.asking(); continue; } await this.#slots.rediscover(client); const redirectTo = this.#slots.getNodeByAddress(address); if (!redirectTo) { throw new Error(`Cannot find node ${address}`); } await redirectTo.client.asking(); client = redirectTo.client; continue; } else if (err.message.startsWith('MOVED')) { await this.#slots.rediscover(client); client = this.#slots.getClient(firstKey, isReadonly); continue; } throw err; } } } multi(routing?: RedisCommandArgument): RedisClusterMultiCommandType { return new this.#Multi( (commands: Array, firstKey?: RedisCommandArgument, chainId?: symbol) => { return this.#execute( firstKey, false, client => client.multiExecutor(commands, chainId) ); }, routing ); } getMasters(): Array> { return this.#slots.getMasters(); } getSlotMaster(slot: number): ClusterNode { return this.#slots.getSlotMaster(slot); } quit(): Promise { return this.#slots.quit(); } disconnect(): Promise { return this.#slots.disconnect(); } } extendWithCommands({ BaseClass: RedisCluster, commands: COMMANDS, executor: RedisCluster.prototype.commandsExecutor });