From 225efc0b4365d3a2aa5dd0c16e59639076531f56 Mon Sep 17 00:00:00 2001 From: Leibale Date: Mon, 2 Oct 2023 12:03:04 -0400 Subject: [PATCH] cluster --- packages/client/lib/client/index.ts | 25 +- packages/client/lib/cluster/cluster-slots.ts | 334 ++++++++---------- packages/client/lib/cluster/index.spec.ts | 236 ++++++------- packages/client/lib/cluster/index.ts | 16 +- .../client/lib/commands/CLIENT_NO-TOUCH.ts | 2 +- .../client/lib/commands/CLUSTER_MYSHARDID.ts | 2 +- .../lib/commands/CLUSTER_REPLICAS.spec.ts | 11 +- .../client/lib/commands/CLUSTER_REPLICAS.ts | 4 +- packages/client/lib/commands/FCALL.spec.ts | 2 +- packages/client/lib/commands/FCALL_RO.spec.ts | 2 +- .../client/lib/commands/FUNCTION_LOAD.spec.ts | 6 +- packages/client/lib/commands/MIGRATE.spec.ts | 142 ++++---- packages/client/lib/commands/MIGRATE.ts | 5 +- packages/client/lib/commands/RENAME.ts | 2 +- packages/client/lib/commands/RENAMENX.ts | 2 +- packages/client/lib/commands/ZPOPMAX_COUNT.ts | 2 +- packages/client/lib/commands/index.ts | 3 + packages/client/lib/multi-command.spec.ts | 4 +- packages/test-utils/lib/index.ts | 8 +- 19 files changed, 386 insertions(+), 422 deletions(-) diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 60b814e3ea..4979c905b2 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -15,23 +15,13 @@ import { ScanOptions, ScanCommonOptions } from '../commands/SCAN'; import { RedisLegacyClient, RedisLegacyClientType } from './legacy-mode'; import { RedisPoolOptions, RedisClientPool } from './pool'; -interface ClientCommander< - M extends RedisModules, - F extends RedisFunctions, - S extends RedisScripts, - RESP extends RespVersions, - TYPE_MAPPING extends TypeMapping -> extends CommanderConfig { - commandOptions?: CommandOptions; -} - export interface RedisClientOptions< M extends RedisModules = RedisModules, F extends RedisFunctions = RedisFunctions, S extends RedisScripts = RedisScripts, RESP extends RespVersions = RespVersions, TYPE_MAPPING extends TypeMapping = TypeMapping -> extends ClientCommander { +> extends CommanderConfig { /** * `redis[s]://[[username][:password]@][host][:port][/db-number]` * See [`redis`](https://www.iana.org/assignments/uri-schemes/prov/redis) and [`rediss`](https://www.iana.org/assignments/uri-schemes/prov/rediss) IANA registration for more details @@ -75,6 +65,10 @@ export interface RedisClientOptions< * Useful with Redis deployments that do not honor TCP Keep-Alive. */ pingInterval?: number; + /** + * TODO + */ + commandOptions?: CommandOptions; } type WithCommands< @@ -205,9 +199,8 @@ export default class RedisClient< M extends RedisModules = {}, F extends RedisFunctions = {}, S extends RedisScripts = {}, - RESP extends RespVersions = 2, - TYPE_MAPPING extends TypeMapping = {} - >(config?: ClientCommander) { + RESP extends RespVersions = 2 + >(config?: CommanderConfig) { const Client = attachConfig({ BaseClass: RedisClient, commands: COMMANDS, @@ -220,7 +213,9 @@ export default class RedisClient< Client.prototype.Multi = RedisClientMultiCommand.extend(config); - return (options?: Omit>) => { + return ( + options?: Omit, keyof Exclude> + ) => { // returning a "proxy" to prevent the namespaces.self to leak between "proxies" return Object.create(new Client(options)) as RedisClientType; }; diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index 19ce00735f..79fd2e2aeb 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -4,7 +4,7 @@ import RedisClient, { RedisClientOptions, RedisClientType } from '../client'; import { types } from 'node:util'; import { EventEmitter } from 'node:stream'; import { ChannelListeners, PubSubType, PubSubTypeListeners } from '../client/pub-sub'; -import { RedisArgument, RedisFunctions, RedisModules, RedisScripts, RespVersions } from '../RESP/types'; +import { RedisArgument, RedisFunctions, RedisModules, RedisScripts, RespVersions, TypeMapping } from '../RESP/types'; // TODO: ?! // We need to use 'require', because it's not possible with Typescript to import @@ -21,34 +21,26 @@ export type NodeAddressMap = { [address: string]: NodeAddress; } | ((address: string) => NodeAddress | undefined); -type ValueOrPromise = T | Promise; - -type ClientOrPromise< - M extends RedisModules, - F extends RedisFunctions, - S extends RedisScripts, - RESP extends RespVersions = 2 -> = ValueOrPromise>; - export interface Node< M extends RedisModules, F extends RedisFunctions, S extends RedisScripts, - RESP extends RespVersions + RESP extends RespVersions, + TYPE_MAPPING extends TypeMapping > { address: string; - client?: ClientOrPromise; + client?: RedisClientType; + connectPromise?: Promise>; } export interface ShardNode< M extends RedisModules, F extends RedisFunctions, S extends RedisScripts, - RESP extends RespVersions -> extends Node { + RESP extends RespVersions, + TYPE_MAPPING extends TypeMapping +> extends Node, NodeAddress { id: string; - host: string; - port: number; readonly: boolean; } @@ -56,35 +48,45 @@ export interface MasterNode< M extends RedisModules, F extends RedisFunctions, S extends RedisScripts, - RESP extends RespVersions -> extends ShardNode { - pubSubClient?: ClientOrPromise; + RESP extends RespVersions, + TYPE_MAPPING extends TypeMapping +> extends ShardNode { + pubSub?: { + connectPromise?: Promise>; + client: RedisClientType; + }; } export interface Shard< M extends RedisModules, F extends RedisFunctions, S extends RedisScripts, - RESP extends RespVersions + RESP extends RespVersions, + TYPE_MAPPING extends TypeMapping > { - master: MasterNode; - replicas?: Array>; - nodesIterator?: IterableIterator>; + master: MasterNode; + replicas?: Array>; + nodesIterator?: IterableIterator>; } type ShardWithReplicas< M extends RedisModules, F extends RedisFunctions, S extends RedisScripts, - RESP extends RespVersions -> = Shard & Required, 'replicas'>>; + RESP extends RespVersions, + TYPE_MAPPING extends TypeMapping +> = Shard & Required, 'replicas'>>; -export type PubSubNode< +type PubSubNode< M extends RedisModules, F extends RedisFunctions, S extends RedisScripts, - RESP extends RespVersions -> = Required>; + RESP extends RespVersions, + TYPE_MAPPING extends TypeMapping +> = ( + Exclude, 'client'> & + Required, 'client'>> +); type PubSubToResubscribe = Record< PubSubType.CHANNELS | PubSubType.PATTERNS, @@ -101,19 +103,19 @@ export default class RedisClusterSlots< M extends RedisModules, F extends RedisFunctions, S extends RedisScripts, - RESP extends RespVersions + RESP extends RespVersions, + TYPE_MAPPING extends TypeMapping > { private static _SLOTS = 16384; - private readonly _options: RedisClusterOptions; + private readonly _options: RedisClusterOptions; private readonly _clientFactory: ReturnType>; private readonly _emit: EventEmitter['emit']; - slots = new Array>(RedisClusterSlots._SLOTS); - shards = new Array>(); - masters = new Array>(); - replicas = new Array>(); - readonly nodeByAddress = new Map | ShardNode>(); - pubSubNode?: PubSubNode; + slots = new Array>(RedisClusterSlots._SLOTS); + masters = new Array>(); + replicas = new Array>(); + readonly nodeByAddress = new Map | ShardNode>(); + pubSubNode?: PubSubNode; private _isOpen = false; @@ -122,7 +124,7 @@ export default class RedisClusterSlots< } constructor( - options: RedisClusterOptions, + options: RedisClusterOptions, emit: EventEmitter['emit'] ) { this._options = options; @@ -147,10 +149,12 @@ export default class RedisClusterSlots< private async _discoverWithRootNodes() { let start = Math.floor(Math.random() * this._options.rootNodes.length); for (let i = start; i < this._options.rootNodes.length; i++) { + if (!this._isOpen) throw new Error('Cluster closed'); if (await this._discover(this._options.rootNodes[i])) return; } for (let i = 0; i < start; i++) { + if (!this._isOpen) throw new Error('Cluster closed'); if (await this._discover(this._options.rootNodes[i])) return; } @@ -159,7 +163,6 @@ export default class RedisClusterSlots< private _resetSlots() { this.slots = new Array(RedisClusterSlots._SLOTS); - this.shards = []; this.masters = []; this.replicas = []; this._randomNodeIterator = undefined; @@ -167,15 +170,13 @@ export default class RedisClusterSlots< private async _discover(rootNode: RedisClusterClientOptions) { this._resetSlots(); - const addressesInUse = new Set(); - try { - const shards = await this._getShards(rootNode), + const addressesInUse = new Set(), promises: Array> = [], eagerConnect = this._options.minimizeConnections !== true; - for (const { from, to, master, replicas } of shards) { - const shard: Shard = { + for (const { from, to, master, replicas } of await this._getShards(rootNode)) { + const shard: Shard = { master: this._initiateSlotNode(master, false, eagerConnect, addressesInUse, promises) }; @@ -185,33 +186,24 @@ export default class RedisClusterSlots< ); } - this.shards.push(shard); - for (let i = from; i <= to; i++) { this.slots[i] = shard; } } if (this.pubSubNode && !addressesInUse.has(this.pubSubNode.address)) { - if (types.isPromise(this.pubSubNode.client)) { + const channelsListeners = this.pubSubNode.client.getPubSubListeners(PubSubType.CHANNELS), + patternsListeners = this.pubSubNode.client.getPubSubListeners(PubSubType.PATTERNS); + + this.pubSubNode.client.destroy(); + + if (channelsListeners.size || patternsListeners.size) { promises.push( - this.pubSubNode.client.then(client => client.disconnect()) + this._initiatePubSubClient({ + [PubSubType.CHANNELS]: channelsListeners, + [PubSubType.PATTERNS]: patternsListeners + }) ); - this.pubSubNode = undefined; - } else { - promises.push(this.pubSubNode.client.disconnect()); - - const channelsListeners = this.pubSubNode.client.getPubSubListeners(PubSubType.CHANNELS), - patternsListeners = this.pubSubNode.client.getPubSubListeners(PubSubType.PATTERNS); - - if (channelsListeners.size || patternsListeners.size) { - promises.push( - this._initiatePubSubClient({ - [PubSubType.CHANNELS]: channelsListeners, - [PubSubType.PATTERNS]: patternsListeners - }) - ); - } } } @@ -219,16 +211,12 @@ export default class RedisClusterSlots< if (addressesInUse.has(address)) continue; if (node.client) { - promises.push( - this._execOnNodeClient(node.client, client => client.disconnect()) - ); + node.client.destroy(); } - const { pubSubClient } = node as MasterNode; - if (pubSubClient) { - promises.push( - this._execOnNodeClient(pubSubClient, client => client.disconnect()) - ); + const { pubSub } = node as MasterNode; + if (pubSub) { + pubSub.client.destroy(); } this.nodeByAddress.delete(address); @@ -248,12 +236,12 @@ export default class RedisClusterSlots< options.socket ??= {}; options.socket.reconnectStrategy = false; options.RESP = this._options.RESP; + options.commandOptions = undefined; - const client = RedisClient.factory(this._options)(options); - - client.on('error', err => this._emit('error', err)); - - await client.connect(); + // TODO: find a way to avoid type casting + const client = await this._clientFactory(options as RedisClientOptions) + .on('error', err => this._emit('error', err)) + .connect(); try { // switch to `CLUSTER SHARDS` when Redis 7.0 will be the minimum supported version @@ -273,7 +261,7 @@ export default class RedisClusterSlots< } } - private _clientOptionsDefaults(options?: RedisClientOptions): RedisClientOptions | undefined { + private _clientOptionsDefaults(options?: RedisClientOptions) { if (!this._options.defaults) return options; let socket; @@ -301,7 +289,6 @@ export default class RedisClusterSlots< promises: Array> ) { const address = `${shard.host}:${shard.port}`; - addressesInUse.add(address); let node = this.nodeByAddress.get(address); if (!node) { @@ -309,7 +296,8 @@ export default class RedisClusterSlots< ...shard, address, readonly, - client: undefined + client: undefined, + connectPromise: undefined }; if (eagerConnent) { @@ -319,16 +307,16 @@ export default class RedisClusterSlots< this.nodeByAddress.set(address, node); } - (readonly ? this.replicas : this.masters).push(node); + if (!addressesInUse.has(address)) { + addressesInUse.add(address); + (readonly ? this.replicas : this.masters).push(node); + } return node; } - private async _createClient( - node: ShardNode, - readonly = node.readonly - ) { - const client = this._clientFactory( + private _createClient(node: ShardNode, readonly = node.readonly) { + return this._clientFactory( this._clientOptionsDefaults({ socket: this._getNodeAddress(node.address) ?? { host: node.host, @@ -337,38 +325,29 @@ export default class RedisClusterSlots< readonly, RESP: this._options.RESP }) + ).on('error', err => console.error(err)); + } + + private _createNodeClient(node: ShardNode, readonly?: boolean) { + const client = node.client = this._createClient(node, readonly); + return node.connectPromise = client.connect() + .finally(() => node.connectPromise = undefined); + } + + nodeClient(node: ShardNode) { + return ( + node.connectPromise ?? // if the node is connecting + node.client ?? // if the node is connected + this._createNodeClient(node) // if the not is disconnected ); - client.on('error', err => this._emit('error', err)); - - await client.connect(); - - return client; } - private _createNodeClient(node: ShardNode) { - const promise = this._createClient(node) - .then(client => { - node.client = client; - return client; - }) - .catch(err => { - node.client = undefined; - throw err; - }); - node.client = promise; - return promise; - } - - nodeClient(node: ShardNode) { - return node.client ?? this._createNodeClient(node); - } - - #runningRediscoverPromise?: Promise; + private _runningRediscoverPromise?: Promise; async rediscover(startWith: RedisClientType): Promise { - this.#runningRediscoverPromise ??= this._rediscover(startWith) - .finally(() => this.#runningRediscoverPromise = undefined); - return this.#runningRediscoverPromise; + this._runningRediscoverPromise ??= this._rediscover(startWith) + .finally(() => this._runningRediscoverPromise = undefined); + return this._runningRediscoverPromise; } private async _rediscover(startWith: RedisClientType): Promise { @@ -399,11 +378,11 @@ export default class RedisClusterSlots< this._isOpen = false; for (const client of this._clients()) { - this._execOnNodeClient(client, client => client.destroy()); + client.destroy(); } if (this.pubSubNode) { - this._execOnNodeClient(this.pubSubNode.client, client => client.destroy()); + this.pubSubNode.client.destroy(); this.pubSubNode = undefined; } @@ -412,21 +391,19 @@ export default class RedisClusterSlots< } private *_clients() { - for (const { master, replicas } of this.shards) { + for (const master of this.masters) { if (master.client) { yield master.client; } - if (master.pubSubClient) { - yield master.pubSubClient; + if (master.pubSub) { + yield master.pubSub.client; } + } - if (replicas) { - for (const { client } of replicas) { - if (client) { - yield client; - } - } + for (const replica of this.replicas) { + if (replica.client) { + yield replica.client; } } } @@ -436,11 +413,11 @@ export default class RedisClusterSlots< const promises = []; for (const client of this._clients()) { - promises.push(this._execOnNodeClient(client, fn)); + promises.push(fn(client)); } if (this.pubSubNode) { - promises.push(this._execOnNodeClient(this.pubSubNode.client, fn)); + promises.push(fn(this.pubSubNode.client)); this.pubSubNode = undefined; } @@ -450,19 +427,10 @@ export default class RedisClusterSlots< await Promise.allSettled(promises); } - private _execOnNodeClient( - client: ClientOrPromise, - fn: (client: RedisClientType) => T - ): T | Promise { - return types.isPromise(client) ? - client.then(fn) : - fn(client); - } - getClient( firstKey: RedisArgument | undefined, isReadonly: boolean | undefined - ): ClientOrPromise { + ) { if (!firstKey) { return this.nodeClient(this.getRandomNode()); } @@ -503,14 +471,14 @@ export default class RedisClusterSlots< } } - _randomNodeIterator?: IterableIterator>; + _randomNodeIterator?: IterableIterator>; getRandomNode() { this._randomNodeIterator ??= this._iterateAllNodes(); - return this._randomNodeIterator.next().value as ShardNode; + return this._randomNodeIterator.next().value as ShardNode; } - private *_slotNodesIterator(slot: ShardWithReplicas) { + private *_slotNodesIterator(slot: ShardWithReplicas) { let i = Math.floor(Math.random() * (1 + slot.replicas.length)); if (i < slot.replicas.length) { do { @@ -533,8 +501,8 @@ export default class RedisClusterSlots< return slot.master; } - slot.nodesIterator ??= this._slotNodesIterator(slot as ShardWithReplicas); - return slot.nodesIterator.next().value as ShardNode; + slot.nodesIterator ??= this._slotNodesIterator(slot as ShardWithReplicas); + return slot.nodesIterator.next().value as ShardNode; } getMasterByAddress(address: string) { @@ -545,20 +513,22 @@ export default class RedisClusterSlots< } getPubSubClient() { - return this.pubSubNode ? - this.pubSubNode.client : - this._initiatePubSubClient(); + if (!this.pubSubNode) return this._initiatePubSubClient(); + + return this.pubSubNode.connectPromise ?? this.pubSubNode.client; } private async _initiatePubSubClient(toResubscribe?: PubSubToResubscribe) { const index = Math.floor(Math.random() * (this.masters.length + this.replicas.length)), node = index < this.masters.length ? this.masters[index] : - this.replicas[index - this.masters.length]; - + this.replicas[index - this.masters.length], + client = this._createClient(node, true); + this.pubSubNode = { address: node.address, - client: this._createClient(node, true) + client, + connectPromise: client.connect() .then(async client => { if (toResubscribe) { await Promise.all([ @@ -567,7 +537,7 @@ export default class RedisClusterSlots< ]); } - this.pubSubNode!.client = client; + this.pubSubNode!.connectPromise = undefined; return client; }) .catch(err => { @@ -576,7 +546,7 @@ export default class RedisClusterSlots< }) }; - return this.pubSubNode.client as Promise>; + return this.pubSubNode.connectPromise!; } async executeUnsubscribeCommand( @@ -593,52 +563,58 @@ export default class RedisClusterSlots< getShardedPubSubClient(channel: string) { const { master } = this.slots[calculateSlot(channel)]; - return master.pubSubClient ?? this.#initiateShardedPubSubClient(master); + if (!master.pubSub) return this._initiateShardedPubSubClient(master); + return master.pubSub.connectPromise ?? master.pubSub.client; } - #initiateShardedPubSubClient(master: MasterNode) { - const promise = this._createClient(master, true) - .then(client => { - client.on('server-sunsubscribe', async (channel, listeners) => { - try { - await this.rediscover(client); - const redirectTo = await this.getShardedPubSubClient(channel); - redirectTo.extendPubSubChannelListeners( - PubSubType.SHARDED, - channel, - listeners - ); - } catch (err) { - this._emit('sharded-shannel-moved-error', err, channel, listeners); - } - }); - - master.pubSubClient = client; - return client; - }) - .catch(err => { - master.pubSubClient = undefined; - throw err; + private async _initiateShardedPubSubClient(master: MasterNode) { + const client = this._createClient(master, true) + .on('server-sunsubscribe', async (channel, listeners) => { + try { + await this.rediscover(client); + const redirectTo = await this.getShardedPubSubClient(channel); + await redirectTo.extendPubSubChannelListeners( + PubSubType.SHARDED, + channel, + listeners + ); + } catch (err) { + this._emit('sharded-shannel-moved-error', err, channel, listeners); + } }); - master.pubSubClient = promise; + master.pubSub = { + client, + connectPromise: client.connect() + .then(client => { + master.pubSub!.connectPromise = undefined; + return client; + }) + .catch(err => { + master.pubSub = undefined; + throw err; + }) + }; - return promise; + return master.pubSub.connectPromise!; } async executeShardedUnsubscribeCommand( channel: string, - unsubscribe: (client: RedisClientType) => Promise - ): Promise { + unsubscribe: (client: RedisClientType) => Promise + ) { const { master } = this.slots[calculateSlot(channel)]; - if (!master.pubSubClient) return Promise.resolve(); + if (!master.pubSub) return; + + const client = master.pubSub.connectPromise ? + await master.pubSub.connectPromise : + master.pubSub.client; - const client = await master.pubSubClient; await unsubscribe(client); if (!client.isPubSubActive) { - await client.disconnect(); - master.pubSubClient = undefined; + client.destroy(); + master.pubSub = undefined; } } } diff --git a/packages/client/lib/cluster/index.spec.ts b/packages/client/lib/cluster/index.spec.ts index f163086357..4db5f32e85 100644 --- a/packages/client/lib/cluster/index.spec.ts +++ b/packages/client/lib/cluster/index.spec.ts @@ -1,11 +1,9 @@ import { strict as assert } from 'node:assert'; import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils'; import RedisCluster from '.'; -// import { ClusterSlotStates } from '../commands/CLUSTER_SETSLOT'; import { SQUARE_SCRIPT } from '../client/index.spec'; import { RootNodesUnavailableError } from '../errors'; import { spy } from 'sinon'; -// import { setTimeout } from 'node:timers/promises'; import RedisClient from '../client'; describe('Cluster', () => { @@ -69,58 +67,58 @@ describe('Cluster', () => { } }); - // testUtils.testWithCluster('should handle live resharding', async cluster => { - // const slot = 12539, - // key = 'key', - // value = 'value'; - // await cluster.set(key, value); + testUtils.testWithCluster('should handle live resharding', async cluster => { + const slot = 12539, + key = 'key', + value = 'value'; + await cluster.set(key, value); - // const importing = cluster.slots[0].master, - // migrating = cluster.slots[slot].master, - // [importingClient, migratingClient] = await Promise.all([ - // cluster.nodeClient(importing), - // cluster.nodeClient(migrating) - // ]); + const importing = cluster.slots[0].master, + migrating = cluster.slots[slot].master, + [importingClient, migratingClient] = await Promise.all([ + cluster.nodeClient(importing), + cluster.nodeClient(migrating) + ]); - // await Promise.all([ - // importingClient.clusterSetSlot(slot, ClusterSlotStates.IMPORTING, migrating.id), - // migratingClient.clusterSetSlot(slot, ClusterSlotStates.MIGRATING, importing.id) - // ]); + await Promise.all([ + importingClient.clusterSetSlot(slot, 'IMPORTING', migrating.id), + migratingClient.clusterSetSlot(slot, 'MIGRATING', importing.id) + ]); - // // should be able to get the key from the migrating node - // assert.equal( - // await cluster.get(key), - // value - // ); + // should be able to get the key from the migrating node + assert.equal( + await cluster.get(key), + value + ); - // await migratingClient.migrate( - // importing.host, - // importing.port, - // key, - // 0, - // 10 - // ); + await migratingClient.migrate( + importing.host, + importing.port, + key, + 0, + 10 + ); - // // should be able to get the key from the importing node using `ASKING` - // assert.equal( - // await cluster.get(key), - // value - // ); + // should be able to get the key from the importing node using `ASKING` + assert.equal( + await cluster.get(key), + value + ); - // await Promise.all([ - // importingClient.clusterSetSlot(slot, ClusterSlotStates.NODE, importing.id), - // migratingClient.clusterSetSlot(slot, ClusterSlotStates.NODE, importing.id), - // ]); + await Promise.all([ + importingClient.clusterSetSlot(slot, 'NODE', importing.id), + migratingClient.clusterSetSlot(slot, 'NODE', importing.id), + ]); - // // should handle `MOVED` errors - // assert.equal( - // await cluster.get(key), - // value - // ); - // }, { - // serverArguments: [], - // numberOfMasters: 2 - // }); + // should handle `MOVED` errors + assert.equal( + await cluster.get(key), + value + ); + }, { + serverArguments: [], + numberOfMasters: 2 + }); testUtils.testWithCluster('getRandomNode should spread the the load evenly', async cluster => { const totalNodes = cluster.masters.length + cluster.replicas.length, @@ -145,7 +143,6 @@ describe('Cluster', () => { testUtils.testWithCluster('cluster topology', async cluster => { assert.equal(cluster.slots.length, 16384); const { numberOfMasters, numberOfReplicas } = GLOBAL.CLUSTERS.WITH_REPLICAS; - assert.equal(cluster.shards.length, numberOfMasters); assert.equal(cluster.masters.length, numberOfMasters); assert.equal(cluster.replicas.length, numberOfReplicas * numberOfMasters); assert.equal(cluster.nodeByAddress.size, numberOfMasters + numberOfMasters * numberOfReplicas); @@ -239,54 +236,53 @@ describe('Cluster', () => { assert.equal(cluster.pubSubNode, undefined); }, GLOBAL.CLUSTERS.OPEN); - // testUtils.testWithCluster('should move listeners when PubSub node disconnects from the cluster', async cluster => { - // const listener = spy(); - // await cluster.subscribe('channel', listener); + testUtils.testWithCluster('should move listeners when PubSub node disconnects from the cluster', async cluster => { + const listener = spy(); + await cluster.subscribe('channel', listener); - // assert.ok(cluster.pubSubNode); - // const [migrating, importing] = cluster.masters[0].address === cluster.pubSubNode.address ? - // cluster.masters : - // [cluster.masters[1], cluster.masters[0]], - // [migratingClient, importingClient] = await Promise.all([ - // cluster.nodeClient(migrating), - // cluster.nodeClient(importing) - // ]); + assert.ok(cluster.pubSubNode); + const [migrating, importing] = cluster.masters[0].address === cluster.pubSubNode.address ? + cluster.masters : + [cluster.masters[1], cluster.masters[0]], + [migratingClient, importingClient] = await Promise.all([ + cluster.nodeClient(migrating), + cluster.nodeClient(importing) + ]); - // const range = cluster.slots[0].master === migrating ? { - // key: 'bar', // 5061 - // start: 0, - // end: 8191 - // } : { - // key: 'foo', // 12182 - // start: 8192, - // end: 16383 - // }; + const range = cluster.slots[0].master === migrating ? { + key: 'bar', // 5061 + start: 0, + end: 8191 + } : { + key: 'foo', // 12182 + start: 8192, + end: 16383 + }; - // await Promise.all([ - // migratingClient.clusterDelSlotsRange(range), - // importingClient.clusterDelSlotsRange(range), - // importingClient.clusterAddSlotsRange(range) - // ]); + // TODO: is there a better way to migrate slots without causing CLUSTERDOWN? + const promises: Array> = []; + for (let i = range.start; i <= range.end; i++) { + promises.push( + migratingClient.clusterSetSlot(i, 'NODE', importing.id), + importingClient.clusterSetSlot(i, 'NODE', importing.id) + ); + } + await Promise.all(promises); - // // wait for migrating node to be notified about the new topology - // while ((await migratingClient.clusterInfo()).state !== 'ok') { - // await setTimeout(50); - // } + // make sure to cause `MOVED` error + await cluster.get(range.key); - // // make sure to cause `MOVED` error - // await cluster.get(range.key); + await Promise.all([ + cluster.publish('channel', 'message'), + waitTillBeenCalled(listener) + ]); - // await Promise.all([ - // cluster.publish('channel', 'message'), - // waitTillBeenCalled(listener) - // ]); - - // assert.ok(listener.calledOnceWithExactly('message', 'channel')); - // }, { - // serverArguments: [], - // numberOfMasters: 2, - // minimumDockerVersion: [7] - // }); + assert.ok(listener.calledOnceWithExactly('message', 'channel')); + }, { + serverArguments: [], + numberOfMasters: 2, + minimumDockerVersion: [7] + }); testUtils.testWithCluster('ssubscribe & sunsubscribe', async cluster => { const listener = spy(); @@ -303,46 +299,44 @@ describe('Cluster', () => { await cluster.sUnsubscribe('channel', listener); // 10328 is the slot of `channel` - assert.equal(cluster.slots[10328].master.pubSubClient, undefined); + assert.equal(cluster.slots[10328].master.pubSub, undefined); }, { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [7] }); - // testUtils.testWithCluster('should handle sharded-channel-moved events', async cluster => { - // const SLOT = 10328, - // migrating = cluster.slots[SLOT].master, - // importing = cluster.masters.find(master => master !== migrating)!, - // [migratingClient, importingClient] = await Promise.all([ - // cluster.nodeClient(migrating), - // cluster.nodeClient(importing) - // ]); + testUtils.testWithCluster('should handle sharded-channel-moved events', async cluster => { + const SLOT = 10328, + migrating = cluster.slots[SLOT].master, + importing = cluster.masters.find(master => master !== migrating)!, + [migratingClient, importingClient] = await Promise.all([ + cluster.nodeClient(migrating), + cluster.nodeClient(importing) + ]); - // await Promise.all([ - // migratingClient.clusterDelSlots(SLOT), - // importingClient.clusterDelSlots(SLOT), - // importingClient.clusterAddSlots(SLOT) - // ]); + await Promise.all([ + migratingClient.clusterDelSlots(SLOT), + importingClient.clusterDelSlots(SLOT), + importingClient.clusterAddSlots(SLOT), + // cause "topology refresh" on both nodes + migratingClient.clusterSetSlot(SLOT, 'NODE', importing.id), + importingClient.clusterSetSlot(SLOT, 'NODE', importing.id) + ]); - // // wait for migrating node to be notified about the new topology - // while ((await migratingClient.clusterInfo()).state !== 'ok') { - // await setTimeout(50); - // } + const listener = spy(); - // const listener = spy(); + // will trigger `MOVED` error + await cluster.sSubscribe('channel', listener); - // // will trigger `MOVED` error - // await cluster.sSubscribe('channel', listener); + await Promise.all([ + waitTillBeenCalled(listener), + cluster.sPublish('channel', 'message') + ]); - // await Promise.all([ - // waitTillBeenCalled(listener), - // cluster.sPublish('channel', 'message') - // ]); - - // assert.ok(listener.calledOnceWithExactly('message', 'channel')); - // }, { - // serverArguments: [], - // minimumDockerVersion: [7] - // }); + assert.ok(listener.calledOnceWithExactly('message', 'channel')); + }, { + serverArguments: [], + minimumDockerVersion: [7] + }); }); }); diff --git a/packages/client/lib/cluster/index.ts b/packages/client/lib/cluster/index.ts index cc0e895ade..0b7cd210f4 100644 --- a/packages/client/lib/cluster/index.ts +++ b/packages/client/lib/cluster/index.ts @@ -17,7 +17,7 @@ interface ClusterCommander< RESP extends RespVersions, TYPE_MAPPING extends TypeMapping, // POLICIES extends CommandPolicies -> extends CommanderConfig{ +> extends CommanderConfig { commandOptions?: ClusterCommandOptions; } @@ -303,7 +303,7 @@ export default class RedisCluster< private readonly _options: RedisClusterOptions; - private readonly _slots: RedisClusterSlots; + private readonly _slots: RedisClusterSlots; private _commandOptions?: ClusterCommandOptions; @@ -315,14 +315,6 @@ export default class RedisCluster< return this._slots.slots; } - /** - * An array of cluster shards, each shard contain its `master` and `replicas`. - * Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific node (master or replica). - */ - get shards() { - return this._slots.shards; - } - /** * An array of the cluster masters. * Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific master node. @@ -442,7 +434,7 @@ export default class RedisCluster< private async _execute( firstKey: RedisArgument | undefined, isReadonly: boolean | undefined, - fn: (client: RedisClientType) => Promise + fn: (client: RedisClientType) => Promise ): Promise { const maxCommandRedirections = this._options.maxCommandRedirections ?? 16; let client = await this._slots.getClient(firstKey, isReadonly), @@ -655,7 +647,7 @@ export default class RedisCluster< return this._slots.destroy(); } - nodeClient(node: ShardNode) { + nodeClient(node: ShardNode) { return this._slots.nodeClient(node); } diff --git a/packages/client/lib/commands/CLIENT_NO-TOUCH.ts b/packages/client/lib/commands/CLIENT_NO-TOUCH.ts index e526be1766..a6fc5eb176 100644 --- a/packages/client/lib/commands/CLIENT_NO-TOUCH.ts +++ b/packages/client/lib/commands/CLIENT_NO-TOUCH.ts @@ -1,4 +1,4 @@ -import { SimpleStringReply, Command } from '@redis/client/dist/lib/RESP/types'; +import { SimpleStringReply, Command } from '../RESP/types'; export default { FIRST_KEY_INDEX: undefined, diff --git a/packages/client/lib/commands/CLUSTER_MYSHARDID.ts b/packages/client/lib/commands/CLUSTER_MYSHARDID.ts index 5e59b7221a..0c38b61634 100644 --- a/packages/client/lib/commands/CLUSTER_MYSHARDID.ts +++ b/packages/client/lib/commands/CLUSTER_MYSHARDID.ts @@ -1,4 +1,4 @@ -import { BlobStringReply, Command } from '@redis/client/dist/lib/RESP/types'; +import { BlobStringReply, Command } from '../RESP/types'; export default { FIRST_KEY_INDEX: undefined, diff --git a/packages/client/lib/commands/CLUSTER_REPLICAS.spec.ts b/packages/client/lib/commands/CLUSTER_REPLICAS.spec.ts index 5e1a0341ca..1a48f36088 100644 --- a/packages/client/lib/commands/CLUSTER_REPLICAS.spec.ts +++ b/packages/client/lib/commands/CLUSTER_REPLICAS.spec.ts @@ -11,10 +11,11 @@ describe('CLUSTER REPLICAS', () => { }); testUtils.testWithCluster('clusterNode.clusterReplicas', async cluster => { - const client = await cluster.nodeClient(cluster.masters[0]); - assert.equal( - typeof await client.clusterReplicas(cluster.masters[0].id), - 'string' - ); + const client = await cluster.nodeClient(cluster.masters[0]), + reply = await client.clusterReplicas(cluster.masters[0].id); + assert.ok(Array.isArray(reply)); + for (const replica of reply) { + assert.equal(typeof replica, 'string'); + } }, GLOBAL.CLUSTERS.OPEN); }); diff --git a/packages/client/lib/commands/CLUSTER_REPLICAS.ts b/packages/client/lib/commands/CLUSTER_REPLICAS.ts index 831c6bc505..8e0fe2cdfd 100644 --- a/packages/client/lib/commands/CLUSTER_REPLICAS.ts +++ b/packages/client/lib/commands/CLUSTER_REPLICAS.ts @@ -1,4 +1,4 @@ -import { RedisArgument, VerbatimStringReply, Command } from '../RESP/types'; +import { RedisArgument, ArrayReply, BlobStringReply, Command } from '../RESP/types'; export default { FIRST_KEY_INDEX: undefined, @@ -6,5 +6,5 @@ export default { transformArguments(nodeId: RedisArgument) { return ['CLUSTER', 'REPLICAS', nodeId]; }, - transformReply: undefined as unknown as () => VerbatimStringReply + transformReply: undefined as unknown as () => ArrayReply } as const satisfies Command; diff --git a/packages/client/lib/commands/FCALL.spec.ts b/packages/client/lib/commands/FCALL.spec.ts index 35ae8c87c2..286f2a371b 100644 --- a/packages/client/lib/commands/FCALL.spec.ts +++ b/packages/client/lib/commands/FCALL.spec.ts @@ -21,7 +21,7 @@ describe('FCALL', () => { loadMathFunction(client), client.set('key', '2'), client.fCall(MATH_FUNCTION.library.square.NAME, { - arguments: ['key'] + keys: ['key'] }) ]); diff --git a/packages/client/lib/commands/FCALL_RO.spec.ts b/packages/client/lib/commands/FCALL_RO.spec.ts index 0b172d1e21..57edcebebe 100644 --- a/packages/client/lib/commands/FCALL_RO.spec.ts +++ b/packages/client/lib/commands/FCALL_RO.spec.ts @@ -21,7 +21,7 @@ describe('FCALL_RO', () => { loadMathFunction(client), client.set('key', '2'), client.fCallRo(MATH_FUNCTION.library.square.NAME, { - arguments: ['key'] + keys: ['key'] }) ]); diff --git a/packages/client/lib/commands/FUNCTION_LOAD.spec.ts b/packages/client/lib/commands/FUNCTION_LOAD.spec.ts index 657f6d0325..fe896bdf8c 100644 --- a/packages/client/lib/commands/FUNCTION_LOAD.spec.ts +++ b/packages/client/lib/commands/FUNCTION_LOAD.spec.ts @@ -4,6 +4,8 @@ import FUNCTION_LOAD from './FUNCTION_LOAD'; import { RedisClientType } from '../client'; import { NumberReply, RedisFunctions, RedisModules, RedisScripts, RespVersions } from '../RESP/types'; + + export const MATH_FUNCTION = { name: 'math', engine: 'LUA', @@ -11,10 +13,10 @@ export const MATH_FUNCTION = { `#!LUA name=math redis.register_function { function_name = "square", - callback = function(keys, args) { + callback = function(keys, args) local number = redis.call('GET', keys[1]) return number * number - }, + end, flags = { "no-writes" } }`, library: { diff --git a/packages/client/lib/commands/MIGRATE.spec.ts b/packages/client/lib/commands/MIGRATE.spec.ts index 3d14df5018..880d59a09c 100644 --- a/packages/client/lib/commands/MIGRATE.spec.ts +++ b/packages/client/lib/commands/MIGRATE.spec.ts @@ -1,76 +1,76 @@ import { strict as assert } from 'node:assert'; -import { transformArguments } from './MIGRATE'; +import MIGRATE from './MIGRATE'; describe('MIGRATE', () => { - describe('transformArguments', () => { - it('single key', () => { - assert.deepEqual( - transformArguments('127.0.0.1', 6379, 'key', 0, 10), - ['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10'] - ); - }); - - it('multiple keys', () => { - assert.deepEqual( - transformArguments('127.0.0.1', 6379, ['1', '2'], 0, 10), - ['MIGRATE', '127.0.0.1', '6379', '', '0', '10', 'KEYS', '1', '2'] - ); - }); - - it('with COPY', () => { - assert.deepEqual( - transformArguments('127.0.0.1', 6379, 'key', 0, 10, { - COPY: true - }), - ['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'COPY'] - ); - }); - - it('with REPLACE', () => { - assert.deepEqual( - transformArguments('127.0.0.1', 6379, 'key', 0, 10, { - REPLACE: true - }), - ['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'REPLACE'] - ); - }); - - describe('with AUTH', () => { - it('password only', () => { - assert.deepEqual( - transformArguments('127.0.0.1', 6379, 'key', 0, 10, { - AUTH: { - password: 'password' - } - }), - ['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'AUTH', 'password'] - ); - }); - - it('username & password', () => { - assert.deepEqual( - transformArguments('127.0.0.1', 6379, 'key', 0, 10, { - AUTH: { - username: 'username', - password: 'password' - } - }), - ['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'AUTH2', 'username', 'password'] - ); - }); - }); - - it('with COPY, REPLACE, AUTH', () => { - assert.deepEqual( - transformArguments('127.0.0.1', 6379, 'key', 0, 10, { - COPY: true, - REPLACE: true, - AUTH: { - password: 'password' - } - }), - ['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'COPY', 'REPLACE', 'AUTH', 'password'] - ); - }); + describe('transformArguments', () => { + it('single key', () => { + assert.deepEqual( + MIGRATE.transformArguments('127.0.0.1', 6379, 'key', 0, 10), + ['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10'] + ); }); + + it('multiple keys', () => { + assert.deepEqual( + MIGRATE.transformArguments('127.0.0.1', 6379, ['1', '2'], 0, 10), + ['MIGRATE', '127.0.0.1', '6379', '', '0', '10', 'KEYS', '1', '2'] + ); + }); + + it('with COPY', () => { + assert.deepEqual( + MIGRATE.transformArguments('127.0.0.1', 6379, 'key', 0, 10, { + COPY: true + }), + ['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'COPY'] + ); + }); + + it('with REPLACE', () => { + assert.deepEqual( + MIGRATE.transformArguments('127.0.0.1', 6379, 'key', 0, 10, { + REPLACE: true + }), + ['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'REPLACE'] + ); + }); + + describe('with AUTH', () => { + it('password only', () => { + assert.deepEqual( + MIGRATE.transformArguments('127.0.0.1', 6379, 'key', 0, 10, { + AUTH: { + password: 'password' + } + }), + ['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'AUTH', 'password'] + ); + }); + + it('username & password', () => { + assert.deepEqual( + MIGRATE.transformArguments('127.0.0.1', 6379, 'key', 0, 10, { + AUTH: { + username: 'username', + password: 'password' + } + }), + ['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'AUTH2', 'username', 'password'] + ); + }); + }); + + it('with COPY, REPLACE, AUTH', () => { + assert.deepEqual( + MIGRATE.transformArguments('127.0.0.1', 6379, 'key', 0, 10, { + COPY: true, + REPLACE: true, + AUTH: { + password: 'password' + } + }), + ['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'COPY', 'REPLACE', 'AUTH', 'password'] + ); + }); + }); }); diff --git a/packages/client/lib/commands/MIGRATE.ts b/packages/client/lib/commands/MIGRATE.ts index adb1ae8d19..4821f93fcf 100644 --- a/packages/client/lib/commands/MIGRATE.ts +++ b/packages/client/lib/commands/MIGRATE.ts @@ -1,13 +1,14 @@ import { RedisArgument, SimpleStringReply, Command } from '../RESP/types'; import { AuthOptions } from './AUTH'; -interface MigrateOptions { +export interface MigrateOptions { COPY?: true; REPLACE?: true; AUTH?: AuthOptions; } export default { + IS_READ_ONLY: false, transformArguments( host: RedisArgument, port: number, @@ -62,5 +63,5 @@ export default { return args; }, - transformReply: undefined as unknown as () => SimpleStringReply + transformReply: undefined as unknown as () => SimpleStringReply<'OK'> } as const satisfies Command; diff --git a/packages/client/lib/commands/RENAME.ts b/packages/client/lib/commands/RENAME.ts index 484eb35069..16e883d053 100644 --- a/packages/client/lib/commands/RENAME.ts +++ b/packages/client/lib/commands/RENAME.ts @@ -1,7 +1,7 @@ import { RedisArgument, SimpleStringReply, Command } from '../RESP/types'; export default { - FIRST_KEY_INDEX: undefined, + FIRST_KEY_INDEX: 1, IS_READ_ONLY: true, transformArguments(key: RedisArgument, newKey: RedisArgument) { return ['RENAME', key, newKey]; diff --git a/packages/client/lib/commands/RENAMENX.ts b/packages/client/lib/commands/RENAMENX.ts index 0048515d44..3a4f155d5a 100644 --- a/packages/client/lib/commands/RENAMENX.ts +++ b/packages/client/lib/commands/RENAMENX.ts @@ -1,7 +1,7 @@ import { RedisArgument, NumberReply, Command } from '../RESP/types'; export default { - FIRST_KEY_INDEX: undefined, + FIRST_KEY_INDEX: 1, IS_READ_ONLY: true, transformArguments(key: RedisArgument, newKey: RedisArgument) { return ['RENAMENX', key, newKey]; diff --git a/packages/client/lib/commands/ZPOPMAX_COUNT.ts b/packages/client/lib/commands/ZPOPMAX_COUNT.ts index 6ca002cca1..00d39536ae 100644 --- a/packages/client/lib/commands/ZPOPMAX_COUNT.ts +++ b/packages/client/lib/commands/ZPOPMAX_COUNT.ts @@ -2,7 +2,7 @@ import { RedisArgument, Command } from '../RESP/types'; import { transformSortedSetReply } from './generic-transformers'; export default { - FIRST_KEY_INDEX: undefined, + FIRST_KEY_INDEX: 1, IS_READ_ONLY: false, transformArguments(key: RedisArgument, count: number) { return ['ZPOPMAX', key, count.toString()]; diff --git a/packages/client/lib/commands/index.ts b/packages/client/lib/commands/index.ts index 30782981ab..84a726d26e 100644 --- a/packages/client/lib/commands/index.ts +++ b/packages/client/lib/commands/index.ts @@ -184,6 +184,7 @@ import MEMORY_PURGE from './MEMORY_PURGE'; import MEMORY_STATS from './MEMORY_STATS'; import MEMORY_USAGE from './MEMORY_USAGE'; import MGET from './MGET'; +import MIGRATE from './MIGRATE'; import MODULE_LIST from './MODULE_LIST'; import MODULE_LOAD from './MODULE_LOAD'; import MODULE_UNLOAD from './MODULE_UNLOAD'; @@ -703,6 +704,8 @@ export default { memoryUsage: MEMORY_USAGE, MGET, mGet: MGET, + MIGRATE, + migrate: MIGRATE, MODULE_LIST, moduleList: MODULE_LIST, MODULE_LOAD, diff --git a/packages/client/lib/multi-command.spec.ts b/packages/client/lib/multi-command.spec.ts index 0223006041..7e77f88d10 100644 --- a/packages/client/lib/multi-command.spec.ts +++ b/packages/client/lib/multi-command.spec.ts @@ -20,7 +20,7 @@ describe('Multi Command', () => { multi.addScript(SQUARE_SCRIPT, ['1']); assert.deepEqual( Array.from(multi.queue.at(-1).args), - ['EVAL', SQUARE_SCRIPT.SCRIPT, '0', '1'] + ['EVAL', SQUARE_SCRIPT.SCRIPT, '1', '1'] ); }); @@ -28,7 +28,7 @@ describe('Multi Command', () => { multi.addScript(SQUARE_SCRIPT, ['2']); assert.deepEqual( Array.from(multi.queue.at(-1).args), - ['EVALSHA', SQUARE_SCRIPT.SHA1, '0', '2'] + ['EVALSHA', SQUARE_SCRIPT.SHA1, '1', '2'] ); }); diff --git a/packages/test-utils/lib/index.ts b/packages/test-utils/lib/index.ts index 68e9630769..ab9da32a94 100644 --- a/packages/test-utils/lib/index.ts +++ b/packages/test-utils/lib/index.ts @@ -200,9 +200,9 @@ export default class TestUtils { // POLICIES extends CommandPolicies >(cluster: RedisClusterType): Promise { return Promise.all( - cluster.masters.map(async ({ client }) => { - if (client) { - await (await client).flushAll(); + cluster.masters.map(async master => { + if (master.client) { + (await cluster.nodeClient(master)).flushAll(); } }) ); @@ -256,7 +256,7 @@ export default class TestUtils { await fn(cluster); } finally { await TestUtils.#clusterFlushAll(cluster); - await cluster.disconnect(); + cluster.destroy(); } }); }