From 42e36dfbb1b342bb46a2d72f7cfaae47866aaa9c Mon Sep 17 00:00:00 2001 From: leibale Date: Mon, 22 Nov 2021 17:42:41 -0500 Subject: [PATCH] enhance cluster reshard handling --- packages/client/lib/cluster/cluster-slots.ts | 62 ++++++++++++------- packages/client/lib/cluster/index.ts | 4 +- .../client/lib/commands/CLUSTER_NODES.spec.ts | 25 ++++++++ packages/client/lib/commands/CLUSTER_NODES.ts | 13 +++- 4 files changed, 76 insertions(+), 28 deletions(-) diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index ff4c79b4d3..f69449efa1 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -42,20 +42,8 @@ export default class RedisClusterSlots): Promise { - if (await this.#discoverNodes(startWith.options)) return; - - for (const { client } of this.#nodeByUrl.values()) { - if (client === startWith) continue; - - if (await this.#discoverNodes(client.options)) return; - } - - throw new Error('None of the cluster nodes is available'); - } - async #discoverNodes(clientOptions?: RedisClusterClientOptions): Promise { - const client = new this.#Client(clientOptions); + const client = this.#initiateClient(clientOptions); await client.connect(); @@ -72,6 +60,29 @@ export default class RedisClusterSlots; + + async rediscover(startWith: RedisClientType): Promise { + if (!this.#runningRediscoverPromise) { + this.#runningRediscoverPromise = this.#rediscover(startWith) + .finally(() => this.#runningRediscoverPromise = undefined); + } + + return this.#runningRediscoverPromise; + } + + async #rediscover(startWith: RedisClientType): Promise { + if (await this.#discoverNodes(startWith.options)) return; + + for (const { client } of this.#nodeByUrl.values()) { + if (client === startWith) continue; + + if (await this.#discoverNodes(client.options)) return; + } + + throw new Error('None of the cluster nodes is available'); + } + async #reset(masters: Array): Promise { // Override this.#slots and add not existing clients to this.#nodeByUrl const promises: Array> = [], @@ -103,18 +114,23 @@ export default class RedisClusterSlots { + return new this.#Client(this.#clientOptionsDefaults(options)) + .on('error', this.#onError); + } + #initiateClientForNode(nodeData: RedisClusterMasterNode | RedisClusterReplicaNode, readonly: boolean, clientsInUse: Set, promises: Array>): ClusterNode { const url = `${nodeData.host}:${nodeData.port}`; clientsInUse.add(url); @@ -123,15 +139,13 @@ export default class RedisClusterSlots const url = err.message.substring(err.message.lastIndexOf(' ') + 1); let node = this.#slots.getNodeByUrl(url); if (!node) { - await this.#slots.discover(client); + await this.#slots.rediscover(client); node = this.#slots.getNodeByUrl(url); if (!node) { @@ -168,7 +168,7 @@ export default class RedisCluster await node.client.asking(); return node.client; } else if (err.message.startsWith('MOVED')) { - await this.#slots.discover(client); + await this.#slots.rediscover(client); return true; } diff --git a/packages/client/lib/commands/CLUSTER_NODES.spec.ts b/packages/client/lib/commands/CLUSTER_NODES.spec.ts index 2b3881d8cd..d061c59e8e 100644 --- a/packages/client/lib/commands/CLUSTER_NODES.spec.ts +++ b/packages/client/lib/commands/CLUSTER_NODES.spec.ts @@ -48,6 +48,31 @@ describe('CLUSTER NODES', () => { ); }); + it('should support urls without cport', () => { + assert.deepEqual( + transformReply( + 'id 127.0.0.1:30001 master - 0 0 0 connected 0-16384\n' + ), + [{ + id: 'id', + url: '127.0.0.1:30001', + host: '127.0.0.1', + port: 30001, + cport: null, + flags: ['master'], + pingSent: 0, + pongRecv: 0, + configEpoch: 0, + linkState: RedisClusterNodeLinkStates.CONNECTED, + slots: [{ + from: 0, + to: 16384 + }], + replicas: [] + }] + ); + }); + it.skip('with importing slots', () => { assert.deepEqual( transformReply( diff --git a/packages/client/lib/commands/CLUSTER_NODES.ts b/packages/client/lib/commands/CLUSTER_NODES.ts index d04ffc10a1..ba4477cdd2 100644 --- a/packages/client/lib/commands/CLUSTER_NODES.ts +++ b/packages/client/lib/commands/CLUSTER_NODES.ts @@ -10,7 +10,7 @@ export enum RedisClusterNodeLinkStates { interface RedisClusterNodeTransformedUrl { host: string; port: number; - cport: number; + cport: number | null; } export interface RedisClusterReplicaNode extends RedisClusterNodeTransformedUrl { @@ -86,7 +86,16 @@ export function transformReply(reply: string): Array { function transformNodeUrl(url: string): RedisClusterNodeTransformedUrl { const indexOfColon = url.indexOf(':'), - indexOfAt = url.indexOf('@', indexOfColon); + indexOfAt = url.indexOf('@', indexOfColon), + host = url.substring(0, indexOfColon); + + if (indexOfAt === -1) { + return { + host, + port: Number(url.substring(indexOfColon + 1)), + cport: null + }; + } return { host: url.substring(0, indexOfColon),