You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-06 02:15:48 +03:00
Add support for sharded PubSub (#2373)
* refactor pubsub, add support for sharded pub sub * run tests in redis 7 only, fix PUBSUB SHARDCHANNELS test * add some comments and fix some bugs * PubSubType, not PubSubTypes 🤦♂️ * remove test.txt * fix some bugs, add tests * add some tests * fix #2345 - allow PING in PubSub mode (remove client side validation) * remove .only * revert changes in cluster/index.ts * fix tests minimum version * handle server sunsubscribe * add 'sharded-channel-moved' event to docs, improve the events section in the main README (fix #2302) * exit "resubscribe" if pubsub not active * Update commands-queue.ts * Release client@1.5.0-rc.0 * WIP * use `node:util` instead of `node:util/types` (to support node 14) * run PubSub resharding test with Redis 7+ * fix inconsistency in live resharding test * add some tests * fix iterateAllNodes when starting from a replica * fix iterateAllNodes random * fix slotNodesIterator * fix slotNodesIterator * clear pubSubNode when node in use * wait for all nodes cluster state to be ok before testing * `cluster.minimizeConections` tests * `client.reconnectStrategry = false | 0` tests * sharded pubsub + cluster 🎉 * add minimum version to sharded pubsub tests * add cluster sharded pubsub live reshard test, use stable dockers for tests, make sure to close pubsub clients when a node disconnects from the cluster * fix "ssubscribe & sunsubscribe" test * lock search docker to 2.4.9 * change numberOfMasters default to 2 * use edge for bloom * add tests * add back getMasters and getSlotMaster as deprecated functions * add some tests * fix reconnect strategy + docs * sharded pubsub docs * Update pub-sub.md * some jsdoc, docs, cluster topology test * clean pub-sub docs Co-authored-by: Simon Prickett <simon@redislabs.com> * reconnect startegy docs and bug fix Co-authored-by: Simon Prickett <simon@redislabs.com> * refine jsdoc and some docs Co-authored-by: Simon Prickett <simon@redislabs.com> * I'm stupid * fix cluster topology test * fix cluster topology test * Update README.md * Update clustering.md * Update pub-sub.md Co-authored-by: Simon Prickett <simon@redislabs.com>
This commit is contained in:
@@ -1,23 +1,17 @@
|
||||
import RedisClient, { InstantiableRedisClient, RedisClientType } from '../client';
|
||||
import { RedisClusterMasterNode, RedisClusterReplicaNode } from '../commands/CLUSTER_NODES';
|
||||
import { RedisClusterClientOptions, RedisClusterOptions } from '.';
|
||||
import { RedisCommandArgument, RedisFunctions, RedisModules, RedisScripts } from '../commands';
|
||||
import { RootNodesUnavailableError } from '../errors';
|
||||
import { ClusterSlotsNode } from '../commands/CLUSTER_SLOTS';
|
||||
import { types } from 'node:util';
|
||||
import { ChannelListeners, PubSubType, PubSubTypeListeners } from '../client/pub-sub';
|
||||
import { EventEmitter } from 'node:stream';
|
||||
|
||||
// We need to use 'require', because it's not possible with Typescript to import
|
||||
// function that are exported as 'module.exports = function`, without esModuleInterop
|
||||
// set to true.
|
||||
const calculateSlot = require('cluster-key-slot');
|
||||
|
||||
export interface ClusterNode<
|
||||
M extends RedisModules,
|
||||
F extends RedisFunctions,
|
||||
S extends RedisScripts
|
||||
> {
|
||||
id: string;
|
||||
client: RedisClientType<M, F, S>;
|
||||
}
|
||||
|
||||
interface NodeAddress {
|
||||
host: string;
|
||||
port: number;
|
||||
@@ -27,133 +21,236 @@ export type NodeAddressMap = {
|
||||
[address: string]: NodeAddress;
|
||||
} | ((address: string) => NodeAddress | undefined);
|
||||
|
||||
interface SlotNodes<
|
||||
type ValueOrPromise<T> = T | Promise<T>;
|
||||
|
||||
type ClientOrPromise<
|
||||
M extends RedisModules,
|
||||
F extends RedisFunctions,
|
||||
S extends RedisScripts
|
||||
> = ValueOrPromise<RedisClientType<M, F, S>>;
|
||||
|
||||
export interface Node<
|
||||
M extends RedisModules,
|
||||
F extends RedisFunctions,
|
||||
S extends RedisScripts
|
||||
> {
|
||||
master: ClusterNode<M, F, S>;
|
||||
replicas: Array<ClusterNode<M, F, S>>;
|
||||
clientIterator: IterableIterator<RedisClientType<M, F, S>> | undefined;
|
||||
address: string;
|
||||
client?: ClientOrPromise<M, F, S>;
|
||||
}
|
||||
|
||||
type OnError = (err: unknown) => void;
|
||||
export interface ShardNode<
|
||||
M extends RedisModules,
|
||||
F extends RedisFunctions,
|
||||
S extends RedisScripts
|
||||
> extends Node<M, F, S> {
|
||||
id: string;
|
||||
host: string;
|
||||
port: number;
|
||||
readonly: boolean;
|
||||
}
|
||||
|
||||
export interface MasterNode<
|
||||
M extends RedisModules,
|
||||
F extends RedisFunctions,
|
||||
S extends RedisScripts
|
||||
> extends ShardNode<M, F, S> {
|
||||
pubSubClient?: ClientOrPromise<M, F, S>;
|
||||
}
|
||||
|
||||
export interface Shard<
|
||||
M extends RedisModules,
|
||||
F extends RedisFunctions,
|
||||
S extends RedisScripts
|
||||
> {
|
||||
master: MasterNode<M, F, S>;
|
||||
replicas?: Array<ShardNode<M, F, S>>;
|
||||
nodesIterator?: IterableIterator<ShardNode<M, F, S>>;
|
||||
}
|
||||
|
||||
type ShardWithReplicas<
|
||||
M extends RedisModules,
|
||||
F extends RedisFunctions,
|
||||
S extends RedisScripts
|
||||
> = Shard<M, F, S> & Required<Pick<Shard<M, F, S>, 'replicas'>>;
|
||||
|
||||
export type PubSubNode<
|
||||
M extends RedisModules,
|
||||
F extends RedisFunctions,
|
||||
S extends RedisScripts
|
||||
> = Required<Node<M, F, S>>;
|
||||
|
||||
type PubSubToResubscribe = Record<
|
||||
PubSubType.CHANNELS | PubSubType.PATTERNS,
|
||||
PubSubTypeListeners
|
||||
>;
|
||||
|
||||
export type OnShardedChannelMovedError = (
|
||||
err: unknown,
|
||||
channel: string,
|
||||
listeners?: ChannelListeners
|
||||
) => void;
|
||||
|
||||
export default class RedisClusterSlots<
|
||||
M extends RedisModules,
|
||||
F extends RedisFunctions,
|
||||
S extends RedisScripts
|
||||
> {
|
||||
static #SLOTS = 16384;
|
||||
|
||||
readonly #options: RedisClusterOptions<M, F, S>;
|
||||
readonly #Client: InstantiableRedisClient<M, F, S>;
|
||||
readonly #onError: OnError;
|
||||
readonly #nodeByAddress = new Map<string, ClusterNode<M, F, S>>();
|
||||
readonly #slots: Array<SlotNodes<M, F, S>> = [];
|
||||
readonly #emit: EventEmitter['emit'];
|
||||
slots = new Array<Shard<M, F, S>>(RedisClusterSlots.#SLOTS);
|
||||
shards = new Array<Shard<M, F, S>>();
|
||||
masters = new Array<ShardNode<M, F, S>>();
|
||||
replicas = new Array<ShardNode<M, F, S>>();
|
||||
readonly nodeByAddress = new Map<string, MasterNode<M, F, S> | ShardNode<M, F, S>>();
|
||||
pubSubNode?: PubSubNode<M, F, S>;
|
||||
|
||||
constructor(options: RedisClusterOptions<M, F, S>, onError: OnError) {
|
||||
this.#options = options;
|
||||
this.#Client = RedisClient.extend(options);
|
||||
this.#onError = onError;
|
||||
#isOpen = false;
|
||||
|
||||
get isOpen() {
|
||||
return this.#isOpen;
|
||||
}
|
||||
|
||||
async connect(): Promise<void> {
|
||||
for (const rootNode of this.#options.rootNodes) {
|
||||
if (await this.#discoverNodes(rootNode)) return;
|
||||
constructor(
|
||||
options: RedisClusterOptions<M, F, S>,
|
||||
emit: EventEmitter['emit']
|
||||
) {
|
||||
this.#options = options;
|
||||
this.#Client = RedisClient.extend(options);
|
||||
this.#emit = emit;
|
||||
}
|
||||
|
||||
async connect() {
|
||||
if (this.#isOpen) {
|
||||
throw new Error('Cluster already open');
|
||||
}
|
||||
|
||||
this.#isOpen = true;
|
||||
try {
|
||||
await this.#discoverWithRootNodes();
|
||||
} catch (err) {
|
||||
this.#isOpen = false;
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
async #discoverWithRootNodes() {
|
||||
let start = Math.floor(Math.random() * this.#options.rootNodes.length);
|
||||
for (let i = start; i < this.#options.rootNodes.length; i++) {
|
||||
if (await this.#discover(this.#options.rootNodes[i])) return;
|
||||
}
|
||||
|
||||
for (let i = 0; i < start; i++) {
|
||||
if (await this.#discover(this.#options.rootNodes[i])) return;
|
||||
}
|
||||
|
||||
throw new RootNodesUnavailableError();
|
||||
}
|
||||
|
||||
async #discoverNodes(clientOptions?: RedisClusterClientOptions): Promise<boolean> {
|
||||
const client = this.#initiateClient(clientOptions);
|
||||
#resetSlots() {
|
||||
this.slots = new Array(RedisClusterSlots.#SLOTS);
|
||||
this.shards = [];
|
||||
this.masters = [];
|
||||
this.replicas = [];
|
||||
this.#randomNodeIterator = undefined;
|
||||
}
|
||||
|
||||
async #discover(rootNode?: RedisClusterClientOptions) {
|
||||
this.#resetSlots();
|
||||
const addressesInUse = new Set<string>();
|
||||
|
||||
try {
|
||||
const shards = await this.#getShards(rootNode),
|
||||
promises: Array<Promise<unknown>> = [],
|
||||
eagerConnect = this.#options.minimizeConnections !== true;
|
||||
for (const { from, to, master, replicas } of shards) {
|
||||
const shard: Shard<M, F, S> = {
|
||||
master: this.#initiateSlotNode(master, false, eagerConnect, addressesInUse, promises)
|
||||
};
|
||||
|
||||
if (this.#options.useReplicas) {
|
||||
shard.replicas = replicas.map(replica =>
|
||||
this.#initiateSlotNode(replica, true, eagerConnect, addressesInUse, promises)
|
||||
);
|
||||
}
|
||||
|
||||
this.shards.push(shard);
|
||||
|
||||
for (let i = from; i <= to; i++) {
|
||||
this.slots[i] = shard;
|
||||
}
|
||||
}
|
||||
|
||||
if (this.pubSubNode && !addressesInUse.has(this.pubSubNode.address)) {
|
||||
if (types.isPromise(this.pubSubNode.client)) {
|
||||
promises.push(
|
||||
this.pubSubNode.client.then(client => client.disconnect())
|
||||
);
|
||||
this.pubSubNode = undefined;
|
||||
} else {
|
||||
promises.push(this.pubSubNode.client.disconnect());
|
||||
|
||||
const channelsListeners = this.pubSubNode.client.getPubSubListeners(PubSubType.CHANNELS),
|
||||
patternsListeners = this.pubSubNode.client.getPubSubListeners(PubSubType.PATTERNS);
|
||||
|
||||
if (channelsListeners.size || patternsListeners.size) {
|
||||
promises.push(
|
||||
this.#initiatePubSubClient({
|
||||
[PubSubType.CHANNELS]: channelsListeners,
|
||||
[PubSubType.PATTERNS]: patternsListeners
|
||||
})
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (const [address, node] of this.nodeByAddress.entries()) {
|
||||
if (addressesInUse.has(address)) continue;
|
||||
|
||||
if (node.client) {
|
||||
promises.push(
|
||||
this.#execOnNodeClient(node.client, client => client.disconnect())
|
||||
);
|
||||
}
|
||||
|
||||
const { pubSubClient } = node as MasterNode<M, F, S>;
|
||||
if (pubSubClient) {
|
||||
promises.push(
|
||||
this.#execOnNodeClient(pubSubClient, client => client.disconnect())
|
||||
);
|
||||
}
|
||||
|
||||
this.nodeByAddress.delete(address);
|
||||
}
|
||||
|
||||
await Promise.all(promises);
|
||||
|
||||
return true;
|
||||
} catch (err) {
|
||||
this.#emit('error', err);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
async #getShards(rootNode?: RedisClusterClientOptions) {
|
||||
const client = new this.#Client(
|
||||
this.#clientOptionsDefaults(rootNode, true)
|
||||
);
|
||||
|
||||
client.on('error', err => this.#emit('error', err));
|
||||
|
||||
await client.connect();
|
||||
|
||||
try {
|
||||
await this.#reset(await client.clusterNodes());
|
||||
return true;
|
||||
} catch (err) {
|
||||
this.#onError(err);
|
||||
return false;
|
||||
// using `CLUSTER SLOTS` and not `CLUSTER SHARDS` to support older versions
|
||||
return await client.clusterSlots();
|
||||
} finally {
|
||||
if (client.isOpen) {
|
||||
await client.disconnect();
|
||||
}
|
||||
await client.disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
#runningRediscoverPromise?: Promise<void>;
|
||||
|
||||
async rediscover(startWith: RedisClientType<M, F, S>): Promise<void> {
|
||||
if (!this.#runningRediscoverPromise) {
|
||||
this.#runningRediscoverPromise = this.#rediscover(startWith)
|
||||
.finally(() => this.#runningRediscoverPromise = undefined);
|
||||
}
|
||||
|
||||
return this.#runningRediscoverPromise;
|
||||
}
|
||||
|
||||
async #rediscover(startWith: RedisClientType<M, F, S>): Promise<void> {
|
||||
if (await this.#discoverNodes(startWith.options)) return;
|
||||
|
||||
for (const { client } of this.#nodeByAddress.values()) {
|
||||
if (client === startWith) continue;
|
||||
|
||||
if (await this.#discoverNodes(client.options)) return;
|
||||
}
|
||||
|
||||
throw new Error('None of the cluster nodes is available');
|
||||
}
|
||||
|
||||
async #reset(masters: Array<RedisClusterMasterNode>): Promise<void> {
|
||||
// Override this.#slots and add not existing clients to this.#nodeByAddress
|
||||
const promises: Array<Promise<void>> = [],
|
||||
clientsInUse = new Set<string>();
|
||||
for (const master of masters) {
|
||||
const slot = {
|
||||
master: this.#initiateClientForNode(master, false, clientsInUse, promises),
|
||||
replicas: this.#options.useReplicas ?
|
||||
master.replicas.map(replica => this.#initiateClientForNode(replica, true, clientsInUse, promises)) :
|
||||
[],
|
||||
clientIterator: undefined // will be initiated in use
|
||||
};
|
||||
|
||||
for (const { from, to } of master.slots) {
|
||||
for (let i = from; i <= to; i++) {
|
||||
this.#slots[i] = slot;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove unused clients from this.#nodeByAddress using clientsInUse
|
||||
for (const [address, { client }] of this.#nodeByAddress.entries()) {
|
||||
if (clientsInUse.has(address)) continue;
|
||||
|
||||
promises.push(client.disconnect());
|
||||
this.#nodeByAddress.delete(address);
|
||||
}
|
||||
|
||||
await Promise.all(promises);
|
||||
}
|
||||
|
||||
#clientOptionsDefaults(options?: RedisClusterClientOptions): RedisClusterClientOptions | undefined {
|
||||
if (!this.#options.defaults) return options;
|
||||
|
||||
return {
|
||||
...this.#options.defaults,
|
||||
...options,
|
||||
socket: this.#options.defaults.socket && options?.socket ? {
|
||||
...this.#options.defaults.socket,
|
||||
...options.socket
|
||||
} : this.#options.defaults.socket ?? options?.socket
|
||||
};
|
||||
}
|
||||
|
||||
#initiateClient(options?: RedisClusterClientOptions): RedisClientType<M, F, S> {
|
||||
return new this.#Client(this.#clientOptionsDefaults(options))
|
||||
.on('error', this.#onError);
|
||||
}
|
||||
|
||||
#getNodeAddress(address: string): NodeAddress | undefined {
|
||||
switch (typeof this.#options.nodeAddressMap) {
|
||||
case 'object':
|
||||
@@ -164,111 +261,123 @@ export default class RedisClusterSlots<
|
||||
}
|
||||
}
|
||||
|
||||
#initiateClientForNode(
|
||||
nodeData: RedisClusterMasterNode | RedisClusterReplicaNode,
|
||||
readonly: boolean,
|
||||
clientsInUse: Set<string>,
|
||||
promises: Array<Promise<void>>
|
||||
): ClusterNode<M, F, S> {
|
||||
const address = `${nodeData.host}:${nodeData.port}`;
|
||||
clientsInUse.add(address);
|
||||
#clientOptionsDefaults(
|
||||
options?: RedisClusterClientOptions,
|
||||
disableReconnect?: boolean
|
||||
): RedisClusterClientOptions | undefined {
|
||||
let result: RedisClusterClientOptions | undefined;
|
||||
if (this.#options.defaults) {
|
||||
let socket;
|
||||
if (this.#options.defaults.socket) {
|
||||
socket = options?.socket ? {
|
||||
...this.#options.defaults.socket,
|
||||
...options.socket
|
||||
} : this.#options.defaults.socket;
|
||||
} else {
|
||||
socket = options?.socket;
|
||||
}
|
||||
|
||||
let node = this.#nodeByAddress.get(address);
|
||||
result = {
|
||||
...this.#options.defaults,
|
||||
...options,
|
||||
socket
|
||||
};
|
||||
} else {
|
||||
result = options;
|
||||
}
|
||||
|
||||
if (disableReconnect) {
|
||||
result ??= {};
|
||||
result.socket ??= {};
|
||||
result.socket.reconnectStrategy = false;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
#initiateSlotNode(
|
||||
{ id, ip, port }: ClusterSlotsNode,
|
||||
readonly: boolean,
|
||||
eagerConnent: boolean,
|
||||
addressesInUse: Set<string>,
|
||||
promises: Array<Promise<unknown>>
|
||||
) {
|
||||
const address = `${ip}:${port}`;
|
||||
addressesInUse.add(address);
|
||||
|
||||
let node = this.nodeByAddress.get(address);
|
||||
if (!node) {
|
||||
node = {
|
||||
id: nodeData.id,
|
||||
client: this.#initiateClient({
|
||||
socket: this.#getNodeAddress(address) ?? {
|
||||
host: nodeData.host,
|
||||
port: nodeData.port
|
||||
},
|
||||
readonly
|
||||
})
|
||||
id,
|
||||
host: ip,
|
||||
port,
|
||||
address,
|
||||
readonly,
|
||||
client: undefined
|
||||
};
|
||||
promises.push(node.client.connect());
|
||||
this.#nodeByAddress.set(address, node);
|
||||
|
||||
if (eagerConnent) {
|
||||
promises.push(this.#createNodeClient(node));
|
||||
}
|
||||
|
||||
this.nodeByAddress.set(address, node);
|
||||
}
|
||||
|
||||
(readonly ? this.replicas : this.masters).push(node);
|
||||
|
||||
return node;
|
||||
}
|
||||
|
||||
getSlotMaster(slot: number): ClusterNode<M, F, S> {
|
||||
return this.#slots[slot].master;
|
||||
}
|
||||
|
||||
*#slotClientIterator(slotNumber: number): IterableIterator<RedisClientType<M, F, S>> {
|
||||
const slot = this.#slots[slotNumber];
|
||||
yield slot.master.client;
|
||||
|
||||
for (const replica of slot.replicas) {
|
||||
yield replica.client;
|
||||
}
|
||||
}
|
||||
|
||||
#getSlotClient(slotNumber: number): RedisClientType<M, F, S> {
|
||||
const slot = this.#slots[slotNumber];
|
||||
if (!slot.clientIterator) {
|
||||
slot.clientIterator = this.#slotClientIterator(slotNumber);
|
||||
}
|
||||
|
||||
const {done, value} = slot.clientIterator.next();
|
||||
if (done) {
|
||||
slot.clientIterator = undefined;
|
||||
return this.#getSlotClient(slotNumber);
|
||||
}
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
#randomClientIterator?: IterableIterator<ClusterNode<M, F, S>>;
|
||||
|
||||
#getRandomClient(): RedisClientType<M, F, S> {
|
||||
if (!this.#nodeByAddress.size) {
|
||||
throw new Error('Cluster is not connected');
|
||||
}
|
||||
|
||||
if (!this.#randomClientIterator) {
|
||||
this.#randomClientIterator = this.#nodeByAddress.values();
|
||||
}
|
||||
|
||||
const {done, value} = this.#randomClientIterator.next();
|
||||
if (done) {
|
||||
this.#randomClientIterator = undefined;
|
||||
return this.#getRandomClient();
|
||||
}
|
||||
|
||||
return value.client;
|
||||
}
|
||||
|
||||
getClient(firstKey?: RedisCommandArgument, isReadonly?: boolean): RedisClientType<M, F, S> {
|
||||
if (!firstKey) {
|
||||
return this.#getRandomClient();
|
||||
}
|
||||
|
||||
const slot = calculateSlot(firstKey);
|
||||
if (!isReadonly || !this.#options.useReplicas) {
|
||||
return this.getSlotMaster(slot).client;
|
||||
}
|
||||
|
||||
return this.#getSlotClient(slot);
|
||||
}
|
||||
|
||||
getMasters(): Array<ClusterNode<M, F, S>> {
|
||||
const masters = [];
|
||||
for (const node of this.#nodeByAddress.values()) {
|
||||
if (node.client.options?.readonly) continue;
|
||||
|
||||
masters.push(node);
|
||||
}
|
||||
|
||||
return masters;
|
||||
}
|
||||
|
||||
getNodeByAddress(address: string): ClusterNode<M, F, S> | undefined {
|
||||
const mappedAddress = this.#getNodeAddress(address);
|
||||
return this.#nodeByAddress.get(
|
||||
mappedAddress ? `${mappedAddress.host}:${mappedAddress.port}` : address
|
||||
async #createClient(
|
||||
node: ShardNode<M, F, S>,
|
||||
readonly = node.readonly
|
||||
) {
|
||||
const client = new this.#Client(
|
||||
this.#clientOptionsDefaults({
|
||||
socket: this.#getNodeAddress(node.address) ?? {
|
||||
host: node.host,
|
||||
port: node.port
|
||||
},
|
||||
readonly
|
||||
})
|
||||
);
|
||||
client.on('error', err => this.#emit('error', err));
|
||||
|
||||
await client.connect();
|
||||
|
||||
return client;
|
||||
}
|
||||
|
||||
#createNodeClient(node: ShardNode<M, F, S>) {
|
||||
const promise = this.#createClient(node)
|
||||
.then(client => {
|
||||
node.client = client;
|
||||
return client;
|
||||
})
|
||||
.catch(err => {
|
||||
node.client = undefined;
|
||||
throw err;
|
||||
});
|
||||
node.client = promise;
|
||||
return promise;
|
||||
}
|
||||
|
||||
nodeClient(node: ShardNode<M, F, S>) {
|
||||
return node.client ?? this.#createNodeClient(node);
|
||||
}
|
||||
|
||||
#runningRediscoverPromise?: Promise<void>;
|
||||
|
||||
async rediscover(startWith: RedisClientType<M, F, S>): Promise<void> {
|
||||
this.#runningRediscoverPromise ??= this.#rediscover(startWith)
|
||||
.finally(() => this.#runningRediscoverPromise = undefined);
|
||||
return this.#runningRediscoverPromise;
|
||||
}
|
||||
|
||||
async #rediscover(startWith: RedisClientType<M, F, S>): Promise<void> {
|
||||
if (await this.#discover(startWith.options)) return;
|
||||
|
||||
return this.#discoverWithRootNodes();
|
||||
}
|
||||
|
||||
quit(): Promise<void> {
|
||||
@@ -280,14 +389,233 @@ export default class RedisClusterSlots<
|
||||
}
|
||||
|
||||
async #destroy(fn: (client: RedisClientType<M, F, S>) => Promise<unknown>): Promise<void> {
|
||||
this.#isOpen = false;
|
||||
|
||||
const promises = [];
|
||||
for (const { client } of this.#nodeByAddress.values()) {
|
||||
promises.push(fn(client));
|
||||
for (const { master, replicas } of this.shards) {
|
||||
if (master.client) {
|
||||
promises.push(
|
||||
this.#execOnNodeClient(master.client, fn)
|
||||
);
|
||||
}
|
||||
|
||||
if (master.pubSubClient) {
|
||||
promises.push(
|
||||
this.#execOnNodeClient(master.pubSubClient, fn)
|
||||
);
|
||||
}
|
||||
|
||||
if (replicas) {
|
||||
for (const { client } of replicas) {
|
||||
if (client) {
|
||||
promises.push(
|
||||
this.#execOnNodeClient(client, fn)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
await Promise.all(promises);
|
||||
if (this.pubSubNode) {
|
||||
promises.push(this.#execOnNodeClient(this.pubSubNode.client, fn));
|
||||
this.pubSubNode = undefined;
|
||||
}
|
||||
|
||||
this.#nodeByAddress.clear();
|
||||
this.#slots.splice(0);
|
||||
this.#resetSlots();
|
||||
this.nodeByAddress.clear();
|
||||
|
||||
await Promise.allSettled(promises);
|
||||
}
|
||||
|
||||
#execOnNodeClient(
|
||||
client: ClientOrPromise<M, F, S>,
|
||||
fn: (client: RedisClientType<M, F, S>) => Promise<unknown>
|
||||
) {
|
||||
return types.isPromise(client) ?
|
||||
client.then(fn) :
|
||||
fn(client);
|
||||
}
|
||||
|
||||
getClient(
|
||||
firstKey: RedisCommandArgument | undefined,
|
||||
isReadonly: boolean | undefined
|
||||
): ClientOrPromise<M, F, S> {
|
||||
if (!firstKey) {
|
||||
return this.nodeClient(this.getRandomNode());
|
||||
}
|
||||
|
||||
const slotNumber = calculateSlot(firstKey);
|
||||
if (!isReadonly) {
|
||||
return this.nodeClient(this.slots[slotNumber].master);
|
||||
}
|
||||
|
||||
return this.nodeClient(this.getSlotRandomNode(slotNumber));
|
||||
}
|
||||
|
||||
*#iterateAllNodes() {
|
||||
let i = Math.floor(Math.random() * (this.masters.length + this.replicas.length));
|
||||
if (i < this.masters.length) {
|
||||
do {
|
||||
yield this.masters[i];
|
||||
} while (++i < this.masters.length);
|
||||
|
||||
for (const replica of this.replicas) {
|
||||
yield replica;
|
||||
}
|
||||
} else {
|
||||
i -= this.masters.length;
|
||||
do {
|
||||
yield this.replicas[i];
|
||||
} while (++i < this.replicas.length);
|
||||
}
|
||||
|
||||
while (true) {
|
||||
for (const master of this.masters) {
|
||||
yield master;
|
||||
}
|
||||
|
||||
for (const replica of this.replicas) {
|
||||
yield replica;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#randomNodeIterator?: IterableIterator<ShardNode<M, F, S>>;
|
||||
|
||||
getRandomNode() {
|
||||
this.#randomNodeIterator ??= this.#iterateAllNodes();
|
||||
return this.#randomNodeIterator.next().value as ShardNode<M, F, S>;
|
||||
}
|
||||
|
||||
*#slotNodesIterator(slot: ShardWithReplicas<M, F, S>) {
|
||||
let i = Math.floor(Math.random() * (1 + slot.replicas.length));
|
||||
if (i < slot.replicas.length) {
|
||||
do {
|
||||
yield slot.replicas[i];
|
||||
} while (++i < slot.replicas.length);
|
||||
}
|
||||
|
||||
while (true) {
|
||||
yield slot.master;
|
||||
|
||||
for (const replica of slot.replicas) {
|
||||
yield replica;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
getSlotRandomNode(slotNumber: number) {
|
||||
const slot = this.slots[slotNumber];
|
||||
if (!slot.replicas?.length) {
|
||||
return slot.master;
|
||||
}
|
||||
|
||||
slot.nodesIterator ??= this.#slotNodesIterator(slot as ShardWithReplicas<M, F, S>);
|
||||
return slot.nodesIterator.next().value as ShardNode<M, F, S>;
|
||||
}
|
||||
|
||||
getMasterByAddress(address: string) {
|
||||
const master = this.nodeByAddress.get(address);
|
||||
if (!master) return;
|
||||
|
||||
return this.nodeClient(master);
|
||||
}
|
||||
|
||||
getPubSubClient() {
|
||||
return this.pubSubNode ?
|
||||
this.pubSubNode.client :
|
||||
this.#initiatePubSubClient();
|
||||
}
|
||||
|
||||
async #initiatePubSubClient(toResubscribe?: PubSubToResubscribe) {
|
||||
const index = Math.floor(Math.random() * (this.masters.length + this.replicas.length)),
|
||||
node = index < this.masters.length ?
|
||||
this.masters[index] :
|
||||
this.replicas[index - this.masters.length];
|
||||
|
||||
this.pubSubNode = {
|
||||
address: node.address,
|
||||
client: this.#createClient(node, true)
|
||||
.then(async client => {
|
||||
if (toResubscribe) {
|
||||
await Promise.all([
|
||||
client.extendPubSubListeners(PubSubType.CHANNELS, toResubscribe[PubSubType.CHANNELS]),
|
||||
client.extendPubSubListeners(PubSubType.PATTERNS, toResubscribe[PubSubType.PATTERNS])
|
||||
]);
|
||||
}
|
||||
|
||||
this.pubSubNode!.client = client;
|
||||
return client;
|
||||
})
|
||||
.catch(err => {
|
||||
this.pubSubNode = undefined;
|
||||
throw err;
|
||||
})
|
||||
};
|
||||
|
||||
return this.pubSubNode.client as Promise<RedisClientType<M, F, S>>;
|
||||
}
|
||||
|
||||
async executeUnsubscribeCommand(
|
||||
unsubscribe: (client: RedisClientType<M, F, S>) => Promise<void>
|
||||
): Promise<void> {
|
||||
const client = await this.getPubSubClient();
|
||||
await unsubscribe(client);
|
||||
|
||||
if (!client.isPubSubActive) {
|
||||
await client.disconnect();
|
||||
this.pubSubNode = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
getShardedPubSubClient(channel: string) {
|
||||
const { master } = this.slots[calculateSlot(channel)];
|
||||
return master.pubSubClient ?? this.#initiateShardedPubSubClient(master);
|
||||
}
|
||||
|
||||
#initiateShardedPubSubClient(master: MasterNode<M, F, S>) {
|
||||
const promise = this.#createClient(master, true)
|
||||
.then(client => {
|
||||
client.on('server-sunsubscribe', async (channel, listeners) => {
|
||||
try {
|
||||
await this.rediscover(client);
|
||||
const redirectTo = await this.getShardedPubSubClient(channel);
|
||||
redirectTo.extendPubSubChannelListeners(
|
||||
PubSubType.SHARDED,
|
||||
channel,
|
||||
listeners
|
||||
);
|
||||
} catch (err) {
|
||||
this.#emit('sharded-shannel-moved-error', err, channel, listeners);
|
||||
}
|
||||
});
|
||||
|
||||
master.pubSubClient = client;
|
||||
return client;
|
||||
})
|
||||
.catch(err => {
|
||||
master.pubSubClient = undefined;
|
||||
throw err;
|
||||
});
|
||||
|
||||
master.pubSubClient = promise;
|
||||
|
||||
return promise;
|
||||
}
|
||||
|
||||
async executeShardedUnsubscribeCommand(
|
||||
channel: string,
|
||||
unsubscribe: (client: RedisClientType<M, F, S>) => Promise<void>
|
||||
): Promise<void> {
|
||||
const { master } = this.slots[calculateSlot(channel)];
|
||||
if (!master.pubSubClient) return Promise.resolve();
|
||||
|
||||
const client = await master.pubSubClient;
|
||||
await unsubscribe(client);
|
||||
|
||||
if (!client.isPubSubActive) {
|
||||
await client.disconnect();
|
||||
master.pubSubClient = undefined;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -135,6 +135,7 @@ import * as SORT_RO from '../commands/SORT_RO';
|
||||
import * as SORT_STORE from '../commands/SORT_STORE';
|
||||
import * as SORT from '../commands/SORT';
|
||||
import * as SPOP from '../commands/SPOP';
|
||||
import * as SPUBLISH from '../commands/SPUBLISH';
|
||||
import * as SRANDMEMBER_COUNT from '../commands/SRANDMEMBER_COUNT';
|
||||
import * as SRANDMEMBER from '../commands/SRANDMEMBER';
|
||||
import * as SREM from '../commands/SREM';
|
||||
@@ -483,6 +484,8 @@ export default {
|
||||
sort: SORT,
|
||||
SPOP,
|
||||
sPop: SPOP,
|
||||
SPUBLISH,
|
||||
sPublish: SPUBLISH,
|
||||
SRANDMEMBER_COUNT,
|
||||
sRandMemberCount: SRANDMEMBER_COUNT,
|
||||
SRANDMEMBER,
|
||||
|
@@ -1,25 +1,29 @@
|
||||
import { strict as assert } from 'assert';
|
||||
import testUtils, { GLOBAL } from '../test-utils';
|
||||
import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils';
|
||||
import RedisCluster from '.';
|
||||
import { ClusterSlotStates } from '../commands/CLUSTER_SETSLOT';
|
||||
import { SQUARE_SCRIPT } from '../client/index.spec';
|
||||
import { RootNodesUnavailableError } from '../errors';
|
||||
|
||||
// We need to use 'require', because it's not possible with Typescript to import
|
||||
// function that are exported as 'module.exports = function`, without esModuleInterop
|
||||
// set to true.
|
||||
const calculateSlot = require('cluster-key-slot');
|
||||
import { spy } from 'sinon';
|
||||
import { promiseTimeout } from '../utils';
|
||||
import RedisClient from '../client';
|
||||
|
||||
describe('Cluster', () => {
|
||||
testUtils.testWithCluster('sendCommand', async cluster => {
|
||||
await cluster.publish('channel', 'message');
|
||||
await cluster.set('a', 'b');
|
||||
await cluster.set('a{a}', 'bb');
|
||||
await cluster.set('aa', 'bb');
|
||||
await cluster.get('aa');
|
||||
await cluster.get('aa');
|
||||
await cluster.get('aa');
|
||||
await cluster.get('aa');
|
||||
assert.equal(
|
||||
await cluster.sendCommand(undefined, true, ['PING']),
|
||||
'PONG'
|
||||
);
|
||||
}, GLOBAL.CLUSTERS.OPEN);
|
||||
|
||||
testUtils.testWithCluster('isOpen', async cluster => {
|
||||
assert.equal(cluster.isOpen, true);
|
||||
await cluster.disconnect();
|
||||
assert.equal(cluster.isOpen, false);
|
||||
}, GLOBAL.CLUSTERS.OPEN);
|
||||
|
||||
testUtils.testWithCluster('connect should throw if already connected', async cluster => {
|
||||
await assert.rejects(cluster.connect());
|
||||
}, GLOBAL.CLUSTERS.OPEN);
|
||||
|
||||
testUtils.testWithCluster('multi', async cluster => {
|
||||
@@ -64,54 +68,279 @@ describe('Cluster', () => {
|
||||
});
|
||||
|
||||
testUtils.testWithCluster('should handle live resharding', async cluster => {
|
||||
const key = 'key',
|
||||
const slot = 12539,
|
||||
key = 'key',
|
||||
value = 'value';
|
||||
await cluster.set(key, value);
|
||||
|
||||
const slot = calculateSlot(key),
|
||||
source = cluster.getSlotMaster(slot),
|
||||
destination = cluster.getMasters().find(node => node.id !== source.id)!;
|
||||
const importing = cluster.slots[0].master,
|
||||
migrating = cluster.slots[slot].master,
|
||||
[ importingClient, migratingClient ] = await Promise.all([
|
||||
cluster.nodeClient(importing),
|
||||
cluster.nodeClient(migrating)
|
||||
]);
|
||||
|
||||
await Promise.all([
|
||||
source.client.clusterSetSlot(slot, ClusterSlotStates.MIGRATING, destination.id),
|
||||
destination.client.clusterSetSlot(slot, ClusterSlotStates.IMPORTING, destination.id)
|
||||
importingClient.clusterSetSlot(slot, ClusterSlotStates.IMPORTING, migrating.id),
|
||||
migratingClient.clusterSetSlot(slot, ClusterSlotStates.MIGRATING, importing.id)
|
||||
]);
|
||||
|
||||
// should be able to get the key from the source node using "ASKING"
|
||||
// should be able to get the key from the migrating node
|
||||
assert.equal(
|
||||
await cluster.get(key),
|
||||
value
|
||||
);
|
||||
|
||||
await migratingClient.migrate(
|
||||
importing.host,
|
||||
importing.port,
|
||||
key,
|
||||
0,
|
||||
10
|
||||
);
|
||||
|
||||
// should be able to get the key from the importing node using `ASKING`
|
||||
assert.equal(
|
||||
await cluster.get(key),
|
||||
value
|
||||
);
|
||||
|
||||
await Promise.all([
|
||||
source.client.migrate(
|
||||
'127.0.0.1',
|
||||
(<any>destination.client.options).socket.port,
|
||||
key,
|
||||
0,
|
||||
10
|
||||
)
|
||||
importingClient.clusterSetSlot(slot, ClusterSlotStates.NODE, importing.id),
|
||||
migratingClient.clusterSetSlot(slot, ClusterSlotStates.NODE, importing.id),
|
||||
]);
|
||||
|
||||
// should be able to get the key from the destination node using the "ASKING" command
|
||||
assert.equal(
|
||||
await cluster.get(key),
|
||||
value
|
||||
);
|
||||
|
||||
await Promise.all(
|
||||
cluster.getMasters().map(({ client }) => {
|
||||
return client.clusterSetSlot(slot, ClusterSlotStates.NODE, destination.id);
|
||||
})
|
||||
);
|
||||
|
||||
// should handle "MOVED" errors
|
||||
// should handle `MOVED` errors
|
||||
assert.equal(
|
||||
await cluster.get(key),
|
||||
value
|
||||
);
|
||||
}, {
|
||||
serverArguments: [],
|
||||
numberOfNodes: 2
|
||||
numberOfMasters: 2
|
||||
});
|
||||
|
||||
testUtils.testWithCluster('getRandomNode should spread the the load evenly', async cluster => {
|
||||
const totalNodes = cluster.masters.length + cluster.replicas.length,
|
||||
ids = new Set<string>();
|
||||
for (let i = 0; i < totalNodes; i++) {
|
||||
ids.add(cluster.getRandomNode().id);
|
||||
}
|
||||
|
||||
assert.equal(ids.size, totalNodes);
|
||||
}, GLOBAL.CLUSTERS.WITH_REPLICAS);
|
||||
|
||||
testUtils.testWithCluster('getSlotRandomNode should spread the the load evenly', async cluster => {
|
||||
const totalNodes = 1 + cluster.slots[0].replicas!.length,
|
||||
ids = new Set<string>();
|
||||
for (let i = 0; i < totalNodes; i++) {
|
||||
ids.add(cluster.getSlotRandomNode(0).id);
|
||||
}
|
||||
|
||||
assert.equal(ids.size, totalNodes);
|
||||
}, GLOBAL.CLUSTERS.WITH_REPLICAS);
|
||||
|
||||
testUtils.testWithCluster('cluster topology', async cluster => {
|
||||
assert.equal(cluster.slots.length, 16384);
|
||||
const { numberOfMasters, numberOfReplicas } = GLOBAL.CLUSTERS.WITH_REPLICAS;
|
||||
assert.equal(cluster.shards.length, numberOfMasters);
|
||||
assert.equal(cluster.masters.length, numberOfMasters);
|
||||
assert.equal(cluster.replicas.length, numberOfReplicas * numberOfMasters);
|
||||
assert.equal(cluster.nodeByAddress.size, numberOfMasters + numberOfMasters * numberOfReplicas);
|
||||
}, GLOBAL.CLUSTERS.WITH_REPLICAS);
|
||||
|
||||
testUtils.testWithCluster('getMasters should be backwards competiable (without `minimizeConnections`)', async cluster => {
|
||||
const masters = cluster.getMasters();
|
||||
assert.ok(Array.isArray(masters));
|
||||
for (const master of masters) {
|
||||
assert.equal(typeof master.id, 'string');
|
||||
assert.ok(master.client instanceof RedisClient);
|
||||
}
|
||||
}, {
|
||||
...GLOBAL.CLUSTERS.OPEN,
|
||||
clusterConfiguration: {
|
||||
minimizeConnections: undefined // reset to default
|
||||
}
|
||||
});
|
||||
|
||||
testUtils.testWithCluster('getSlotMaster should be backwards competiable (without `minimizeConnections`)', async cluster => {
|
||||
const master = cluster.getSlotMaster(0);
|
||||
assert.equal(typeof master.id, 'string');
|
||||
assert.ok(master.client instanceof RedisClient);
|
||||
}, {
|
||||
...GLOBAL.CLUSTERS.OPEN,
|
||||
clusterConfiguration: {
|
||||
minimizeConnections: undefined // reset to default
|
||||
}
|
||||
});
|
||||
|
||||
testUtils.testWithCluster('should throw CROSSSLOT error', async cluster => {
|
||||
await assert.rejects(cluster.mGet(['a', 'b']));
|
||||
}, GLOBAL.CLUSTERS.OPEN);
|
||||
|
||||
describe('minimizeConnections', () => {
|
||||
testUtils.testWithCluster('false', async cluster => {
|
||||
for (const master of cluster.masters) {
|
||||
assert.ok(master.client instanceof RedisClient);
|
||||
}
|
||||
}, {
|
||||
...GLOBAL.CLUSTERS.OPEN,
|
||||
clusterConfiguration: {
|
||||
minimizeConnections: false
|
||||
}
|
||||
});
|
||||
|
||||
testUtils.testWithCluster('true', async cluster => {
|
||||
for (const master of cluster.masters) {
|
||||
assert.equal(master.client, undefined);
|
||||
}
|
||||
}, {
|
||||
...GLOBAL.CLUSTERS.OPEN,
|
||||
clusterConfiguration: {
|
||||
minimizeConnections: true
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe('PubSub', () => {
|
||||
testUtils.testWithCluster('subscribe & unsubscribe', async cluster => {
|
||||
const listener = spy();
|
||||
|
||||
await cluster.subscribe('channel', listener);
|
||||
|
||||
await Promise.all([
|
||||
waitTillBeenCalled(listener),
|
||||
cluster.publish('channel', 'message')
|
||||
]);
|
||||
|
||||
assert.ok(listener.calledOnceWithExactly('message', 'channel'));
|
||||
|
||||
await cluster.unsubscribe('channel', listener);
|
||||
|
||||
assert.equal(cluster.pubSubNode, undefined);
|
||||
}, GLOBAL.CLUSTERS.OPEN);
|
||||
|
||||
testUtils.testWithCluster('psubscribe & punsubscribe', async cluster => {
|
||||
const listener = spy();
|
||||
|
||||
await cluster.pSubscribe('channe*', listener);
|
||||
|
||||
await Promise.all([
|
||||
waitTillBeenCalled(listener),
|
||||
cluster.publish('channel', 'message')
|
||||
]);
|
||||
|
||||
assert.ok(listener.calledOnceWithExactly('message', 'channel'));
|
||||
|
||||
await cluster.pUnsubscribe('channe*', listener);
|
||||
|
||||
assert.equal(cluster.pubSubNode, undefined);
|
||||
}, GLOBAL.CLUSTERS.OPEN);
|
||||
|
||||
testUtils.testWithCluster('should move listeners when PubSub node disconnects from the cluster', async cluster => {
|
||||
const listener = spy();
|
||||
await cluster.subscribe('channel', listener);
|
||||
|
||||
assert.ok(cluster.pubSubNode);
|
||||
const [ migrating, importing ] = cluster.masters[0].address === cluster.pubSubNode.address ?
|
||||
cluster.masters :
|
||||
[cluster.masters[1], cluster.masters[0]],
|
||||
[ migratingClient, importingClient ] = await Promise.all([
|
||||
cluster.nodeClient(migrating),
|
||||
cluster.nodeClient(importing)
|
||||
]);
|
||||
|
||||
const range = cluster.slots[0].master === migrating ? {
|
||||
key: 'bar', // 5061
|
||||
start: 0,
|
||||
end: 8191
|
||||
} : {
|
||||
key: 'foo', // 12182
|
||||
start: 8192,
|
||||
end: 16383
|
||||
};
|
||||
|
||||
await Promise.all([
|
||||
migratingClient.clusterDelSlotsRange(range),
|
||||
importingClient.clusterDelSlotsRange(range),
|
||||
importingClient.clusterAddSlotsRange(range)
|
||||
]);
|
||||
|
||||
// wait for migrating node to be notified about the new topology
|
||||
while ((await migratingClient.clusterInfo()).state !== 'ok') {
|
||||
await promiseTimeout(50);
|
||||
}
|
||||
|
||||
// make sure to cause `MOVED` error
|
||||
await cluster.get(range.key);
|
||||
|
||||
await Promise.all([
|
||||
cluster.publish('channel', 'message'),
|
||||
waitTillBeenCalled(listener)
|
||||
]);
|
||||
|
||||
assert.ok(listener.calledOnceWithExactly('message', 'channel'));
|
||||
}, {
|
||||
serverArguments: [],
|
||||
numberOfMasters: 2,
|
||||
minimumDockerVersion: [7]
|
||||
});
|
||||
|
||||
testUtils.testWithCluster('ssubscribe & sunsubscribe', async cluster => {
|
||||
const listener = spy();
|
||||
|
||||
await cluster.sSubscribe('channel', listener);
|
||||
|
||||
await Promise.all([
|
||||
waitTillBeenCalled(listener),
|
||||
cluster.sPublish('channel', 'message')
|
||||
]);
|
||||
|
||||
assert.ok(listener.calledOnceWithExactly('message', 'channel'));
|
||||
|
||||
await cluster.sUnsubscribe('channel', listener);
|
||||
|
||||
// 10328 is the slot of `channel`
|
||||
assert.equal(cluster.slots[10328].master.pubSubClient, undefined);
|
||||
}, {
|
||||
...GLOBAL.CLUSTERS.OPEN,
|
||||
minimumDockerVersion: [7]
|
||||
});
|
||||
|
||||
testUtils.testWithCluster('should handle sharded-channel-moved events', async cluster => {
|
||||
const SLOT = 10328,
|
||||
migrating = cluster.slots[SLOT].master,
|
||||
importing = cluster.masters.find(master => master !== migrating)!,
|
||||
[ migratingClient, importingClient ] = await Promise.all([
|
||||
cluster.nodeClient(migrating),
|
||||
cluster.nodeClient(importing)
|
||||
]);
|
||||
|
||||
await Promise.all([
|
||||
migratingClient.clusterDelSlots(SLOT),
|
||||
importingClient.clusterDelSlots(SLOT),
|
||||
importingClient.clusterAddSlots(SLOT)
|
||||
]);
|
||||
|
||||
// wait for migrating node to be notified about the new topology
|
||||
while ((await migratingClient.clusterInfo()).state !== 'ok') {
|
||||
await promiseTimeout(50);
|
||||
}
|
||||
|
||||
const listener = spy();
|
||||
|
||||
// will trigger `MOVED` error
|
||||
await cluster.sSubscribe('channel', listener);
|
||||
|
||||
await Promise.all([
|
||||
waitTillBeenCalled(listener),
|
||||
cluster.sPublish('channel', 'message')
|
||||
]);
|
||||
|
||||
assert.ok(listener.calledOnceWithExactly('message', 'channel'));
|
||||
}, {
|
||||
serverArguments: [],
|
||||
minimumDockerVersion: [7]
|
||||
});
|
||||
});
|
||||
});
|
||||
|
@@ -1,11 +1,13 @@
|
||||
import COMMANDS from './commands';
|
||||
import { RedisCommand, RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply, RedisCommandReply, RedisFunctions, RedisModules, RedisExtensions, RedisScript, RedisScripts, RedisCommandSignature, RedisFunction } from '../commands';
|
||||
import { ClientCommandOptions, RedisClientOptions, RedisClientType, WithFunctions, WithModules, WithScripts } from '../client';
|
||||
import RedisClusterSlots, { ClusterNode, NodeAddressMap } from './cluster-slots';
|
||||
import RedisClusterSlots, { NodeAddressMap, ShardNode } from './cluster-slots';
|
||||
import { attachExtensions, transformCommandReply, attachCommands, transformCommandArguments } from '../commander';
|
||||
import { EventEmitter } from 'events';
|
||||
import RedisClusterMultiCommand, { InstantiableRedisClusterMultiCommandType, RedisClusterMultiCommandType } from './multi-command';
|
||||
import { RedisMultiQueuedCommand } from '../multi-command';
|
||||
import { PubSubListener } from '../client/pub-sub';
|
||||
import { ErrorReply } from '../errors';
|
||||
|
||||
export type RedisClusterClientOptions = Omit<
|
||||
RedisClientOptions,
|
||||
@@ -17,10 +19,34 @@ export interface RedisClusterOptions<
|
||||
F extends RedisFunctions = Record<string, never>,
|
||||
S extends RedisScripts = Record<string, never>
|
||||
> extends RedisExtensions<M, F, S> {
|
||||
/**
|
||||
* Should contain details for some of the cluster nodes that the client will use to discover
|
||||
* the "cluster topology". We recommend including details for at least 3 nodes here.
|
||||
*/
|
||||
rootNodes: Array<RedisClusterClientOptions>;
|
||||
/**
|
||||
* Default values used for every client in the cluster. Use this to specify global values,
|
||||
* for example: ACL credentials, timeouts, TLS configuration etc.
|
||||
*/
|
||||
defaults?: Partial<RedisClusterClientOptions>;
|
||||
/**
|
||||
* When `true`, `.connect()` will only discover the cluster topology, without actually connecting to all the nodes.
|
||||
* Useful for short-term or PubSub-only connections.
|
||||
*/
|
||||
minimizeConnections?: boolean;
|
||||
/**
|
||||
* When `true`, distribute load by executing readonly commands (such as `GET`, `GEOSEARCH`, etc.) across all cluster nodes. When `false`, only use master nodes.
|
||||
*/
|
||||
useReplicas?: boolean;
|
||||
/**
|
||||
* The maximum number of times a command will be redirected due to `MOVED` or `ASK` errors.
|
||||
*/
|
||||
maxCommandRedirections?: number;
|
||||
/**
|
||||
* Mapping between the addresses in the cluster (see `CLUSTER SHARDS`) and the addresses the client should connect to
|
||||
* Useful when the cluster is running on another network
|
||||
*
|
||||
*/
|
||||
nodeAddressMap?: NodeAddressMap;
|
||||
}
|
||||
|
||||
@@ -70,14 +96,44 @@ export default class RedisCluster<
|
||||
}
|
||||
|
||||
readonly #options: RedisClusterOptions<M, F, S>;
|
||||
|
||||
readonly #slots: RedisClusterSlots<M, F, S>;
|
||||
|
||||
get slots() {
|
||||
return this.#slots.slots;
|
||||
}
|
||||
|
||||
get shards() {
|
||||
return this.#slots.shards;
|
||||
}
|
||||
|
||||
get masters() {
|
||||
return this.#slots.masters;
|
||||
}
|
||||
|
||||
get replicas() {
|
||||
return this.#slots.replicas;
|
||||
}
|
||||
|
||||
get nodeByAddress() {
|
||||
return this.#slots.nodeByAddress;
|
||||
}
|
||||
|
||||
get pubSubNode() {
|
||||
return this.#slots.pubSubNode;
|
||||
}
|
||||
|
||||
readonly #Multi: InstantiableRedisClusterMultiCommandType<M, F, S>;
|
||||
|
||||
get isOpen() {
|
||||
return this.#slots.isOpen;
|
||||
}
|
||||
|
||||
constructor(options: RedisClusterOptions<M, F, S>) {
|
||||
super();
|
||||
|
||||
this.#options = options;
|
||||
this.#slots = new RedisClusterSlots(options, err => this.emit('error', err));
|
||||
this.#slots = new RedisClusterSlots(options, this.emit.bind(this));
|
||||
this.#Multi = RedisClusterMultiCommand.extend(options);
|
||||
}
|
||||
|
||||
@@ -88,7 +144,7 @@ export default class RedisCluster<
|
||||
});
|
||||
}
|
||||
|
||||
async connect(): Promise<void> {
|
||||
connect() {
|
||||
return this.#slots.connect();
|
||||
}
|
||||
|
||||
@@ -188,34 +244,33 @@ export default class RedisCluster<
|
||||
executor: (client: RedisClientType<M, F, S>) => Promise<Reply>
|
||||
): Promise<Reply> {
|
||||
const maxCommandRedirections = this.#options.maxCommandRedirections ?? 16;
|
||||
let client = this.#slots.getClient(firstKey, isReadonly);
|
||||
let client = await this.#slots.getClient(firstKey, isReadonly);
|
||||
for (let i = 0;; i++) {
|
||||
try {
|
||||
return await executor(client);
|
||||
} catch (err) {
|
||||
if (++i > maxCommandRedirections || !(err instanceof Error)) {
|
||||
if (++i > maxCommandRedirections || !(err instanceof ErrorReply)) {
|
||||
throw err;
|
||||
}
|
||||
|
||||
if (err.message.startsWith('ASK')) {
|
||||
const address = err.message.substring(err.message.lastIndexOf(' ') + 1);
|
||||
if (this.#slots.getNodeByAddress(address)?.client === client) {
|
||||
await client.asking();
|
||||
continue;
|
||||
let redirectTo = await this.#slots.getMasterByAddress(address);
|
||||
if (!redirectTo) {
|
||||
await this.#slots.rediscover(client);
|
||||
redirectTo = await this.#slots.getMasterByAddress(address);
|
||||
}
|
||||
|
||||
await this.#slots.rediscover(client);
|
||||
const redirectTo = this.#slots.getNodeByAddress(address);
|
||||
if (!redirectTo) {
|
||||
throw new Error(`Cannot find node ${address}`);
|
||||
}
|
||||
|
||||
await redirectTo.client.asking();
|
||||
client = redirectTo.client;
|
||||
await redirectTo.asking();
|
||||
client = redirectTo;
|
||||
continue;
|
||||
} else if (err.message.startsWith('MOVED')) {
|
||||
await this.#slots.rediscover(client);
|
||||
client = this.#slots.getClient(firstKey, isReadonly);
|
||||
client = await this.#slots.getClient(firstKey, isReadonly);
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -239,14 +294,94 @@ export default class RedisCluster<
|
||||
|
||||
multi = this.MULTI;
|
||||
|
||||
getMasters(): Array<ClusterNode<M, F, S>> {
|
||||
return this.#slots.getMasters();
|
||||
async SUBSCRIBE<T extends boolean = false>(
|
||||
channels: string | Array<string>,
|
||||
listener: PubSubListener<T>,
|
||||
bufferMode?: T
|
||||
) {
|
||||
return (await this.#slots.getPubSubClient())
|
||||
.SUBSCRIBE(channels, listener, bufferMode);
|
||||
}
|
||||
|
||||
getSlotMaster(slot: number): ClusterNode<M, F, S> {
|
||||
return this.#slots.getSlotMaster(slot);
|
||||
subscribe = this.SUBSCRIBE;
|
||||
|
||||
async UNSUBSCRIBE<T extends boolean = false>(
|
||||
channels?: string | Array<string>,
|
||||
listener?: PubSubListener<boolean>,
|
||||
bufferMode?: T
|
||||
) {
|
||||
return this.#slots.executeUnsubscribeCommand(client =>
|
||||
client.UNSUBSCRIBE(channels, listener, bufferMode)
|
||||
);
|
||||
}
|
||||
|
||||
unsubscribe = this.UNSUBSCRIBE;
|
||||
|
||||
async PSUBSCRIBE<T extends boolean = false>(
|
||||
patterns: string | Array<string>,
|
||||
listener: PubSubListener<T>,
|
||||
bufferMode?: T
|
||||
) {
|
||||
return (await this.#slots.getPubSubClient())
|
||||
.PSUBSCRIBE(patterns, listener, bufferMode);
|
||||
}
|
||||
|
||||
pSubscribe = this.PSUBSCRIBE;
|
||||
|
||||
async PUNSUBSCRIBE<T extends boolean = false>(
|
||||
patterns?: string | Array<string>,
|
||||
listener?: PubSubListener<T>,
|
||||
bufferMode?: T
|
||||
) {
|
||||
return this.#slots.executeUnsubscribeCommand(client =>
|
||||
client.PUNSUBSCRIBE(patterns, listener, bufferMode)
|
||||
);
|
||||
}
|
||||
|
||||
pUnsubscribe = this.PUNSUBSCRIBE;
|
||||
|
||||
async SSUBSCRIBE<T extends boolean = false>(
|
||||
channels: string | Array<string>,
|
||||
listener: PubSubListener<T>,
|
||||
bufferMode?: T
|
||||
) {
|
||||
const maxCommandRedirections = this.#options.maxCommandRedirections ?? 16,
|
||||
firstChannel = Array.isArray(channels) ? channels[0] : channels;
|
||||
let client = await this.#slots.getShardedPubSubClient(firstChannel);
|
||||
for (let i = 0;; i++) {
|
||||
try {
|
||||
return await client.SSUBSCRIBE(channels, listener, bufferMode);
|
||||
} catch (err) {
|
||||
if (++i > maxCommandRedirections || !(err instanceof ErrorReply)) {
|
||||
throw err;
|
||||
}
|
||||
|
||||
if (err.message.startsWith('MOVED')) {
|
||||
await this.#slots.rediscover(client);
|
||||
client = await this.#slots.getShardedPubSubClient(firstChannel);
|
||||
continue;
|
||||
}
|
||||
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sSubscribe = this.SSUBSCRIBE;
|
||||
|
||||
SUNSUBSCRIBE<T extends boolean = false>(
|
||||
channels: string | Array<string>,
|
||||
listener: PubSubListener<T>,
|
||||
bufferMode?: T
|
||||
) {
|
||||
return this.#slots.executeShardedUnsubscribeCommand(
|
||||
Array.isArray(channels) ? channels[0] : channels,
|
||||
client => client.SUNSUBSCRIBE(channels, listener, bufferMode)
|
||||
);
|
||||
}
|
||||
|
||||
sUnsubscribe = this.SUNSUBSCRIBE;
|
||||
|
||||
quit(): Promise<void> {
|
||||
return this.#slots.quit();
|
||||
}
|
||||
@@ -254,6 +389,32 @@ export default class RedisCluster<
|
||||
disconnect(): Promise<void> {
|
||||
return this.#slots.disconnect();
|
||||
}
|
||||
|
||||
nodeClient(node: ShardNode<M, F, S>) {
|
||||
return this.#slots.nodeClient(node);
|
||||
}
|
||||
|
||||
getRandomNode() {
|
||||
return this.#slots.getRandomNode();
|
||||
}
|
||||
|
||||
getSlotRandomNode(slot: number) {
|
||||
return this.#slots.getSlotRandomNode(slot);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated use `.masters` instead
|
||||
*/
|
||||
getMasters() {
|
||||
return this.masters;
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated use `.slots[<SLOT>]` instead
|
||||
*/
|
||||
getSlotMaster(slot: number) {
|
||||
return this.slots[slot].master;
|
||||
}
|
||||
}
|
||||
|
||||
attachCommands({
|
||||
|
Reference in New Issue
Block a user