import COMMANDS from './commands';
import { RedisCommand, RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply, RedisCommandReply, RedisFunctions, RedisModules, RedisExtensions, RedisScript, RedisScripts, RedisCommandSignature, RedisFunction } from '../commands';
import { ClientCommandOptions, RedisClientOptions, RedisClientType, WithFunctions, WithModules, WithScripts } from '../client';
import RedisClusterSlots, { NodeAddressMap, ShardNode } from './cluster-slots';
import { attachExtensions, transformCommandReply, attachCommands, transformCommandArguments } from '../commander';
import { EventEmitter } from 'events';
import RedisClusterMultiCommand, { InstantiableRedisClusterMultiCommandType, RedisClusterMultiCommandType } from './multi-command';
import { RedisMultiQueuedCommand } from '../multi-command';
import { PubSubListener } from '../client/pub-sub';
import { ErrorReply } from '../errors';

/**
 * Per-node client options accepted by the cluster. Extension maps and the
 * database index are omitted from the single-node client options.
 */
export type RedisClusterClientOptions = Omit<
  RedisClientOptions,
  'modules' | 'functions' | 'scripts' | 'database'
>;

export interface RedisClusterOptions<
  M extends RedisModules = Record<string, never>,
  F extends RedisFunctions = Record<string, never>,
  S extends RedisScripts = Record<string, never>
> extends RedisExtensions<M, F, S> {
  /**
   * Should contain details for some of the cluster nodes that the client will use to discover
   * the "cluster topology". We recommend including details for at least 3 nodes here.
   */
  rootNodes: Array<RedisClusterClientOptions>;
  /**
   * Default values used for every client in the cluster. Use this to specify global values,
   * for example: ACL credentials, timeouts, TLS configuration etc.
   */
  defaults?: Partial<RedisClusterClientOptions>;
  /**
   * When `true`, `.connect()` will only discover the cluster topology, without actually connecting to all the nodes.
   * Useful for short-term or PubSub-only connections.
   */
  minimizeConnections?: boolean;
  /**
   * When `true`, distribute load by executing readonly commands (such as `GET`, `GEOSEARCH`, etc.) across all cluster nodes.
   * When `false`, only use master nodes.
   */
  useReplicas?: boolean;
  /**
   * The maximum number of times a command will be redirected due to `MOVED` or `ASK` errors.
   * Defaults to 16 when not set.
   */
  maxCommandRedirections?: number;
  /**
   * Mapping between the addresses in the cluster (see `CLUSTER SHARDS`) and the addresses the client should connect to.
   * Useful when the cluster is running on another network.
   */
  nodeAddressMap?: NodeAddressMap;
}

/** One typed method per entry of the cluster `COMMANDS` table. */
type WithCommands = {
  [P in keyof typeof COMMANDS]: RedisCommandSignature<(typeof COMMANDS)[P]>;
};

/** A cluster instance together with its built-in and extension-provided commands. */
export type RedisClusterType<
  M extends RedisModules = Record<string, never>,
  F extends RedisFunctions = Record<string, never>,
  S extends RedisScripts = Record<string, never>
> = RedisCluster<M, F, S> & WithCommands & WithModules<M> & WithFunctions<F> & WithScripts<S>;

/**
 * Cluster-aware client: picks a node client through `RedisClusterSlots` for
 * every command and follows `MOVED` / `ASK` redirections up to
 * `maxCommandRedirections` times.
 */
export default class RedisCluster<
  M extends RedisModules,
  F extends RedisFunctions,
  S extends RedisScripts
