import RedisClient, { InstantiableRedisClient, RedisClientType } from '../client'; import { RedisClusterMasterNode, RedisClusterReplicaNode } from '../commands/CLUSTER_NODES'; import { RedisClusterClientOptions, RedisClusterOptions } from '.'; import { RedisCommandArgument, RedisModules, RedisScripts } from '../commands'; import { RootNodesUnavailableError } from '../errors'; // We need to use 'require', because it's not possible with Typescript to import // function that are exported as 'module.exports = function`, without esModuleInterop // set to true. const calculateSlot = require('cluster-key-slot'); export interface ClusterNode { id: string; client: RedisClientType; } interface NodeAddress { host: string; port: number; } export type NodeAddressMap = { [address: string]: NodeAddress; } | ((address: string) => NodeAddress | undefined); interface SlotNodes { master: ClusterNode; replicas: Array>; clientIterator: IterableIterator> | undefined; } type OnError = (err: unknown) => void; export default class RedisClusterSlots { readonly #options: RedisClusterOptions; readonly #Client: InstantiableRedisClient; readonly #onError: OnError; readonly #nodeByAddress = new Map>(); readonly #slots: Array> = []; constructor(options: RedisClusterOptions, onError: OnError) { this.#options = options; this.#Client = RedisClient.extend(options); this.#onError = onError; } async connect(): Promise { for (const rootNode of this.#options.rootNodes) { if (await this.#discoverNodes(rootNode)) return; } throw new RootNodesUnavailableError(); } async #discoverNodes(clientOptions?: RedisClusterClientOptions): Promise { const client = this.#initiateClient(clientOptions); await client.connect(); try { await this.#reset(await client.clusterNodes()); return true; } catch (err) { this.#onError(err); return false; } finally { if (client.isOpen) { await client.disconnect(); } } } #runningRediscoverPromise?: Promise; async rediscover(startWith: RedisClientType): Promise { if (!this.#runningRediscoverPromise) { this.#runningRediscoverPromise = this.#rediscover(startWith) .finally(() => this.#runningRediscoverPromise = undefined); } return this.#runningRediscoverPromise; } async #rediscover(startWith: RedisClientType): Promise { if (await this.#discoverNodes(startWith.options)) return; for (const { client } of this.#nodeByAddress.values()) { if (client === startWith) continue; if (await this.#discoverNodes(client.options)) return; } throw new Error('None of the cluster nodes is available'); } async #reset(masters: Array): Promise { // Override this.#slots and add not existing clients to this.#nodeByAddress const promises: Array> = [], clientsInUse = new Set(); for (const master of masters) { const slot = { master: this.#initiateClientForNode(master, false, clientsInUse, promises), replicas: this.#options.useReplicas ? master.replicas.map(replica => this.#initiateClientForNode(replica, true, clientsInUse, promises)) : [], clientIterator: undefined // will be initiated in use }; for (const { from, to } of master.slots) { for (let i = from; i <= to; i++) { this.#slots[i] = slot; } } } // Remove unused clients from this.#nodeByAddress using clientsInUse for (const [address, { client }] of this.#nodeByAddress.entries()) { if (clientsInUse.has(address)) continue; promises.push(client.disconnect()); this.#nodeByAddress.delete(address); } await Promise.all(promises); } #clientOptionsDefaults(options?: RedisClusterClientOptions): RedisClusterClientOptions | undefined { if (!this.#options.defaults) return options; return { ...this.#options.defaults, ...options, socket: this.#options.defaults.socket && options?.socket ? { ...this.#options.defaults.socket, ...options.socket } : this.#options.defaults.socket ?? options?.socket }; } #initiateClient(options?: RedisClusterClientOptions): RedisClientType { return new this.#Client(this.#clientOptionsDefaults(options)) .on('error', this.#onError); } #getNodeAddress(address: string): NodeAddress | undefined { switch (typeof this.#options.nodeAddressMap) { case 'object': return this.#options.nodeAddressMap[address]; case 'function': return this.#options.nodeAddressMap(address); } } #initiateClientForNode(nodeData: RedisClusterMasterNode | RedisClusterReplicaNode, readonly: boolean, clientsInUse: Set, promises: Array>): ClusterNode { const address = `${nodeData.host}:${nodeData.port}`; clientsInUse.add(address); let node = this.#nodeByAddress.get(address); if (!node) { node = { id: nodeData.id, client: this.#initiateClient({ socket: this.#getNodeAddress(address) ?? { host: nodeData.host, port: nodeData.port }, readonly }) }; promises.push(node.client.connect()); this.#nodeByAddress.set(address, node); } return node; } getSlotMaster(slot: number): ClusterNode { return this.#slots[slot].master; } *#slotClientIterator(slotNumber: number): IterableIterator> { const slot = this.#slots[slotNumber]; yield slot.master.client; for (const replica of slot.replicas) { yield replica.client; } } #getSlotClient(slotNumber: number): RedisClientType { const slot = this.#slots[slotNumber]; if (!slot.clientIterator) { slot.clientIterator = this.#slotClientIterator(slotNumber); } const {done, value} = slot.clientIterator.next(); if (done) { slot.clientIterator = undefined; return this.#getSlotClient(slotNumber); } return value; } #randomClientIterator?: IterableIterator>; #getRandomClient(): RedisClientType { if (!this.#nodeByAddress.size) { throw new Error('Cluster is not connected'); } if (!this.#randomClientIterator) { this.#randomClientIterator = this.#nodeByAddress.values(); } const {done, value} = this.#randomClientIterator.next(); if (done) { this.#randomClientIterator = undefined; return this.#getRandomClient(); } return value.client; } getClient(firstKey?: RedisCommandArgument, isReadonly?: boolean): RedisClientType { if (!firstKey) { return this.#getRandomClient(); } const slot = calculateSlot(firstKey); if (!isReadonly || !this.#options.useReplicas) { return this.getSlotMaster(slot).client; } return this.#getSlotClient(slot); } getMasters(): Array> { const masters = []; for (const node of this.#nodeByAddress.values()) { if (node.client.options?.readonly) continue; masters.push(node); } return masters; } getNodeByAddress(address: string): ClusterNode | undefined { const mappedAddress = this.#getNodeAddress(address); return this.#nodeByAddress.get( mappedAddress ? `${mappedAddress.host}:${mappedAddress.port}` : address ); } quit(): Promise { return this.#destroy(client => client.quit()); } disconnect(): Promise { return this.#destroy(client => client.disconnect()); } async #destroy(fn: (client: RedisClientType) => Promise): Promise { const promises = []; for (const { client } of this.#nodeByAddress.values()) { promises.push(fn(client)); } await Promise.all(promises); this.#nodeByAddress.clear(); this.#slots.splice(0); } }