1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

Update doctest client with latest v4 release (#2844)

This commit is contained in:
Shaya Potter
2024-09-29 13:19:06 +03:00
committed by GitHub
parent fd7b10be6c
commit 49fdb79897
167 changed files with 8227 additions and 9599 deletions

View File

@@ -1,23 +1,17 @@
import RedisClient, { InstantiableRedisClient, RedisClientType } from '../client';
import { RedisClusterMasterNode, RedisClusterReplicaNode } from '../commands/CLUSTER_NODES';
import { RedisClusterClientOptions, RedisClusterOptions } from '.';
import { RedisCommandArgument, RedisFunctions, RedisModules, RedisScripts } from '../commands';
import { RootNodesUnavailableError } from '../errors';
import { ClusterSlotsNode } from '../commands/CLUSTER_SLOTS';
import { types } from 'util';
import { ChannelListeners, PubSubType, PubSubTypeListeners } from '../client/pub-sub';
import { EventEmitter } from 'stream';
// We need to use 'require', because it's not possible with Typescript to import
// function that are exported as 'module.exports = function`, without esModuleInterop
// set to true.
const calculateSlot = require('cluster-key-slot');
export interface ClusterNode<
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts
> {
id: string;
client: RedisClientType<M, F, S>;
}
interface NodeAddress {
host: string;
port: number;
@@ -27,133 +21,236 @@ export type NodeAddressMap = {
[address: string]: NodeAddress;
} | ((address: string) => NodeAddress | undefined);
interface SlotNodes<
type ValueOrPromise<T> = T | Promise<T>;
type ClientOrPromise<
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts
> = ValueOrPromise<RedisClientType<M, F, S>>;
export interface Node<
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts
> {
master: ClusterNode<M, F, S>;
replicas: Array<ClusterNode<M, F, S>>;
clientIterator: IterableIterator<RedisClientType<M, F, S>> | undefined;
address: string;
client?: ClientOrPromise<M, F, S>;
}
type OnError = (err: unknown) => void;
export interface ShardNode<
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts
> extends Node<M, F, S> {
id: string;
host: string;
port: number;
readonly: boolean;
}
export interface MasterNode<
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts
> extends ShardNode<M, F, S> {
pubSubClient?: ClientOrPromise<M, F, S>;
}
export interface Shard<
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts
> {
master: MasterNode<M, F, S>;
replicas?: Array<ShardNode<M, F, S>>;
nodesIterator?: IterableIterator<ShardNode<M, F, S>>;
}
type ShardWithReplicas<
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts
> = Shard<M, F, S> & Required<Pick<Shard<M, F, S>, 'replicas'>>;
export type PubSubNode<
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts
> = Required<Node<M, F, S>>;
type PubSubToResubscribe = Record<
PubSubType.CHANNELS | PubSubType.PATTERNS,
PubSubTypeListeners
>;
export type OnShardedChannelMovedError = (
err: unknown,
channel: string,
listeners?: ChannelListeners
) => void;
export default class RedisClusterSlots<
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts
> {
static #SLOTS = 16384;
readonly #options: RedisClusterOptions<M, F, S>;
readonly #Client: InstantiableRedisClient<M, F, S>;
readonly #onError: OnError;
readonly #nodeByAddress = new Map<string, ClusterNode<M, F, S>>();
readonly #slots: Array<SlotNodes<M, F, S>> = [];
readonly #emit: EventEmitter['emit'];
slots = new Array<Shard<M, F, S>>(RedisClusterSlots.#SLOTS);
shards = new Array<Shard<M, F, S>>();
masters = new Array<ShardNode<M, F, S>>();
replicas = new Array<ShardNode<M, F, S>>();
readonly nodeByAddress = new Map<string, MasterNode<M, F, S> | ShardNode<M, F, S>>();
pubSubNode?: PubSubNode<M, F, S>;
constructor(options: RedisClusterOptions<M, F, S>, onError: OnError) {
this.#options = options;
this.#Client = RedisClient.extend(options);
this.#onError = onError;
#isOpen = false;
get isOpen() {
return this.#isOpen;
}
async connect(): Promise<void> {
for (const rootNode of this.#options.rootNodes) {
if (await this.#discoverNodes(rootNode)) return;
constructor(
options: RedisClusterOptions<M, F, S>,
emit: EventEmitter['emit']
) {
this.#options = options;
this.#Client = RedisClient.extend(options);
this.#emit = emit;
}
async connect() {
if (this.#isOpen) {
throw new Error('Cluster already open');
}
this.#isOpen = true;
try {
await this.#discoverWithRootNodes();
} catch (err) {
this.#isOpen = false;
throw err;
}
}
async #discoverWithRootNodes() {
let start = Math.floor(Math.random() * this.#options.rootNodes.length);
for (let i = start; i < this.#options.rootNodes.length; i++) {
if (await this.#discover(this.#options.rootNodes[i])) return;
}
for (let i = 0; i < start; i++) {
if (await this.#discover(this.#options.rootNodes[i])) return;
}
throw new RootNodesUnavailableError();
}
async #discoverNodes(clientOptions?: RedisClusterClientOptions): Promise<boolean> {
const client = this.#initiateClient(clientOptions);
#resetSlots() {
this.slots = new Array(RedisClusterSlots.#SLOTS);
this.shards = [];
this.masters = [];
this.replicas = [];
this.#randomNodeIterator = undefined;
}
async #discover(rootNode?: RedisClusterClientOptions) {
const addressesInUse = new Set<string>();
try {
const shards = await this.#getShards(rootNode),
promises: Array<Promise<unknown>> = [],
eagerConnect = this.#options.minimizeConnections !== true;
this.#resetSlots();
for (const { from, to, master, replicas } of shards) {
const shard: Shard<M, F, S> = {
master: this.#initiateSlotNode(master, false, eagerConnect, addressesInUse, promises)
};
if (this.#options.useReplicas) {
shard.replicas = replicas.map(replica =>
this.#initiateSlotNode(replica, true, eagerConnect, addressesInUse, promises)
);
}
this.shards.push(shard);
for (let i = from; i <= to; i++) {
this.slots[i] = shard;
}
}
if (this.pubSubNode && !addressesInUse.has(this.pubSubNode.address)) {
if (types.isPromise(this.pubSubNode.client)) {
promises.push(
this.pubSubNode.client.then(client => client.disconnect())
);
this.pubSubNode = undefined;
} else {
promises.push(this.pubSubNode.client.disconnect());
const channelsListeners = this.pubSubNode.client.getPubSubListeners(PubSubType.CHANNELS),
patternsListeners = this.pubSubNode.client.getPubSubListeners(PubSubType.PATTERNS);
if (channelsListeners.size || patternsListeners.size) {
promises.push(
this.#initiatePubSubClient({
[PubSubType.CHANNELS]: channelsListeners,
[PubSubType.PATTERNS]: patternsListeners
})
);
}
}
}
for (const [address, node] of this.nodeByAddress.entries()) {
if (addressesInUse.has(address)) continue;
if (node.client) {
promises.push(
this.#execOnNodeClient(node.client, client => client.disconnect())
);
}
const { pubSubClient } = node as MasterNode<M, F, S>;
if (pubSubClient) {
promises.push(
this.#execOnNodeClient(pubSubClient, client => client.disconnect())
);
}
this.nodeByAddress.delete(address);
}
await Promise.all(promises);
return true;
} catch (err) {
this.#emit('error', err);
return false;
}
}
async #getShards(rootNode?: RedisClusterClientOptions) {
const client = new this.#Client(
this.#clientOptionsDefaults(rootNode, true)
);
client.on('error', err => this.#emit('error', err));
await client.connect();
try {
await this.#reset(await client.clusterNodes());
return true;
} catch (err) {
this.#onError(err);
return false;
// using `CLUSTER SLOTS` and not `CLUSTER SHARDS` to support older versions
return await client.clusterSlots();
} finally {
if (client.isOpen) {
await client.disconnect();
}
await client.disconnect();
}
}
#runningRediscoverPromise?: Promise<void>;
async rediscover(startWith: RedisClientType<M, F, S>): Promise<void> {
if (!this.#runningRediscoverPromise) {
this.#runningRediscoverPromise = this.#rediscover(startWith)
.finally(() => this.#runningRediscoverPromise = undefined);
}
return this.#runningRediscoverPromise;
}
async #rediscover(startWith: RedisClientType<M, F, S>): Promise<void> {
if (await this.#discoverNodes(startWith.options)) return;
for (const { client } of this.#nodeByAddress.values()) {
if (client === startWith) continue;
if (await this.#discoverNodes(client.options)) return;
}
throw new Error('None of the cluster nodes is available');
}
async #reset(masters: Array<RedisClusterMasterNode>): Promise<void> {
// Override this.#slots and add not existing clients to this.#nodeByAddress
const promises: Array<Promise<void>> = [],
clientsInUse = new Set<string>();
for (const master of masters) {
const slot = {
master: this.#initiateClientForNode(master, false, clientsInUse, promises),
replicas: this.#options.useReplicas ?
master.replicas.map(replica => this.#initiateClientForNode(replica, true, clientsInUse, promises)) :
[],
clientIterator: undefined // will be initiated in use
};
for (const { from, to } of master.slots) {
for (let i = from; i <= to; i++) {
this.#slots[i] = slot;
}
}
}
// Remove unused clients from this.#nodeByAddress using clientsInUse
for (const [address, { client }] of this.#nodeByAddress.entries()) {
if (clientsInUse.has(address)) continue;
promises.push(client.disconnect());
this.#nodeByAddress.delete(address);
}
await Promise.all(promises);
}
#clientOptionsDefaults(options?: RedisClusterClientOptions): RedisClusterClientOptions | undefined {
if (!this.#options.defaults) return options;
return {
...this.#options.defaults,
...options,
socket: this.#options.defaults.socket && options?.socket ? {
...this.#options.defaults.socket,
...options.socket
} : this.#options.defaults.socket ?? options?.socket
};
}
#initiateClient(options?: RedisClusterClientOptions): RedisClientType<M, F, S> {
return new this.#Client(this.#clientOptionsDefaults(options))
.on('error', this.#onError);
}
#getNodeAddress(address: string): NodeAddress | undefined {
switch (typeof this.#options.nodeAddressMap) {
case 'object':
@@ -164,111 +261,123 @@ export default class RedisClusterSlots<
}
}
#initiateClientForNode(
nodeData: RedisClusterMasterNode | RedisClusterReplicaNode,
readonly: boolean,
clientsInUse: Set<string>,
promises: Array<Promise<void>>
): ClusterNode<M, F, S> {
const address = `${nodeData.host}:${nodeData.port}`;
clientsInUse.add(address);
#clientOptionsDefaults(
options?: RedisClusterClientOptions,
disableReconnect?: boolean
): RedisClusterClientOptions | undefined {
let result: RedisClusterClientOptions | undefined;
if (this.#options.defaults) {
let socket;
if (this.#options.defaults.socket) {
socket = {
...this.#options.defaults.socket,
...options?.socket
};
} else {
socket = options?.socket;
}
let node = this.#nodeByAddress.get(address);
result = {
...this.#options.defaults,
...options,
socket
};
} else {
result = options;
}
if (disableReconnect) {
result ??= {};
result.socket ??= {};
result.socket.reconnectStrategy = false;
}
return result;
}
#initiateSlotNode(
{ id, ip, port }: ClusterSlotsNode,
readonly: boolean,
eagerConnent: boolean,
addressesInUse: Set<string>,
promises: Array<Promise<unknown>>
) {
const address = `${ip}:${port}`;
addressesInUse.add(address);
let node = this.nodeByAddress.get(address);
if (!node) {
node = {
id: nodeData.id,
client: this.#initiateClient({
socket: this.#getNodeAddress(address) ?? {
host: nodeData.host,
port: nodeData.port
},
readonly
})
id,
host: ip,
port,
address,
readonly,
client: undefined
};
promises.push(node.client.connect());
this.#nodeByAddress.set(address, node);
if (eagerConnent) {
promises.push(this.#createNodeClient(node));
}
this.nodeByAddress.set(address, node);
}
(readonly ? this.replicas : this.masters).push(node);
return node;
}
getSlotMaster(slot: number): ClusterNode<M, F, S> {
return this.#slots[slot].master;
}
*#slotClientIterator(slotNumber: number): IterableIterator<RedisClientType<M, F, S>> {
const slot = this.#slots[slotNumber];
yield slot.master.client;
for (const replica of slot.replicas) {
yield replica.client;
}
}
#getSlotClient(slotNumber: number): RedisClientType<M, F, S> {
const slot = this.#slots[slotNumber];
if (!slot.clientIterator) {
slot.clientIterator = this.#slotClientIterator(slotNumber);
}
const {done, value} = slot.clientIterator.next();
if (done) {
slot.clientIterator = undefined;
return this.#getSlotClient(slotNumber);
}
return value;
}
#randomClientIterator?: IterableIterator<ClusterNode<M, F, S>>;
#getRandomClient(): RedisClientType<M, F, S> {
if (!this.#nodeByAddress.size) {
throw new Error('Cluster is not connected');
}
if (!this.#randomClientIterator) {
this.#randomClientIterator = this.#nodeByAddress.values();
}
const {done, value} = this.#randomClientIterator.next();
if (done) {
this.#randomClientIterator = undefined;
return this.#getRandomClient();
}
return value.client;
}
getClient(firstKey?: RedisCommandArgument, isReadonly?: boolean): RedisClientType<M, F, S> {
if (!firstKey) {
return this.#getRandomClient();
}
const slot = calculateSlot(firstKey);
if (!isReadonly || !this.#options.useReplicas) {
return this.getSlotMaster(slot).client;
}
return this.#getSlotClient(slot);
}
getMasters(): Array<ClusterNode<M, F, S>> {
const masters = [];
for (const node of this.#nodeByAddress.values()) {
if (node.client.options?.readonly) continue;
masters.push(node);
}
return masters;
}
getNodeByAddress(address: string): ClusterNode<M, F, S> | undefined {
const mappedAddress = this.#getNodeAddress(address);
return this.#nodeByAddress.get(
mappedAddress ? `${mappedAddress.host}:${mappedAddress.port}` : address
async #createClient(
node: ShardNode<M, F, S>,
readonly = node.readonly
) {
const client = new this.#Client(
this.#clientOptionsDefaults({
socket: this.#getNodeAddress(node.address) ?? {
host: node.host,
port: node.port
},
readonly
})
);
client.on('error', err => this.#emit('error', err));
await client.connect();
return client;
}
#createNodeClient(node: ShardNode<M, F, S>) {
const promise = this.#createClient(node)
.then(client => {
node.client = client;
return client;
})
.catch(err => {
node.client = undefined;
throw err;
});
node.client = promise;
return promise;
}
nodeClient(node: ShardNode<M, F, S>) {
return node.client ?? this.#createNodeClient(node);
}
#runningRediscoverPromise?: Promise<void>;
async rediscover(startWith: RedisClientType<M, F, S>): Promise<void> {
this.#runningRediscoverPromise ??= this.#rediscover(startWith)
.finally(() => this.#runningRediscoverPromise = undefined);
return this.#runningRediscoverPromise;
}
async #rediscover(startWith: RedisClientType<M, F, S>): Promise<void> {
if (await this.#discover(startWith.options)) return;
return this.#discoverWithRootNodes();
}
quit(): Promise<void> {
@@ -280,14 +389,233 @@ export default class RedisClusterSlots<
}
async #destroy(fn: (client: RedisClientType<M, F, S>) => Promise<unknown>): Promise<void> {
this.#isOpen = false;
const promises = [];
for (const { client } of this.#nodeByAddress.values()) {
promises.push(fn(client));
for (const { master, replicas } of this.shards) {
if (master.client) {
promises.push(
this.#execOnNodeClient(master.client, fn)
);
}
if (master.pubSubClient) {
promises.push(
this.#execOnNodeClient(master.pubSubClient, fn)
);
}
if (replicas) {
for (const { client } of replicas) {
if (client) {
promises.push(
this.#execOnNodeClient(client, fn)
);
}
}
}
}
await Promise.all(promises);
if (this.pubSubNode) {
promises.push(this.#execOnNodeClient(this.pubSubNode.client, fn));
this.pubSubNode = undefined;
}
this.#nodeByAddress.clear();
this.#slots.splice(0);
this.#resetSlots();
this.nodeByAddress.clear();
await Promise.allSettled(promises);
}
#execOnNodeClient(
client: ClientOrPromise<M, F, S>,
fn: (client: RedisClientType<M, F, S>) => Promise<unknown>
) {
return types.isPromise(client) ?
client.then(fn) :
fn(client);
}
getClient(
firstKey: RedisCommandArgument | undefined,
isReadonly: boolean | undefined
): ClientOrPromise<M, F, S> {
if (!firstKey) {
return this.nodeClient(this.getRandomNode());
}
const slotNumber = calculateSlot(firstKey);
if (!isReadonly) {
return this.nodeClient(this.slots[slotNumber].master);
}
return this.nodeClient(this.getSlotRandomNode(slotNumber));
}
*#iterateAllNodes() {
let i = Math.floor(Math.random() * (this.masters.length + this.replicas.length));
if (i < this.masters.length) {
do {
yield this.masters[i];
} while (++i < this.masters.length);
for (const replica of this.replicas) {
yield replica;
}
} else {
i -= this.masters.length;
do {
yield this.replicas[i];
} while (++i < this.replicas.length);
}
while (true) {
for (const master of this.masters) {
yield master;
}
for (const replica of this.replicas) {
yield replica;
}
}
}
#randomNodeIterator?: IterableIterator<ShardNode<M, F, S>>;
getRandomNode() {
this.#randomNodeIterator ??= this.#iterateAllNodes();
return this.#randomNodeIterator.next().value as ShardNode<M, F, S>;
}
*#slotNodesIterator(slot: ShardWithReplicas<M, F, S>) {
let i = Math.floor(Math.random() * (1 + slot.replicas.length));
if (i < slot.replicas.length) {
do {
yield slot.replicas[i];
} while (++i < slot.replicas.length);
}
while (true) {
yield slot.master;
for (const replica of slot.replicas) {
yield replica;
}
}
}
getSlotRandomNode(slotNumber: number) {
const slot = this.slots[slotNumber];
if (!slot.replicas?.length) {
return slot.master;
}
slot.nodesIterator ??= this.#slotNodesIterator(slot as ShardWithReplicas<M, F, S>);
return slot.nodesIterator.next().value as ShardNode<M, F, S>;
}
getMasterByAddress(address: string) {
const master = this.nodeByAddress.get(address);
if (!master) return;
return this.nodeClient(master);
}
getPubSubClient() {
return this.pubSubNode ?
this.pubSubNode.client :
this.#initiatePubSubClient();
}
async #initiatePubSubClient(toResubscribe?: PubSubToResubscribe) {
const index = Math.floor(Math.random() * (this.masters.length + this.replicas.length)),
node = index < this.masters.length ?
this.masters[index] :
this.replicas[index - this.masters.length];
this.pubSubNode = {
address: node.address,
client: this.#createClient(node, true)
.then(async client => {
if (toResubscribe) {
await Promise.all([
client.extendPubSubListeners(PubSubType.CHANNELS, toResubscribe[PubSubType.CHANNELS]),
client.extendPubSubListeners(PubSubType.PATTERNS, toResubscribe[PubSubType.PATTERNS])
]);
}
this.pubSubNode!.client = client;
return client;
})
.catch(err => {
this.pubSubNode = undefined;
throw err;
})
};
return this.pubSubNode.client as Promise<RedisClientType<M, F, S>>;
}
async executeUnsubscribeCommand(
unsubscribe: (client: RedisClientType<M, F, S>) => Promise<void>
): Promise<void> {
const client = await this.getPubSubClient();
await unsubscribe(client);
if (!client.isPubSubActive && client.isOpen) {
await client.disconnect();
this.pubSubNode = undefined;
}
}
getShardedPubSubClient(channel: string) {
const { master } = this.slots[calculateSlot(channel)];
return master.pubSubClient ?? this.#initiateShardedPubSubClient(master);
}
#initiateShardedPubSubClient(master: MasterNode<M, F, S>) {
const promise = this.#createClient(master, true)
.then(client => {
client.on('server-sunsubscribe', async (channel, listeners) => {
try {
await this.rediscover(client);
const redirectTo = await this.getShardedPubSubClient(channel);
redirectTo.extendPubSubChannelListeners(
PubSubType.SHARDED,
channel,
listeners
);
} catch (err) {
this.#emit('sharded-shannel-moved-error', err, channel, listeners);
}
});
master.pubSubClient = client;
return client;
})
.catch(err => {
master.pubSubClient = undefined;
throw err;
});
master.pubSubClient = promise;
return promise;
}
async executeShardedUnsubscribeCommand(
channel: string,
unsubscribe: (client: RedisClientType<M, F, S>) => Promise<void>
): Promise<void> {
const { master } = this.slots[calculateSlot(channel)];
if (!master.pubSubClient) return Promise.resolve();
const client = await master.pubSubClient;
await unsubscribe(client);
if (!client.isPubSubActive && client.isOpen) {
await client.disconnect();
master.pubSubClient = undefined;
}
}
}