import { ClientCommandOptions, RedisClientOptions, RedisClientType } from '../client'; import { Command, CommandArguments, CommanderConfig, CommandPolicies, CommandSignature, CommandWithPoliciesSignature, Flags, RedisArgument, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions, TransformReply } from '../RESP/types'; import COMMANDS from '../commands'; import { EventEmitter } from 'events'; import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; import RedisClusterSlots, { NodeAddressMap, ShardNode } from './cluster-slots'; // import RedisClusterMultiCommand, { InstantiableRedisClusterMultiCommandType, RedisClusterMultiCommandType } from './multi-command'; // import { RedisMultiQueuedCommand } from '../multi-command'; import { PubSubListener } from '../client/pub-sub'; import { ErrorReply } from '../errors'; export type RedisClusterClientOptions = Omit< RedisClientOptions, 'modules' | 'functions' | 'scripts' | 'database' | 'RESP' >; export interface RedisClusterOptions< M extends RedisModules = RedisModules, F extends RedisFunctions = RedisFunctions, S extends RedisScripts = RedisScripts, RESP extends RespVersions = RespVersions > extends CommanderConfig { /** * Should contain details for some of the cluster nodes that the client will use to discover * the "cluster topology". We recommend including details for at least 3 nodes here. */ rootNodes: Array; /** * Default values used for every client in the cluster. Use this to specify global values, * for example: ACL credentials, timeouts, TLS configuration etc. */ defaults?: Partial; /** * When `true`, `.connect()` will only discover the cluster topology, without actually connecting to all the nodes. * Useful for short-term or PubSub-only connections. */ minimizeConnections?: boolean; /** * When `true`, distribute load by executing readonly commands (such as `GET`, `GEOSEARCH`, etc.) across all cluster nodes. When `false`, only use master nodes. */ // TODO: replicas only mode? useReplicas?: boolean; /** * The maximum number of times a command will be redirected due to `MOVED` or `ASK` errors. */ maxCommandRedirections?: number; /** * Mapping between the addresses in the cluster (see `CLUSTER SHARDS`) and the addresses the client should connect to * Useful when the cluster is running on another network */ nodeAddressMap?: NodeAddressMap; } type WithCommands< RESP extends RespVersions, FLAGS extends Flags, POLICIES extends CommandPolicies > = { [P in keyof typeof COMMANDS]: CommandWithPoliciesSignature<(typeof COMMANDS)[P], RESP, FLAGS, POLICIES>; }; export type RedisClusterType< M extends RedisModules = {}, F extends RedisFunctions = {}, S extends RedisScripts = {}, RESP extends RespVersions = 2, FLAGS extends Flags = {}, POLICIES extends CommandPolicies = {} > = RedisCluster & WithCommands; // & WithModules & WithFunctions & WithScripts export interface ClusterCommandOptions extends ClientCommandOptions { policies?: CommandPolicies; } type ProxyCluster = RedisCluster & { commandOptions?: ClusterCommandOptions }; type NamespaceProxyCluster = { self: ProxyCluster }; export default class RedisCluster< M extends RedisModules, F extends RedisFunctions, S extends RedisScripts, RESP extends RespVersions, FLAGS extends Flags, POLICIES extends CommandPolicies > extends EventEmitter { private static _extractFirstKey( command: C, args: Parameters, redisArgs: Array ): RedisArgument | undefined { if (command.FIRST_KEY_INDEX === undefined) { return undefined; } else if (typeof command.FIRST_KEY_INDEX === 'number') { return redisArgs[command.FIRST_KEY_INDEX]; } return command.FIRST_KEY_INDEX(...args); } private static _createCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); return async function (this: ProxyCluster) { const args = command.transformArguments.apply(undefined, arguments as any), firstKey = RedisCluster._extractFirstKey( command, arguments as any, args ), reply = await this.sendCommand( firstKey, command.IS_READ_ONLY, args, this.commandOptions, command.POLICIES ); return transformReply ? transformReply(reply, args.preserve) : reply; }; } private static _createModuleCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); return async function (this: NamespaceProxyCluster) { const args = command.transformArguments.apply(undefined, arguments as any), firstKey = RedisCluster._extractFirstKey( command, arguments as any, args ), reply = await this.self.sendCommand( firstKey, command.IS_READ_ONLY, args, this.self.commandOptions, command.POLICIES ); return transformReply ? transformReply(reply, args.preserve) : reply; }; } private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) { const prefix = functionArgumentsPrefix(name, fn), transformReply = getTransformReply(fn, resp); return async function (this: NamespaceProxyCluster) { const fnArgs = fn.transformArguments.apply(undefined, arguments as any), args = prefix.concat(fnArgs), firstKey = RedisCluster._extractFirstKey( fn, arguments as any, args ), reply = await this.self.sendCommand( firstKey, fn.IS_READ_ONLY, args, this.self.commandOptions, fn.POLICIES ); return transformReply ? transformReply(reply, fnArgs.preserve) : reply; }; } private static _createScriptCommand(script: RedisScript, resp: RespVersions) { const prefix = scriptArgumentsPrefix(script), transformReply = getTransformReply(script, resp); return async function (this: ProxyCluster) { const scriptArgs = script.transformArguments.apply(undefined, arguments as any), args = prefix.concat(scriptArgs), firstKey = RedisCluster._extractFirstKey( script, arguments as any, args ), reply = await this.sendCommand( firstKey, script.IS_READ_ONLY, args, this.commandOptions, script.POLICIES ); return transformReply ? transformReply(reply, scriptArgs.preserve) : reply; }; } static factory< M extends RedisModules = {}, F extends RedisFunctions = {}, S extends RedisScripts = {}, RESP extends RespVersions = 2 >(config?: CommanderConfig) { const Cluster = attachConfig({ BaseClass: RedisCluster, commands: COMMANDS, createCommand: RedisCluster._createCommand, createFunctionCommand: RedisCluster._createFunctionCommand, createModuleCommand: RedisCluster._createModuleCommand, createScriptCommand: RedisCluster._createScriptCommand, config }); // Client.prototype.Multi = RedisClientMultiCommand.extend(config); return (options?: Omit>) => { // returning a proxy of the client to prevent the namespaces.self to leak between proxies // namespaces will be bootstraped on first access per proxy return Object.create(new Cluster(options)) as RedisClusterType; }; } static create< M extends RedisModules = {}, F extends RedisFunctions = {}, S extends RedisScripts = {}, RESP extends RespVersions = 2 >(options?: RedisClusterOptions) { return RedisCluster.factory(options)(options); } private readonly _options: RedisClusterOptions; private readonly _slots: RedisClusterSlots; /** * An array of the cluster slots, each slot contain its `master` and `replicas`. * Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific node (master or replica). */ get slots() { return this._slots.slots; } /** * An array of cluster shards, each shard contain its `master` and `replicas`. * Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific node (master or replica). */ get shards() { return this._slots.shards; } /** * An array of the cluster masters. * Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific master node. */ get masters() { return this._slots.masters; } /** * An array of the cluster replicas. * Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific replica node. */ get replicas() { return this._slots.replicas; } /** * A map form a node address (`:`) to its shard, each shard contain its `master` and `replicas`. * Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific node (master or replica). */ get nodeByAddress() { return this._slots.nodeByAddress; } /** * The current pub/sub node. */ get pubSubNode() { return this._slots.pubSubNode; } // readonly #Multi: InstantiableRedisClusterMultiCommandType; get isOpen() { return this._slots.isOpen; } constructor(options: RedisClusterOptions) { super(); this._options = options; this._slots = new RedisClusterSlots(options, this.emit.bind(this)); // this.#Multi = RedisClusterMultiCommand.extend(options); } duplicate(overrides?: Partial>): RedisClusterType { return new (Object.getPrototypeOf(this).constructor)({ ...this._options, ...overrides }); } connect() { return this._slots.connect(); } withCommandOptions(options: T) { const proxy = Object.create(this); proxy.commandOptions = options; return proxy as RedisClusterType< M, F, S, RESP, T['flags'] extends Flags ? T['flags'] : {}, T['policies'] extends CommandPolicies ? T['policies'] : {} >; } private _commandOptionsProxy< K extends keyof ClusterCommandOptions, V extends ClusterCommandOptions[K] >( key: K, value: V ) { const proxy = Object.create(this); proxy.commandOptions = Object.create((this as unknown as ProxyCluster).commandOptions ?? null); proxy.commandOptions[key] = value; return proxy as RedisClusterType< M, F, S, RESP, K extends 'flags' ? V extends Flags ? V : {} : FLAGS, K extends 'policies' ? V extends CommandPolicies ? V : {} : POLICIES >; } /** * Override the `flags` command option */ withFlags(flags: FLAGS) { return this._commandOptionsProxy('flags', flags); } /** * Override the `policies` command option * TODO */ withPolicies (policies: POLICIES) { return this._commandOptionsProxy('policies', policies); } async sendCommand( firstKey: RedisArgument | undefined, isReadonly: boolean | undefined, args: CommandArguments, options?: ClusterCommandOptions, deafultPolicies?: CommandPolicies ): Promise { // const requestPolicy = options?.policies?.request ?? deafultPolicies?.request, // responsePolicy = options?.policies?.response ?? deafultPolicies?.response; const maxCommandRedirections = this._options.maxCommandRedirections ?? 16; let client = await this._slots.getClient(firstKey, isReadonly); for (let i = 0; ; i++) { try { return await client.sendCommand(args, options); } catch (err) { // TODO: error class if (++i > maxCommandRedirections || !(err instanceof Error)) { throw err; } if (err.message.startsWith('ASK')) { const address = err.message.substring(err.message.lastIndexOf(' ') + 1); let redirectTo = await this._slots.getMasterByAddress(address); if (!redirectTo) { await this._slots.rediscover(client); redirectTo = await this._slots.getMasterByAddress(address); } if (!redirectTo) { throw new Error(`Cannot find node ${address}`); } await redirectTo.asking(); client = redirectTo; continue; } else if (err.message.startsWith('MOVED')) { await this._slots.rediscover(client); client = await this._slots.getClient(firstKey, isReadonly); continue; } throw err; } } } // MULTI(routing?: RedisCommandArgument): RedisClusterMultiCommandType { // return new this.#Multi( // (commands: Array, firstKey?: RedisCommandArgument, chainId?: symbol) => { // return this.#execute( // firstKey, // false, // client => client.multiExecutor(commands, undefined, chainId) // ); // }, // routing // ); // } // multi = this.MULTI; async SUBSCRIBE( channels: string | Array, listener: PubSubListener, bufferMode?: T ) { return (await this._slots.getPubSubClient()) .SUBSCRIBE(channels, listener, bufferMode); } subscribe = this.SUBSCRIBE; async UNSUBSCRIBE( channels?: string | Array, listener?: PubSubListener, bufferMode?: T ) { return this._slots.executeUnsubscribeCommand(client => client.UNSUBSCRIBE(channels, listener, bufferMode) ); } unsubscribe = this.UNSUBSCRIBE; async PSUBSCRIBE( patterns: string | Array, listener: PubSubListener, bufferMode?: T ) { return (await this._slots.getPubSubClient()) .PSUBSCRIBE(patterns, listener, bufferMode); } pSubscribe = this.PSUBSCRIBE; async PUNSUBSCRIBE( patterns?: string | Array, listener?: PubSubListener, bufferMode?: T ) { return this._slots.executeUnsubscribeCommand(client => client.PUNSUBSCRIBE(patterns, listener, bufferMode) ); } pUnsubscribe = this.PUNSUBSCRIBE; async SSUBSCRIBE( channels: string | Array, listener: PubSubListener, bufferMode?: T ) { const maxCommandRedirections = this._options.maxCommandRedirections ?? 16, firstChannel = Array.isArray(channels) ? channels[0] : channels; let client = await this._slots.getShardedPubSubClient(firstChannel); for (let i = 0; ; i++) { try { return await client.SSUBSCRIBE(channels, listener, bufferMode); } catch (err) { if (++i > maxCommandRedirections || !(err instanceof ErrorReply)) { throw err; } if (err.message.startsWith('MOVED')) { await this._slots.rediscover(client); client = await this._slots.getShardedPubSubClient(firstChannel); continue; } throw err; } } } sSubscribe = this.SSUBSCRIBE; SUNSUBSCRIBE( channels: string | Array, listener: PubSubListener, bufferMode?: T ) { return this._slots.executeShardedUnsubscribeCommand( Array.isArray(channels) ? channels[0] : channels, client => client.SUNSUBSCRIBE(channels, listener, bufferMode) ); } sUnsubscribe = this.SUNSUBSCRIBE; // quit(): Promise { // return this.#slots.quit(); // } disconnect(): Promise { return this._slots.disconnect(); } nodeClient(node: ShardNode) { return this._slots.nodeClient(node); } /** * Returns a random node from the cluster. * Userful for running "forward" commands (like PUBLISH) on a random node. */ getRandomNode() { return this._slots.getRandomNode(); } /** * Get a random node from a slot. * Useful for running readonly commands on a slot. */ getSlotRandomNode(slot: number) { return this._slots.getSlotRandomNode(slot); } /** * @deprecated use `.masters` instead * TODO */ getMasters() { return this.masters; } /** * @deprecated use `.slots[]` instead * TODO */ getSlotMaster(slot: number) { return this.slots[slot].master; } }