import RedisClient, { InstantiableRedisClient, RedisClientType } from '../client'; import { RedisClusterClientOptions, RedisClusterOptions } from '.'; import { RedisCommandArgument, RedisFunctions, RedisModules, RedisScripts } from '../commands'; import { RootNodesUnavailableError } from '../errors'; import { ClusterSlotsNode } from '../commands/CLUSTER_SLOTS'; import { types } from 'util'; import { ChannelListeners, PubSubType, PubSubTypeListeners } from '../client/pub-sub'; import { EventEmitter } from 'stream'; // We need to use 'require', because it's not possible with Typescript to import // function that are exported as 'module.exports = function`, without esModuleInterop // set to true. const calculateSlot = require('cluster-key-slot'); interface NodeAddress { host: string; port: number; } export type NodeAddressMap = { [address: string]: NodeAddress; } | ((address: string) => NodeAddress | undefined); type ValueOrPromise = T | Promise; type ClientOrPromise< M extends RedisModules, F extends RedisFunctions, S extends RedisScripts > = ValueOrPromise>; export interface Node< M extends RedisModules, F extends RedisFunctions, S extends RedisScripts > { address: string; client?: ClientOrPromise; } export interface ShardNode< M extends RedisModules, F extends RedisFunctions, S extends RedisScripts > extends Node { id: string; host: string; port: number; readonly: boolean; } export interface MasterNode< M extends RedisModules, F extends RedisFunctions, S extends RedisScripts > extends ShardNode { pubSubClient?: ClientOrPromise; } export interface Shard< M extends RedisModules, F extends RedisFunctions, S extends RedisScripts > { master: MasterNode; replicas?: Array>; nodesIterator?: IterableIterator>; } type ShardWithReplicas< M extends RedisModules, F extends RedisFunctions, S extends RedisScripts > = Shard & Required, 'replicas'>>; export type PubSubNode< M extends RedisModules, F extends RedisFunctions, S extends RedisScripts > = Required>; type PubSubToResubscribe = Record< PubSubType.CHANNELS | PubSubType.PATTERNS, PubSubTypeListeners >; export type OnShardedChannelMovedError = ( err: unknown, channel: string, listeners?: ChannelListeners ) => void; export default class RedisClusterSlots< M extends RedisModules, F extends RedisFunctions, S extends RedisScripts > { static #SLOTS = 16384; readonly #options: RedisClusterOptions; readonly #Client: InstantiableRedisClient; readonly #emit: EventEmitter['emit']; slots = new Array>(RedisClusterSlots.#SLOTS); shards = new Array>(); masters = new Array>(); replicas = new Array>(); readonly nodeByAddress = new Map | ShardNode>(); pubSubNode?: PubSubNode; #isOpen = false; get isOpen() { return this.#isOpen; } constructor( options: RedisClusterOptions, emit: EventEmitter['emit'] ) { this.#options = options; this.#Client = RedisClient.extend(options); this.#emit = emit; } async connect() { if (this.#isOpen) { throw new Error('Cluster already open'); } this.#isOpen = true; try { await this.#discoverWithRootNodes(); } catch (err) { this.#isOpen = false; throw err; } } async #discoverWithRootNodes() { let start = Math.floor(Math.random() * this.#options.rootNodes.length); for (let i = start; i < this.#options.rootNodes.length; i++) { if (await this.#discover(this.#options.rootNodes[i])) return; } for (let i = 0; i < start; i++) { if (await this.#discover(this.#options.rootNodes[i])) return; } throw new RootNodesUnavailableError(); } #resetSlots() { this.slots = new Array(RedisClusterSlots.#SLOTS); this.shards = []; this.masters = []; this.replicas = []; this.#randomNodeIterator = undefined; } async #discover(rootNode?: RedisClusterClientOptions) { const addressesInUse = new Set(); try { const shards = await this.#getShards(rootNode), promises: Array> = [], eagerConnect = this.#options.minimizeConnections !== true; this.#resetSlots(); for (const { from, to, master, replicas } of shards) { const shard: Shard = { master: this.#initiateSlotNode(master, false, eagerConnect, addressesInUse, promises) }; if (this.#options.useReplicas) { shard.replicas = replicas.map(replica => this.#initiateSlotNode(replica, true, eagerConnect, addressesInUse, promises) ); } this.shards.push(shard); for (let i = from; i <= to; i++) { this.slots[i] = shard; } } if (this.pubSubNode && !addressesInUse.has(this.pubSubNode.address)) { if (types.isPromise(this.pubSubNode.client)) { promises.push( this.pubSubNode.client.then(client => client.disconnect()) ); this.pubSubNode = undefined; } else { promises.push(this.pubSubNode.client.disconnect()); const channelsListeners = this.pubSubNode.client.getPubSubListeners(PubSubType.CHANNELS), patternsListeners = this.pubSubNode.client.getPubSubListeners(PubSubType.PATTERNS); if (channelsListeners.size || patternsListeners.size) { promises.push( this.#initiatePubSubClient({ [PubSubType.CHANNELS]: channelsListeners, [PubSubType.PATTERNS]: patternsListeners }) ); } } } for (const [address, node] of this.nodeByAddress.entries()) { if (addressesInUse.has(address)) continue; if (node.client) { promises.push( this.#execOnNodeClient(node.client, client => client.disconnect()) ); } const { pubSubClient } = node as MasterNode; if (pubSubClient) { promises.push( this.#execOnNodeClient(pubSubClient, client => client.disconnect()) ); } this.nodeByAddress.delete(address); } await Promise.all(promises); return true; } catch (err) { this.#emit('error', err); return false; } } async #getShards(rootNode?: RedisClusterClientOptions) { const client = new this.#Client( this.#clientOptionsDefaults(rootNode, true) ); client.on('error', err => this.#emit('error', err)); await client.connect(); try { // using `CLUSTER SLOTS` and not `CLUSTER SHARDS` to support older versions return await client.clusterSlots(); } finally { await client.disconnect(); } } #getNodeAddress(address: string): NodeAddress | undefined { switch (typeof this.#options.nodeAddressMap) { case 'object': return this.#options.nodeAddressMap[address]; case 'function': return this.#options.nodeAddressMap(address); } } #clientOptionsDefaults( options?: RedisClusterClientOptions, disableReconnect?: boolean ): RedisClusterClientOptions | undefined { let result: RedisClusterClientOptions | undefined; if (this.#options.defaults) { let socket; if (this.#options.defaults.socket) { socket = { ...this.#options.defaults.socket, ...options?.socket }; } else { socket = options?.socket; } result = { ...this.#options.defaults, ...options, socket }; } else { result = options; } if (disableReconnect) { result ??= {}; result.socket ??= {}; result.socket.reconnectStrategy = false; } return result; } #initiateSlotNode( { id, ip, port }: ClusterSlotsNode, readonly: boolean, eagerConnent: boolean, addressesInUse: Set, promises: Array> ) { const address = `${ip}:${port}`; addressesInUse.add(address); let node = this.nodeByAddress.get(address); if (!node) { node = { id, host: ip, port, address, readonly, client: undefined }; if (eagerConnent) { promises.push(this.#createNodeClient(node)); } this.nodeByAddress.set(address, node); } (readonly ? this.replicas : this.masters).push(node); return node; } async #createClient( node: ShardNode, readonly = node.readonly ) { const client = new this.#Client( this.#clientOptionsDefaults({ socket: this.#getNodeAddress(node.address) ?? { host: node.host, port: node.port }, readonly }) ); client.on('error', err => this.#emit('error', err)); await client.connect(); return client; } #createNodeClient(node: ShardNode) { const promise = this.#createClient(node) .then(client => { node.client = client; return client; }) .catch(err => { node.client = undefined; throw err; }); node.client = promise; return promise; } nodeClient(node: ShardNode) { return node.client ?? this.#createNodeClient(node); } #runningRediscoverPromise?: Promise; async rediscover(startWith: RedisClientType): Promise { this.#runningRediscoverPromise ??= this.#rediscover(startWith) .finally(() => this.#runningRediscoverPromise = undefined); return this.#runningRediscoverPromise; } async #rediscover(startWith: RedisClientType): Promise { if (await this.#discover(startWith.options)) return; return this.#discoverWithRootNodes(); } quit(): Promise { return this.#destroy(client => client.quit()); } disconnect(): Promise { return this.#destroy(client => client.disconnect()); } async #destroy(fn: (client: RedisClientType) => Promise): Promise { this.#isOpen = false; const promises = []; for (const { master, replicas } of this.shards) { if (master.client) { promises.push( this.#execOnNodeClient(master.client, fn) ); } if (master.pubSubClient) { promises.push( this.#execOnNodeClient(master.pubSubClient, fn) ); } if (replicas) { for (const { client } of replicas) { if (client) { promises.push( this.#execOnNodeClient(client, fn) ); } } } } if (this.pubSubNode) { promises.push(this.#execOnNodeClient(this.pubSubNode.client, fn)); this.pubSubNode = undefined; } this.#resetSlots(); this.nodeByAddress.clear(); await Promise.allSettled(promises); } #execOnNodeClient( client: ClientOrPromise, fn: (client: RedisClientType) => Promise ) { return types.isPromise(client) ? client.then(fn) : fn(client); } getClient( firstKey: RedisCommandArgument | undefined, isReadonly: boolean | undefined ): ClientOrPromise { if (!firstKey) { return this.nodeClient(this.getRandomNode()); } const slotNumber = calculateSlot(firstKey); if (!isReadonly) { return this.nodeClient(this.slots[slotNumber].master); } return this.nodeClient(this.getSlotRandomNode(slotNumber)); } *#iterateAllNodes() { let i = Math.floor(Math.random() * (this.masters.length + this.replicas.length)); if (i < this.masters.length) { do { yield this.masters[i]; } while (++i < this.masters.length); for (const replica of this.replicas) { yield replica; } } else { i -= this.masters.length; do { yield this.replicas[i]; } while (++i < this.replicas.length); } while (true) { for (const master of this.masters) { yield master; } for (const replica of this.replicas) { yield replica; } } } #randomNodeIterator?: IterableIterator>; getRandomNode() { this.#randomNodeIterator ??= this.#iterateAllNodes(); return this.#randomNodeIterator.next().value as ShardNode; } *#slotNodesIterator(slot: ShardWithReplicas) { let i = Math.floor(Math.random() * (1 + slot.replicas.length)); if (i < slot.replicas.length) { do { yield slot.replicas[i]; } while (++i < slot.replicas.length); } while (true) { yield slot.master; for (const replica of slot.replicas) { yield replica; } } } getSlotRandomNode(slotNumber: number) { const slot = this.slots[slotNumber]; if (!slot.replicas?.length) { return slot.master; } slot.nodesIterator ??= this.#slotNodesIterator(slot as ShardWithReplicas); return slot.nodesIterator.next().value as ShardNode; } getMasterByAddress(address: string) { const master = this.nodeByAddress.get(address); if (!master) return; return this.nodeClient(master); } getPubSubClient() { return this.pubSubNode ? this.pubSubNode.client : this.#initiatePubSubClient(); } async #initiatePubSubClient(toResubscribe?: PubSubToResubscribe) { const index = Math.floor(Math.random() * (this.masters.length + this.replicas.length)), node = index < this.masters.length ? this.masters[index] : this.replicas[index - this.masters.length]; this.pubSubNode = { address: node.address, client: this.#createClient(node, true) .then(async client => { if (toResubscribe) { await Promise.all([ client.extendPubSubListeners(PubSubType.CHANNELS, toResubscribe[PubSubType.CHANNELS]), client.extendPubSubListeners(PubSubType.PATTERNS, toResubscribe[PubSubType.PATTERNS]) ]); } this.pubSubNode!.client = client; return client; }) .catch(err => { this.pubSubNode = undefined; throw err; }) }; return this.pubSubNode.client as Promise>; } async executeUnsubscribeCommand( unsubscribe: (client: RedisClientType) => Promise ): Promise { const client = await this.getPubSubClient(); await unsubscribe(client); if (!client.isPubSubActive && client.isOpen) { await client.disconnect(); this.pubSubNode = undefined; } } getShardedPubSubClient(channel: string) { const { master } = this.slots[calculateSlot(channel)]; return master.pubSubClient ?? this.#initiateShardedPubSubClient(master); } #initiateShardedPubSubClient(master: MasterNode) { const promise = this.#createClient(master, true) .then(client => { client.on('server-sunsubscribe', async (channel, listeners) => { try { await this.rediscover(client); const redirectTo = await this.getShardedPubSubClient(channel); redirectTo.extendPubSubChannelListeners( PubSubType.SHARDED, channel, listeners ); } catch (err) { this.#emit('sharded-shannel-moved-error', err, channel, listeners); } }); master.pubSubClient = client; return client; }) .catch(err => { master.pubSubClient = undefined; throw err; }); master.pubSubClient = promise; return promise; } async executeShardedUnsubscribeCommand( channel: string, unsubscribe: (client: RedisClientType) => Promise ): Promise { const { master } = this.slots[calculateSlot(channel)]; if (!master.pubSubClient) return Promise.resolve(); const client = await master.pubSubClient; await unsubscribe(client); if (!client.isPubSubActive && client.isOpen) { await client.disconnect(); master.pubSubClient = undefined; } } }