1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

add nodeAddressMap config for cluster (#1827)

* add `nodeAddressMap` config for cluster

* Update cluster-slots.ts

* Update cluster-slots.ts

* update docs

Co-authored-by: Guy Royse <guy@guyroyse.com>

Co-authored-by: Guy Royse <guy@guyroyse.com>
This commit is contained in:
Leibale Eidelman
2022-02-14 15:23:35 -05:00
committed by GitHub
parent 6dd15d96aa
commit 0803f4e19c
5 changed files with 101 additions and 55 deletions

View File

@@ -14,6 +14,15 @@ export interface ClusterNode<M extends RedisModules, S extends RedisScripts> {
client: RedisClientType<M, S>;
}
interface NodeAddress {
host: string;
port: number;
}
export type NodeAddressMap = {
[address: string]: NodeAddress;
} | ((address: string) => NodeAddress | undefined);
interface SlotNodes<M extends RedisModules, S extends RedisScripts> {
master: ClusterNode<M, S>;
replicas: Array<ClusterNode<M, S>>;
@@ -26,7 +35,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
readonly #options: RedisClusterOptions<M, S>;
readonly #Client: InstantiableRedisClient<M, S>;
readonly #onError: OnError;
readonly #nodeByUrl = new Map<string, ClusterNode<M, S>>();
readonly #nodeByAddress = new Map<string, ClusterNode<M, S>>();
readonly #slots: Array<SlotNodes<M, S>> = [];
constructor(options: RedisClusterOptions<M, S>, onError: OnError) {
@@ -37,7 +46,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
async connect(): Promise<void> {
for (const rootNode of this.#options.rootNodes) {
if (await this.#discoverNodes(this.#clientOptionsDefaults(rootNode))) return;
if (await this.#discoverNodes(rootNode)) return;
}
throw new RootNodesUnavailableError();
@@ -75,7 +84,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
async #rediscover(startWith: RedisClientType<M, S>): Promise<void> {
if (await this.#discoverNodes(startWith.options)) return;
for (const { client } of this.#nodeByUrl.values()) {
for (const { client } of this.#nodeByAddress.values()) {
if (client === startWith) continue;
if (await this.#discoverNodes(client.options)) return;
@@ -85,7 +94,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
}
async #reset(masters: Array<RedisClusterMasterNode>): Promise<void> {
// Override this.#slots and add not existing clients to this.#nodeByUrl
// Override this.#slots and add not existing clients to this.#nodeByAddress
const promises: Array<Promise<void>> = [],
clientsInUse = new Set<string>();
for (const master of masters) {
@@ -94,7 +103,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
replicas: this.#options.useReplicas ?
master.replicas.map(replica => this.#initiateClientForNode(replica, true, clientsInUse, promises)) :
[],
clientIterator: undefined // will be initiated in use
clientIterator: undefined // will be initiated in use
};
for (const { from, to } of master.slots) {
@@ -104,12 +113,12 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
}
}
// Remove unused clients from this.#nodeByUrl using clientsInUse
for (const [url, { client }] of this.#nodeByUrl.entries()) {
if (clientsInUse.has(url)) continue;
// Remove unused clients from this.#nodeByAddress using clientsInUse
for (const [address, { client }] of this.#nodeByAddress.entries()) {
if (clientsInUse.has(address)) continue;
promises.push(client.disconnect());
this.#nodeByUrl.delete(url);
this.#nodeByAddress.delete(address);
}
await Promise.all(promises);
@@ -118,13 +127,14 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
#clientOptionsDefaults(options?: RedisClusterClientOptions): RedisClusterClientOptions | undefined {
if (!this.#options.defaults) return options;
const merged = Object.assign({}, this.#options.defaults, options);
if (options?.socket && this.#options.defaults.socket) {
Object.assign({}, this.#options.defaults.socket, options.socket);
}
return merged;
return {
...this.#options.defaults,
...options,
socket: this.#options.defaults.socket && options?.socket ? {
...this.#options.defaults.socket,
...options.socket
} : this.#options.defaults.socket ?? options?.socket
};
}
#initiateClient(options?: RedisClusterClientOptions): RedisClientType<M, S> {
@@ -132,16 +142,26 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
.on('error', this.#onError);
}
#initiateClientForNode(nodeData: RedisClusterMasterNode | RedisClusterReplicaNode, readonly: boolean, clientsInUse: Set<string>, promises: Array<Promise<void>>): ClusterNode<M, S> {
const url = `${nodeData.host}:${nodeData.port}`;
clientsInUse.add(url);
#getNodeAddress(address: string): NodeAddress | undefined {
switch (typeof this.#options.nodeAddressMap) {
case 'object':
return this.#options.nodeAddressMap[address];
let node = this.#nodeByUrl.get(url);
case 'function':
return this.#options.nodeAddressMap(address);
}
}
#initiateClientForNode(nodeData: RedisClusterMasterNode | RedisClusterReplicaNode, readonly: boolean, clientsInUse: Set<string>, promises: Array<Promise<void>>): ClusterNode<M, S> {
const address = `${nodeData.host}:${nodeData.port}`;
clientsInUse.add(address);
let node = this.#nodeByAddress.get(address);
if (!node) {
node = {
id: nodeData.id,
client: this.#initiateClient({
socket: {
socket: this.#getNodeAddress(address) ?? {
host: nodeData.host,
port: nodeData.port
},
@@ -149,7 +169,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
})
};
promises.push(node.client.connect());
this.#nodeByUrl.set(url, node);
this.#nodeByAddress.set(address, node);
}
return node;
@@ -186,12 +206,12 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
#randomClientIterator?: IterableIterator<ClusterNode<M, S>>;
#getRandomClient(): RedisClientType<M, S> {
if (!this.#nodeByUrl.size) {
if (!this.#nodeByAddress.size) {
throw new Error('Cluster is not connected');
}
if (!this.#randomClientIterator) {
this.#randomClientIterator = this.#nodeByUrl.values();
this.#randomClientIterator = this.#nodeByAddress.values();
}
const {done, value} = this.#randomClientIterator.next();
@@ -218,8 +238,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
getMasters(): Array<ClusterNode<M, S>> {
const masters = [];
for (const node of this.#nodeByUrl.values()) {
for (const node of this.#nodeByAddress.values()) {
if (node.client.options?.readonly) continue;
masters.push(node);
@@ -228,8 +247,11 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
return masters;
}
getNodeByUrl(url: string): ClusterNode<M, S> | undefined {
return this.#nodeByUrl.get(url);
getNodeByAddress(address: string): ClusterNode<M, S> | undefined {
const mappedAddress = this.#getNodeAddress(address);
return this.#nodeByAddress.get(
mappedAddress ? `${mappedAddress.host}:${mappedAddress.port}` : address
);
}
quit(): Promise<void> {
@@ -242,13 +264,13 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
async #destroy(fn: (client: RedisClientType<M, S>) => Promise<unknown>): Promise<void> {
const promises = [];
for (const { client } of this.#nodeByUrl.values()) {
for (const { client } of this.#nodeByAddress.values()) {
promises.push(fn(client));
}
await Promise.all(promises);
this.#nodeByUrl.clear();
this.#nodeByAddress.clear();
this.#slots.splice(0);
}
}

View File

@@ -1,7 +1,7 @@
import COMMANDS from './commands';
import { RedisCommand, RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply, RedisCommandReply, RedisModules, RedisPlugins, RedisScript, RedisScripts } from '../commands';
import { ClientCommandOptions, RedisClientCommandSignature, RedisClientOptions, RedisClientType, WithModules, WithScripts } from '../client';
import RedisClusterSlots, { ClusterNode } from './cluster-slots';
import RedisClusterSlots, { ClusterNode, NodeAddressMap } from './cluster-slots';
import { extendWithModulesAndScripts, transformCommandArguments, transformCommandReply, extendWithCommands } from '../commander';
import { EventEmitter } from 'events';
import RedisClusterMultiCommand, { RedisClusterMultiCommandType } from './multi-command';
@@ -17,6 +17,7 @@ export interface RedisClusterOptions<
defaults?: Partial<RedisClusterClientOptions>;
useReplicas?: boolean;
maxCommandRedirections?: number;
nodeAddressMap?: NodeAddressMap;
}
type WithCommands = {
@@ -144,16 +145,16 @@ export default class RedisCluster<M extends RedisModules, S extends RedisScripts
}
if (err.message.startsWith('ASK')) {
const url = err.message.substring(err.message.lastIndexOf(' ') + 1);
if (this.#slots.getNodeByUrl(url)?.client === client) {
const address = err.message.substring(err.message.lastIndexOf(' ') + 1);
if (this.#slots.getNodeByAddress(address)?.client === client) {
await client.asking();
continue;
}
await this.#slots.rediscover(client);
const redirectTo = this.#slots.getNodeByUrl(url);
const redirectTo = this.#slots.getNodeByAddress(address);
if (!redirectTo) {
throw new Error(`Cannot find node ${url}`);
throw new Error(`Cannot find node ${address}`);
}
await redirectTo.client.asking();

View File

@@ -19,7 +19,7 @@ describe('CLUSTER NODES', () => {
].join('\n')),
[{
id: 'master',
url: '127.0.0.1:30001@31001',
address: '127.0.0.1:30001@31001',
host: '127.0.0.1',
port: 30001,
cport: 31001,
@@ -34,7 +34,7 @@ describe('CLUSTER NODES', () => {
}],
replicas: [{
id: 'slave',
url: '127.0.0.1:30002@31002',
address: '127.0.0.1:30002@31002',
host: '127.0.0.1',
port: 30002,
cport: 31002,
@@ -48,14 +48,14 @@ describe('CLUSTER NODES', () => {
);
});
it('should support urls without cport', () => {
it('should support addresses without cport', () => {
assert.deepEqual(
transformReply(
'id 127.0.0.1:30001 master - 0 0 0 connected 0-16384\n'
),
[{
id: 'id',
url: '127.0.0.1:30001',
address: '127.0.0.1:30001',
host: '127.0.0.1',
port: 30001,
cport: null,
@@ -80,7 +80,7 @@ describe('CLUSTER NODES', () => {
),
[{
id: 'id',
url: '127.0.0.1:30001@31001',
address: '127.0.0.1:30001@31001',
host: '127.0.0.1',
port: 30001,
cport: 31001,
@@ -102,7 +102,7 @@ describe('CLUSTER NODES', () => {
),
[{
id: 'id',
url: '127.0.0.1:30001@31001',
address: '127.0.0.1:30001@31001',
host: '127.0.0.1',
port: 30001,
cport: 31001,

View File

@@ -7,15 +7,15 @@ export enum RedisClusterNodeLinkStates {
DISCONNECTED = 'disconnected'
}
interface RedisClusterNodeTransformedUrl {
interface RedisClusterNodeAddress {
host: string;
port: number;
cport: number | null;
}
export interface RedisClusterReplicaNode extends RedisClusterNodeTransformedUrl {
export interface RedisClusterReplicaNode extends RedisClusterNodeAddress {
id: string;
url: string;
address: string;
flags: Array<string>;
pingSent: number;
pongRecv: number;
@@ -39,11 +39,11 @@ export function transformReply(reply: string): Array<RedisClusterMasterNode> {
replicasMap = new Map<string, Array<RedisClusterReplicaNode>>();
for (const line of lines) {
const [id, url, flags, masterId, pingSent, pongRecv, configEpoch, linkState, ...slots] = line.split(' '),
const [id, address, flags, masterId, pingSent, pongRecv, configEpoch, linkState, ...slots] = line.split(' '),
node = {
id,
url,
...transformNodeUrl(url),
address,
...transformNodeAddress(address),
flags: flags.split(','),
pingSent: Number(pingSent),
pongRecv: Number(pongRecv),
@@ -84,22 +84,22 @@ export function transformReply(reply: string): Array<RedisClusterMasterNode> {
return [...mastersMap.values()];
}
function transformNodeUrl(url: string): RedisClusterNodeTransformedUrl {
const indexOfColon = url.indexOf(':'),
indexOfAt = url.indexOf('@', indexOfColon),
host = url.substring(0, indexOfColon);
function transformNodeAddress(address: string): RedisClusterNodeAddress {
const indexOfColon = address.indexOf(':'),
indexOfAt = address.indexOf('@', indexOfColon),
host = address.substring(0, indexOfColon);
if (indexOfAt === -1) {
return {
host,
port: Number(url.substring(indexOfColon + 1)),
port: Number(address.substring(indexOfColon + 1)),
cport: null
};
}
return {
host: url.substring(0, indexOfColon),
port: Number(url.substring(indexOfColon + 1, indexOfAt)),
cport: Number(url.substring(indexOfAt + 1))
host: address.substring(0, indexOfColon),
port: Number(address.substring(indexOfColon + 1, indexOfAt)),
cport: Number(address.substring(indexOfAt + 1))
};
}