> extends EventEmitter {
  /**
   * Resolves the key used to route a command.
   *
   * @param command - Command definition; its `FIRST_KEY_INDEX` decides the lookup.
   * @param originalArgs - Arguments as supplied by the caller (passed to a function-valued `FIRST_KEY_INDEX`).
   * @param redisArgs - Transformed arguments (indexed by a numeric `FIRST_KEY_INDEX`).
   * @returns The routing key, or `undefined` when the command declares no `FIRST_KEY_INDEX`.
   */
  static extractFirstKey(
    command: RedisCommand,
    originalArgs: Array<unknown>,
    redisArgs: RedisCommandArguments
  ): RedisCommandArgument | undefined {
    if (command.FIRST_KEY_INDEX === undefined) {
      return undefined;
    } else if (typeof command.FIRST_KEY_INDEX === 'number') {
      return redisArgs[command.FIRST_KEY_INDEX];
    }

    return command.FIRST_KEY_INDEX(...originalArgs);
  }

  /**
   * Builds a cluster class with the modules, functions and scripts from
   * `options` attached, and returns a new instance of it.
   */
  static create<
    M extends RedisModules,
    F extends RedisFunctions,
    S extends RedisScripts
  >(options?: RedisClusterOptions<M, F, S>): RedisClusterType<M, F, S> {
    return new (attachExtensions({
      BaseClass: RedisCluster,
      modulesExecutor: RedisCluster.prototype.commandsExecutor,
      modules: options?.modules,
      functionsExecutor: RedisCluster.prototype.functionsExecutor,
      functions: options?.functions,
      scriptsExecutor: RedisCluster.prototype.scriptsExecutor,
      scripts: options?.scripts
    }))(options);
  }

  readonly #options: RedisClusterOptions<M, F, S>;

  readonly #slots: RedisClusterSlots<M, F, S>;

  // The getters below expose the corresponding `RedisClusterSlots` state.
  get slots() {
    return this.#slots.slots;
  }

  get shards() {
    return this.#slots.shards;
  }

  get masters() {
    return this.#slots.masters;
  }

  get replicas() {
    return this.#slots.replicas;
  }

  get nodeByAddress() {
    return this.#slots.nodeByAddress;
  }

  get pubSubNode() {
    return this.#slots.pubSubNode;
  }

  readonly #Multi: InstantiableRedisClusterMultiCommandType<M, F, S>;

  get isOpen() {
    return this.#slots.isOpen;
  }

  /**
   * @param options - Cluster options; also handed to the slots manager and
   * used to derive the MULTI command class.
   */
  constructor(options: RedisClusterOptions<M, F, S>) {
    super();

    this.#options = options;
    // The slots manager reports through this emitter, hence the bound `emit`.
    this.#slots = new RedisClusterSlots(options, this.emit.bind(this));
    this.#Multi = RedisClusterMultiCommand.extend(options);
  }

  /**
   * Creates a new instance of the same (possibly extended) class, using this
   * instance's options shallow-merged with `overrides`.
   */
  duplicate(overrides?: Partial<RedisClusterOptions<M, F, S>>): RedisClusterType<M, F, S> {
    return new (Object.getPrototypeOf(this).constructor)({
      ...this.#options,
      ...overrides
    });
  }

  connect() {
    return this.#slots.connect();
  }

  /**
   * Executor attached to every built-in and module command: transforms the
   * arguments, routes the command by its first key and transforms the reply.
   */
  async commandsExecutor<C extends RedisCommand>(
    command: C,
    args: Array<unknown>
  ): Promise<RedisCommandReply<C>> {
    const { args: redisArgs, options } = transformCommandArguments(command, args);
    return transformCommandReply(
      command,
      await this.sendCommand(
        RedisCluster.extractFirstKey(command, args, redisArgs),
        command.IS_READ_ONLY,
        redisArgs,
        options
      ),
      redisArgs.preserve
    );
  }

  /**
   * Sends raw command arguments to the node selected for `firstKey`,
   * following cluster redirections.
   *
   * @param firstKey - Routing key, or `undefined` when the command has none.
   * @param isReadonly - Forwarded to the slots manager when selecting a client.
   * @param args - Arguments to send.
   * @param options - Per-command client options.
   */
  async sendCommand<T = RedisCommandRawReply>(
    firstKey: RedisCommandArgument | undefined,
    isReadonly: boolean | undefined,
    args: RedisCommandArguments,
    options?: ClientCommandOptions
  ): Promise<T> {
    return this.#execute(
      firstKey,
      isReadonly,
      client => client.sendCommand<T>(args, options)
    );
  }

  /** Executor attached to every configured function. */
  async functionsExecutor<F extends RedisFunction>(
    fn: F,
    args: Array<unknown>,
    name: string
  ): Promise<RedisCommandReply<F>> {
    const { args: redisArgs, options } = transformCommandArguments(fn, args);
    return transformCommandReply(
      fn,
      await this.executeFunction(
        name,
        fn,
        args,
        redisArgs,
        options
      ),
      redisArgs.preserve
    );
  }

  /**
   * Runs a function on the node selected by the function's first key,
   * following cluster redirections.
   */
  async executeFunction(
    name: string,
    fn: RedisFunction,
    originalArgs: Array<unknown>,
    redisArgs: RedisCommandArguments,
    options?: ClientCommandOptions
  ): Promise<RedisCommandRawReply> {
    return this.#execute(
      RedisCluster.extractFirstKey(fn, originalArgs, redisArgs),
      fn.IS_READ_ONLY,
      client => client.executeFunction(name, fn, redisArgs, options)
    );
  }

  /** Executor attached to every configured script. */
  async scriptsExecutor<S extends RedisScript>(
    script: S,
    args: Array<unknown>
  ): Promise<RedisCommandReply<S>> {
    const { args: redisArgs, options } = transformCommandArguments(script, args);
    return transformCommandReply(
      script,
      await this.executeScript(
        script,
        args,
        redisArgs,
        options
      ),
      redisArgs.preserve
    );
  }

  /**
   * Runs a script on the node selected by the script's first key,
   * following cluster redirections.
   */
  async executeScript(
    script: RedisScript,
    originalArgs: Array<unknown>,
    redisArgs: RedisCommandArguments,
    options?: ClientCommandOptions
  ): Promise<RedisCommandRawReply> {
    return this.#execute(
      RedisCluster.extractFirstKey(script, originalArgs, redisArgs),
      script.IS_READ_ONLY,
      client => client.executeScript(script, redisArgs, options)
    );
  }

  /**
   * Runs `executor` against the client selected for `firstKey`, retrying on
   * `ASK` and `MOVED` error replies.
   *
   * At most `maxCommandRedirections` (default 16) redirections are followed;
   * the error of the attempt that exceeds the limit is rethrown. Errors that
   * are not `ErrorReply` instances, and error replies other than
   * `ASK` / `MOVED`, are rethrown immediately.
   *
   * @throws Error `Cannot find node <address>` when an `ASK` target is still
   * unknown after rediscovering the topology.
   */
  async #execute<Reply>(
    firstKey: RedisCommandArgument | undefined,
    isReadonly: boolean | undefined,
    executor: (client: RedisClientType<M, F, S>) => Promise<Reply>
  ): Promise<Reply> {
    const maxCommandRedirections = this.#options.maxCommandRedirections ?? 16;
    let client = await this.#slots.getClient(firstKey, isReadonly);
    // Counted in exactly one place (the catch below) so that the configured
    // limit is the real number of redirections allowed.
    let redirections = 0;
    while (true) {
      try {
        return await executor(client);
      } catch (err) {
        if (!(err instanceof ErrorReply) || ++redirections > maxCommandRedirections) {
          throw err;
        }

        if (err.message.startsWith('ASK')) {
          // The target address is the last space-separated token of the message.
          const address = err.message.substring(err.message.lastIndexOf(' ') + 1);
          let redirectTo = await this.#slots.getMasterByAddress(address);
          if (!redirectTo) {
            // Unknown node: refresh the topology once and look it up again.
            await this.#slots.rediscover(client);
            redirectTo = await this.#slots.getMasterByAddress(address);
          }

          if (!redirectTo) {
            throw new Error(`Cannot find node ${address}`);
          }

          await redirectTo.asking();
          client = redirectTo;
          continue;
        } else if (err.message.startsWith('MOVED')) {
          // Refresh the topology and resolve the client for the key again.
          await this.#slots.rediscover(client);
          client = await this.#slots.getClient(firstKey, isReadonly);
          continue;
        }

        throw err;
      }
    }
  }

  /**
   * Starts a MULTI command chain. The queued commands are executed through
   * the same redirect-following path as single commands, never as read-only.
   *
   * @param routing - Optional routing key passed to the multi-command instance.
   */
  MULTI(routing?: RedisCommandArgument): RedisClusterMultiCommandType<M, F, S> {
    return new this.#Multi(
      (commands: Array<RedisMultiQueuedCommand>, firstKey?: RedisCommandArgument, chainId?: symbol) => {
        return this.#execute(
          firstKey,
          false,
          client => client.multiExecutor(commands, undefined, chainId)
        );
      },
      routing
    );
  }

  multi = this.MULTI;

  /** Subscribes on the cluster's Pub/Sub client. */
  async SUBSCRIBE<T extends boolean = false>(
    channels: string | Array<string>,
    listener: PubSubListener<T>,
    bufferMode?: T
  ) {
    return (await this.#slots.getPubSubClient())
      .SUBSCRIBE(channels, listener, bufferMode);
  }

  subscribe = this.SUBSCRIBE;

  /** Unsubscribes through the slots manager's unsubscribe helper. */
  async UNSUBSCRIBE<T extends boolean = false>(
    channels?: string | Array<string>,
    listener?: PubSubListener<T>,
    bufferMode?: T
  ) {
    return this.#slots.executeUnsubscribeCommand(client =>
      client.UNSUBSCRIBE(channels, listener, bufferMode)
    );
  }

  unsubscribe = this.UNSUBSCRIBE;

  /** Pattern-subscribes on the cluster's Pub/Sub client. */
  async PSUBSCRIBE<T extends boolean = false>(
    patterns: string | Array<string>,
    listener: PubSubListener<T>,
    bufferMode?: T
  ) {
    return (await this.#slots.getPubSubClient())
      .PSUBSCRIBE(patterns, listener, bufferMode);
  }

  pSubscribe = this.PSUBSCRIBE;

  /** Pattern-unsubscribes through the slots manager's unsubscribe helper. */
  async PUNSUBSCRIBE<T extends boolean = false>(
    patterns?: string | Array<string>,
    listener?: PubSubListener<T>,
    bufferMode?: T
  ) {
    return this.#slots.executeUnsubscribeCommand(client =>
      client.PUNSUBSCRIBE(patterns, listener, bufferMode)
    );
  }

  pUnsubscribe = this.PUNSUBSCRIBE;

  /**
   * Sharded subscribe. The client is selected by the first channel and the
   * call is retried on `MOVED` error replies, at most
   * `maxCommandRedirections` (default 16) times. Any other error is rethrown.
   */
  async SSUBSCRIBE<T extends boolean = false>(
    channels: string | Array<string>,
    listener: PubSubListener<T>,
    bufferMode?: T
  ) {
    const maxCommandRedirections = this.#options.maxCommandRedirections ?? 16,
      firstChannel = Array.isArray(channels) ? channels[0] : channels;
    let client = await this.#slots.getShardedPubSubClient(firstChannel);
    // Counted only in the catch below; see `#execute`.
    let redirections = 0;
    while (true) {
      try {
        return await client.SSUBSCRIBE(channels, listener, bufferMode);
      } catch (err) {
        if (!(err instanceof ErrorReply) || ++redirections > maxCommandRedirections) {
          throw err;
        }

        if (err.message.startsWith('MOVED')) {
          await this.#slots.rediscover(client);
          client = await this.#slots.getShardedPubSubClient(firstChannel);
          continue;
        }

        throw err;
      }
    }
  }

  sSubscribe = this.SSUBSCRIBE;

  /** Sharded unsubscribe, routed by the first channel. */
  SUNSUBSCRIBE<T extends boolean = false>(
    channels: string | Array<string>,
    listener: PubSubListener<T>,
    bufferMode?: T
  ) {
    return this.#slots.executeShardedUnsubscribeCommand(
      Array.isArray(channels) ? channels[0] : channels,
      client => client.SUNSUBSCRIBE(channels, listener, bufferMode)
    );
  }

  sUnsubscribe = this.SUNSUBSCRIBE;

  quit(): Promise<void> {
    return this.#slots.quit();
  }

  disconnect(): Promise<void> {
    return this.#slots.disconnect();
  }

  nodeClient(node: ShardNode<M, F, S>) {
    return this.#slots.nodeClient(node);
  }

  getRandomNode() {
    return this.#slots.getRandomNode();
  }

  getSlotRandomNode(slot: number) {
    return this.#slots.getSlotRandomNode(slot);
  }

  /**
   * @deprecated use `.masters` instead
   */
  getMasters() {
    return this.masters;
  }

  /**
   * @deprecated use `.slots[<slot>]` instead
   */
  getSlotMaster(slot: number) {
    return this.slots[slot].master;
  }
}

// Attach one prototype method per built-in command, all routed through
// `commandsExecutor`.
attachCommands({
  BaseClass: RedisCluster,
  commands: COMMANDS,
  executor: RedisCluster.prototype.commandsExecutor
});