import calculateSlot from 'cluster-key-slot'; import RedisClient, { RedisClientType } from './client'; import { RedisSocketOptions } from './socket'; import { RedisClusterMasterNode, RedisClusterReplicaNode } from './commands/CLUSTER_NODES'; import { RedisClusterClientOptions, RedisClusterOptions } from './cluster'; import { RedisModules } from './commands'; import { RedisLuaScripts } from './lua-script'; export interface ClusterNode { id: string; client: RedisClientType; } interface SlotNodes { master: ClusterNode; replicas: Array>; clientIterator: IterableIterator> | undefined; } type OnError = (err: unknown) => void; export default class RedisClusterSlots { readonly #options: RedisClusterOptions; readonly #onError: OnError; readonly #nodeByUrl = new Map>(); readonly #slots: Array> = []; constructor(options: RedisClusterOptions, onError: OnError) { this.#options = options; this.#onError = onError; } async connect(): Promise { for (const rootNode of this.#options.rootNodes) { if (await this.#discoverNodes(rootNode)) return; } throw new Error('None of the root nodes is available'); } async discover(startWith: RedisClientType): Promise { if (await this.#discoverNodes(startWith.options)) return; for (const { client } of this.#nodeByUrl.values()) { if (client === startWith) continue; if (await this.#discoverNodes(client.options)) return; } throw new Error('None of the cluster nodes is available'); } async #discoverNodes(clientOptions?: RedisClusterClientOptions): Promise { const client = RedisClient.create(clientOptions); await client.connect(); try { await this.#reset(await client.clusterNodes()); return true; } catch (err) { this.#onError(err); return false; } finally { if (client.isOpen) { await client.disconnect(); } } } async #reset(masters: Array): Promise { // Override this.#slots and add not existing clients to this.#clientByKey const promises: Array> = [], clientsInUse = new Set(); for (const master of masters) { const slot = { master: this.#initiateClientForNode(master, false, clientsInUse, promises), replicas: this.#options.useReplicas ? master.replicas.map(replica => this.#initiateClientForNode(replica, true, clientsInUse, promises)) : [], clientIterator: undefined // will be initiated in use }; for (const { from, to } of master.slots) { for (let i = from; i < to; i++) { this.#slots[i] = slot; } } } // Remove unused clients from this.#clientBykey using clientsInUse for (const [url, { client }] of this.#nodeByUrl.entries()) { if (clientsInUse.has(url)) continue; promises.push(client.disconnect()); this.#nodeByUrl.delete(url); } await Promise.all(promises); } #initiateClientForNode(nodeData: RedisClusterMasterNode | RedisClusterReplicaNode, readonly: boolean, clientsInUse: Set, promises: Array>): ClusterNode { const url = `${nodeData.host}:${nodeData.port}`; clientsInUse.add(url); let node = this.#nodeByUrl.get(url); if (!node) { node = { id: nodeData.id, client: RedisClient.create({ socket: { host: nodeData.host, port: nodeData.port }, readonly }) }; promises.push(node.client.connect()); this.#nodeByUrl.set(url, node); } return node; } getSlotMaster(slot: number): ClusterNode { return this.#slots[slot].master; } *#slotClientIterator(slotNumber: number): IterableIterator> { const slot = this.#slots[slotNumber]; yield slot.master.client; for (const replica of slot.replicas) { yield replica.client; } } #getSlotClient(slotNumber: number): RedisClientType { const slot = this.#slots[slotNumber]; if (!slot.clientIterator) { slot.clientIterator = this.#slotClientIterator(slotNumber); } const {done, value} = slot.clientIterator.next(); if (done) { slot.clientIterator = undefined; return this.#getSlotClient(slotNumber); } return value; } #randomClientIterator?: IterableIterator>; #getRandomClient(): RedisClientType { if (!this.#nodeByUrl.size) { throw new Error('Cluster is not connected'); } if (!this.#randomClientIterator) { this.#randomClientIterator = this.#nodeByUrl.values(); } const {done, value} = this.#randomClientIterator.next(); if (done) { this.#randomClientIterator = undefined; return this.#getRandomClient(); } return value.client; } getClient(firstKey?: string | Buffer, isReadonly?: boolean): RedisClientType { if (!firstKey) { return this.#getRandomClient(); } const slot = calculateSlot(firstKey); if (!isReadonly || !this.#options.useReplicas) { return this.getSlotMaster(slot).client; } return this.#getSlotClient(slot); } getMasters(): Array> { const masters = []; for (const node of this.#nodeByUrl.values()) { if (node.client.options?.readonly) continue; masters.push(node); } return masters; } getNodeByUrl(url: string): ClusterNode | undefined { return this.#nodeByUrl.get(url); } async disconnect(): Promise { await Promise.all( [...this.#nodeByUrl.values()].map(({ client }) => client.disconnect()) ); this.#nodeByUrl.clear(); this.#slots.splice(0); } }