1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-07 13:22:56 +03:00
This commit is contained in:
Leibale
2023-10-02 12:03:04 -04:00
parent d62e332470
commit 225efc0b43
19 changed files with 386 additions and 422 deletions

View File

@@ -15,23 +15,13 @@ import { ScanOptions, ScanCommonOptions } from '../commands/SCAN';
import { RedisLegacyClient, RedisLegacyClientType } from './legacy-mode'; import { RedisLegacyClient, RedisLegacyClientType } from './legacy-mode';
import { RedisPoolOptions, RedisClientPool } from './pool'; import { RedisPoolOptions, RedisClientPool } from './pool';
interface ClientCommander<
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping
> extends CommanderConfig<M, F, S, RESP> {
commandOptions?: CommandOptions<TYPE_MAPPING>;
}
export interface RedisClientOptions< export interface RedisClientOptions<
M extends RedisModules = RedisModules, M extends RedisModules = RedisModules,
F extends RedisFunctions = RedisFunctions, F extends RedisFunctions = RedisFunctions,
S extends RedisScripts = RedisScripts, S extends RedisScripts = RedisScripts,
RESP extends RespVersions = RespVersions, RESP extends RespVersions = RespVersions,
TYPE_MAPPING extends TypeMapping = TypeMapping TYPE_MAPPING extends TypeMapping = TypeMapping
> extends ClientCommander<M, F, S, RESP, TYPE_MAPPING> { > extends CommanderConfig<M, F, S, RESP> {
/** /**
* `redis[s]://[[username][:password]@][host][:port][/db-number]` * `redis[s]://[[username][:password]@][host][:port][/db-number]`
* See [`redis`](https://www.iana.org/assignments/uri-schemes/prov/redis) and [`rediss`](https://www.iana.org/assignments/uri-schemes/prov/rediss) IANA registration for more details * See [`redis`](https://www.iana.org/assignments/uri-schemes/prov/redis) and [`rediss`](https://www.iana.org/assignments/uri-schemes/prov/rediss) IANA registration for more details
@@ -75,6 +65,10 @@ export interface RedisClientOptions<
* Useful with Redis deployments that do not honor TCP Keep-Alive. * Useful with Redis deployments that do not honor TCP Keep-Alive.
*/ */
pingInterval?: number; pingInterval?: number;
/**
* TODO
*/
commandOptions?: CommandOptions<TYPE_MAPPING>;
} }
type WithCommands< type WithCommands<
@@ -205,9 +199,8 @@ export default class RedisClient<
M extends RedisModules = {}, M extends RedisModules = {},
F extends RedisFunctions = {}, F extends RedisFunctions = {},
S extends RedisScripts = {}, S extends RedisScripts = {},
RESP extends RespVersions = 2, RESP extends RespVersions = 2
TYPE_MAPPING extends TypeMapping = {} >(config?: CommanderConfig<M, F, S, RESP>) {
>(config?: ClientCommander<M, F, S, RESP, TYPE_MAPPING>) {
const Client = attachConfig({ const Client = attachConfig({
BaseClass: RedisClient, BaseClass: RedisClient,
commands: COMMANDS, commands: COMMANDS,
@@ -220,7 +213,9 @@ export default class RedisClient<
Client.prototype.Multi = RedisClientMultiCommand.extend(config); Client.prototype.Multi = RedisClientMultiCommand.extend(config);
return (options?: Omit<RedisClientOptions, keyof Exclude<typeof config, undefined>>) => { return <TYPE_MAPPING extends TypeMapping = {}>(
options?: Omit<RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>, keyof Exclude<typeof config, undefined>>
) => {
// returning a "proxy" to prevent the namespaces.self to leak between "proxies" // returning a "proxy" to prevent the namespaces.self to leak between "proxies"
return Object.create(new Client(options)) as RedisClientType<M, F, S, RESP, TYPE_MAPPING>; return Object.create(new Client(options)) as RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
}; };

View File

@@ -4,7 +4,7 @@ import RedisClient, { RedisClientOptions, RedisClientType } from '../client';
import { types } from 'node:util'; import { types } from 'node:util';
import { EventEmitter } from 'node:stream'; import { EventEmitter } from 'node:stream';
import { ChannelListeners, PubSubType, PubSubTypeListeners } from '../client/pub-sub'; import { ChannelListeners, PubSubType, PubSubTypeListeners } from '../client/pub-sub';
import { RedisArgument, RedisFunctions, RedisModules, RedisScripts, RespVersions } from '../RESP/types'; import { RedisArgument, RedisFunctions, RedisModules, RedisScripts, RespVersions, TypeMapping } from '../RESP/types';
// TODO: ?! // TODO: ?!
// We need to use 'require', because it's not possible with Typescript to import // We need to use 'require', because it's not possible with Typescript to import
@@ -21,34 +21,26 @@ export type NodeAddressMap = {
[address: string]: NodeAddress; [address: string]: NodeAddress;
} | ((address: string) => NodeAddress | undefined); } | ((address: string) => NodeAddress | undefined);
type ValueOrPromise<T> = T | Promise<T>;
type ClientOrPromise<
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions = 2
> = ValueOrPromise<RedisClientType<M, F, S, RESP>>;
export interface Node< export interface Node<
M extends RedisModules, M extends RedisModules,
F extends RedisFunctions, F extends RedisFunctions,
S extends RedisScripts, S extends RedisScripts,
RESP extends RespVersions RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping
> { > {
address: string; address: string;
client?: ClientOrPromise<M, F, S, RESP>; client?: RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
connectPromise?: Promise<RedisClientType<M, F, S, RESP, TYPE_MAPPING>>;
} }
export interface ShardNode< export interface ShardNode<
M extends RedisModules, M extends RedisModules,
F extends RedisFunctions, F extends RedisFunctions,
S extends RedisScripts, S extends RedisScripts,
RESP extends RespVersions RESP extends RespVersions,
> extends Node<M, F, S, RESP> { TYPE_MAPPING extends TypeMapping
> extends Node<M, F, S, RESP, TYPE_MAPPING>, NodeAddress {
id: string; id: string;
host: string;
port: number;
readonly: boolean; readonly: boolean;
} }
@@ -56,35 +48,45 @@ export interface MasterNode<
M extends RedisModules, M extends RedisModules,
F extends RedisFunctions, F extends RedisFunctions,
S extends RedisScripts, S extends RedisScripts,
RESP extends RespVersions RESP extends RespVersions,
> extends ShardNode<M, F, S, RESP> { TYPE_MAPPING extends TypeMapping
pubSubClient?: ClientOrPromise<M, F, S, RESP>; > extends ShardNode<M, F, S, RESP, TYPE_MAPPING> {
pubSub?: {
connectPromise?: Promise<RedisClientType<M, F, S, RESP, TYPE_MAPPING>>;
client: RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
};
} }
export interface Shard< export interface Shard<
M extends RedisModules, M extends RedisModules,
F extends RedisFunctions, F extends RedisFunctions,
S extends RedisScripts, S extends RedisScripts,
RESP extends RespVersions RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping
> { > {
master: MasterNode<M, F, S, RESP>; master: MasterNode<M, F, S, RESP, TYPE_MAPPING>;
replicas?: Array<ShardNode<M, F, S, RESP>>; replicas?: Array<ShardNode<M, F, S, RESP, TYPE_MAPPING>>;
nodesIterator?: IterableIterator<ShardNode<M, F, S, RESP>>; nodesIterator?: IterableIterator<ShardNode<M, F, S, RESP, TYPE_MAPPING>>;
} }
type ShardWithReplicas< type ShardWithReplicas<
M extends RedisModules, M extends RedisModules,
F extends RedisFunctions, F extends RedisFunctions,
S extends RedisScripts, S extends RedisScripts,
RESP extends RespVersions RESP extends RespVersions,
> = Shard<M, F, S, RESP> & Required<Pick<Shard<M, F, S, RESP>, 'replicas'>>; TYPE_MAPPING extends TypeMapping
> = Shard<M, F, S, RESP, TYPE_MAPPING> & Required<Pick<Shard<M, F, S, RESP, TYPE_MAPPING>, 'replicas'>>;
export type PubSubNode< type PubSubNode<
M extends RedisModules, M extends RedisModules,
F extends RedisFunctions, F extends RedisFunctions,
S extends RedisScripts, S extends RedisScripts,
RESP extends RespVersions RESP extends RespVersions,
> = Required<Node<M, F, S, RESP>>; TYPE_MAPPING extends TypeMapping
> = (
Exclude<Node<M, F, S, RESP, TYPE_MAPPING>, 'client'> &
Required<Pick<Node<M, F, S, RESP, TYPE_MAPPING>, 'client'>>
);
type PubSubToResubscribe = Record< type PubSubToResubscribe = Record<
PubSubType.CHANNELS | PubSubType.PATTERNS, PubSubType.CHANNELS | PubSubType.PATTERNS,
@@ -101,19 +103,19 @@ export default class RedisClusterSlots<
M extends RedisModules, M extends RedisModules,
F extends RedisFunctions, F extends RedisFunctions,
S extends RedisScripts, S extends RedisScripts,
RESP extends RespVersions RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping
> { > {
private static _SLOTS = 16384; private static _SLOTS = 16384;
private readonly _options: RedisClusterOptions<M, F, S, RESP>; private readonly _options: RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING>;
private readonly _clientFactory: ReturnType<typeof RedisClient.factory<M, F, S, RESP>>; private readonly _clientFactory: ReturnType<typeof RedisClient.factory<M, F, S, RESP>>;
private readonly _emit: EventEmitter['emit']; private readonly _emit: EventEmitter['emit'];
slots = new Array<Shard<M, F, S, RESP>>(RedisClusterSlots._SLOTS); slots = new Array<Shard<M, F, S, RESP, TYPE_MAPPING>>(RedisClusterSlots._SLOTS);
shards = new Array<Shard<M, F, S,RESP>>(); masters = new Array<MasterNode<M, F, S, RESP, TYPE_MAPPING>>();
masters = new Array<ShardNode<M, F, S, RESP>>(); replicas = new Array<ShardNode<M, F, S, RESP, TYPE_MAPPING>>();
replicas = new Array<ShardNode<M, F, S, RESP>>(); readonly nodeByAddress = new Map<string, MasterNode<M, F, S, RESP, TYPE_MAPPING> | ShardNode<M, F, S, RESP, TYPE_MAPPING>>();
readonly nodeByAddress = new Map<string, MasterNode<M, F, S, RESP> | ShardNode<M, F, S, RESP>>(); pubSubNode?: PubSubNode<M, F, S, RESP, TYPE_MAPPING>;
pubSubNode?: PubSubNode<M, F, S, RESP>;
private _isOpen = false; private _isOpen = false;
@@ -122,7 +124,7 @@ export default class RedisClusterSlots<
} }
constructor( constructor(
options: RedisClusterOptions<M, F, S, RESP>, options: RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING>,
emit: EventEmitter['emit'] emit: EventEmitter['emit']
) { ) {
this._options = options; this._options = options;
@@ -147,10 +149,12 @@ export default class RedisClusterSlots<
private async _discoverWithRootNodes() { private async _discoverWithRootNodes() {
let start = Math.floor(Math.random() * this._options.rootNodes.length); let start = Math.floor(Math.random() * this._options.rootNodes.length);
for (let i = start; i < this._options.rootNodes.length; i++) { for (let i = start; i < this._options.rootNodes.length; i++) {
if (!this._isOpen) throw new Error('Cluster closed');
if (await this._discover(this._options.rootNodes[i])) return; if (await this._discover(this._options.rootNodes[i])) return;
} }
for (let i = 0; i < start; i++) { for (let i = 0; i < start; i++) {
if (!this._isOpen) throw new Error('Cluster closed');
if (await this._discover(this._options.rootNodes[i])) return; if (await this._discover(this._options.rootNodes[i])) return;
} }
@@ -159,7 +163,6 @@ export default class RedisClusterSlots<
private _resetSlots() { private _resetSlots() {
this.slots = new Array(RedisClusterSlots._SLOTS); this.slots = new Array(RedisClusterSlots._SLOTS);
this.shards = [];
this.masters = []; this.masters = [];
this.replicas = []; this.replicas = [];
this._randomNodeIterator = undefined; this._randomNodeIterator = undefined;
@@ -167,15 +170,13 @@ export default class RedisClusterSlots<
private async _discover(rootNode: RedisClusterClientOptions) { private async _discover(rootNode: RedisClusterClientOptions) {
this._resetSlots(); this._resetSlots();
const addressesInUse = new Set<string>();
try { try {
const shards = await this._getShards(rootNode), const addressesInUse = new Set<string>(),
promises: Array<Promise<unknown>> = [], promises: Array<Promise<unknown>> = [],
eagerConnect = this._options.minimizeConnections !== true; eagerConnect = this._options.minimizeConnections !== true;
for (const { from, to, master, replicas } of shards) { for (const { from, to, master, replicas } of await this._getShards(rootNode)) {
const shard: Shard<M, F, S, RESP> = { const shard: Shard<M, F, S, RESP, TYPE_MAPPING> = {
master: this._initiateSlotNode(master, false, eagerConnect, addressesInUse, promises) master: this._initiateSlotNode(master, false, eagerConnect, addressesInUse, promises)
}; };
@@ -185,33 +186,24 @@ export default class RedisClusterSlots<
); );
} }
this.shards.push(shard);
for (let i = from; i <= to; i++) { for (let i = from; i <= to; i++) {
this.slots[i] = shard; this.slots[i] = shard;
} }
} }
if (this.pubSubNode && !addressesInUse.has(this.pubSubNode.address)) { if (this.pubSubNode && !addressesInUse.has(this.pubSubNode.address)) {
if (types.isPromise(this.pubSubNode.client)) { const channelsListeners = this.pubSubNode.client.getPubSubListeners(PubSubType.CHANNELS),
patternsListeners = this.pubSubNode.client.getPubSubListeners(PubSubType.PATTERNS);
this.pubSubNode.client.destroy();
if (channelsListeners.size || patternsListeners.size) {
promises.push( promises.push(
this.pubSubNode.client.then(client => client.disconnect()) this._initiatePubSubClient({
[PubSubType.CHANNELS]: channelsListeners,
[PubSubType.PATTERNS]: patternsListeners
})
); );
this.pubSubNode = undefined;
} else {
promises.push(this.pubSubNode.client.disconnect());
const channelsListeners = this.pubSubNode.client.getPubSubListeners(PubSubType.CHANNELS),
patternsListeners = this.pubSubNode.client.getPubSubListeners(PubSubType.PATTERNS);
if (channelsListeners.size || patternsListeners.size) {
promises.push(
this._initiatePubSubClient({
[PubSubType.CHANNELS]: channelsListeners,
[PubSubType.PATTERNS]: patternsListeners
})
);
}
} }
} }
@@ -219,16 +211,12 @@ export default class RedisClusterSlots<
if (addressesInUse.has(address)) continue; if (addressesInUse.has(address)) continue;
if (node.client) { if (node.client) {
promises.push( node.client.destroy();
this._execOnNodeClient(node.client, client => client.disconnect())
);
} }
const { pubSubClient } = node as MasterNode<M, F, S, RESP>; const { pubSub } = node as MasterNode<M, F, S, RESP, TYPE_MAPPING>;
if (pubSubClient) { if (pubSub) {
promises.push( pubSub.client.destroy();
this._execOnNodeClient(pubSubClient, client => client.disconnect())
);
} }
this.nodeByAddress.delete(address); this.nodeByAddress.delete(address);
@@ -248,12 +236,12 @@ export default class RedisClusterSlots<
options.socket ??= {}; options.socket ??= {};
options.socket.reconnectStrategy = false; options.socket.reconnectStrategy = false;
options.RESP = this._options.RESP; options.RESP = this._options.RESP;
options.commandOptions = undefined;
const client = RedisClient.factory(this._options)(options); // TODO: find a way to avoid type casting
const client = await this._clientFactory(options as RedisClientOptions<M, F, S, RESP, {}>)
client.on('error', err => this._emit('error', err)); .on('error', err => this._emit('error', err))
.connect();
await client.connect();
try { try {
// switch to `CLUSTER SHARDS` when Redis 7.0 will be the minimum supported version // switch to `CLUSTER SHARDS` when Redis 7.0 will be the minimum supported version
@@ -273,7 +261,7 @@ export default class RedisClusterSlots<
} }
} }
private _clientOptionsDefaults(options?: RedisClientOptions): RedisClientOptions | undefined { private _clientOptionsDefaults(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>) {
if (!this._options.defaults) return options; if (!this._options.defaults) return options;
let socket; let socket;
@@ -301,7 +289,6 @@ export default class RedisClusterSlots<
promises: Array<Promise<unknown>> promises: Array<Promise<unknown>>
) { ) {
const address = `${shard.host}:${shard.port}`; const address = `${shard.host}:${shard.port}`;
addressesInUse.add(address);
let node = this.nodeByAddress.get(address); let node = this.nodeByAddress.get(address);
if (!node) { if (!node) {
@@ -309,7 +296,8 @@ export default class RedisClusterSlots<
...shard, ...shard,
address, address,
readonly, readonly,
client: undefined client: undefined,
connectPromise: undefined
}; };
if (eagerConnent) { if (eagerConnent) {
@@ -319,16 +307,16 @@ export default class RedisClusterSlots<
this.nodeByAddress.set(address, node); this.nodeByAddress.set(address, node);
} }
(readonly ? this.replicas : this.masters).push(node); if (!addressesInUse.has(address)) {
addressesInUse.add(address);
(readonly ? this.replicas : this.masters).push(node);
}
return node; return node;
} }
private async _createClient( private _createClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>, readonly = node.readonly) {
node: ShardNode<M, F, S, RESP>, return this._clientFactory(
readonly = node.readonly
) {
const client = this._clientFactory(
this._clientOptionsDefaults({ this._clientOptionsDefaults({
socket: this._getNodeAddress(node.address) ?? { socket: this._getNodeAddress(node.address) ?? {
host: node.host, host: node.host,
@@ -337,38 +325,29 @@ export default class RedisClusterSlots<
readonly, readonly,
RESP: this._options.RESP RESP: this._options.RESP
}) })
).on('error', err => console.error(err));
}
private _createNodeClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>, readonly?: boolean) {
const client = node.client = this._createClient(node, readonly);
return node.connectPromise = client.connect()
.finally(() => node.connectPromise = undefined);
}
nodeClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>) {
return (
node.connectPromise ?? // if the node is connecting
node.client ?? // if the node is connected
this._createNodeClient(node) // if the not is disconnected
); );
client.on('error', err => this._emit('error', err));
await client.connect();
return client;
} }
private _createNodeClient(node: ShardNode<M, F, S, RESP>) { private _runningRediscoverPromise?: Promise<void>;
const promise = this._createClient(node)
.then(client => {
node.client = client;
return client;
})
.catch(err => {
node.client = undefined;
throw err;
});
node.client = promise;
return promise;
}
nodeClient(node: ShardNode<M, F, S, RESP>) {
return node.client ?? this._createNodeClient(node);
}
#runningRediscoverPromise?: Promise<void>;
async rediscover(startWith: RedisClientType<M, F, S, RESP>): Promise<void> { async rediscover(startWith: RedisClientType<M, F, S, RESP>): Promise<void> {
this.#runningRediscoverPromise ??= this._rediscover(startWith) this._runningRediscoverPromise ??= this._rediscover(startWith)
.finally(() => this.#runningRediscoverPromise = undefined); .finally(() => this._runningRediscoverPromise = undefined);
return this.#runningRediscoverPromise; return this._runningRediscoverPromise;
} }
private async _rediscover(startWith: RedisClientType<M, F, S, RESP>): Promise<void> { private async _rediscover(startWith: RedisClientType<M, F, S, RESP>): Promise<void> {
@@ -399,11 +378,11 @@ export default class RedisClusterSlots<
this._isOpen = false; this._isOpen = false;
for (const client of this._clients()) { for (const client of this._clients()) {
this._execOnNodeClient(client, client => client.destroy()); client.destroy();
} }
if (this.pubSubNode) { if (this.pubSubNode) {
this._execOnNodeClient(this.pubSubNode.client, client => client.destroy()); this.pubSubNode.client.destroy();
this.pubSubNode = undefined; this.pubSubNode = undefined;
} }
@@ -412,21 +391,19 @@ export default class RedisClusterSlots<
} }
private *_clients() { private *_clients() {
for (const { master, replicas } of this.shards) { for (const master of this.masters) {
if (master.client) { if (master.client) {
yield master.client; yield master.client;
} }
if (master.pubSubClient) { if (master.pubSub) {
yield master.pubSubClient; yield master.pubSub.client;
} }
}
if (replicas) { for (const replica of this.replicas) {
for (const { client } of replicas) { if (replica.client) {
if (client) { yield replica.client;
yield client;
}
}
} }
} }
} }
@@ -436,11 +413,11 @@ export default class RedisClusterSlots<
const promises = []; const promises = [];
for (const client of this._clients()) { for (const client of this._clients()) {
promises.push(this._execOnNodeClient(client, fn)); promises.push(fn(client));
} }
if (this.pubSubNode) { if (this.pubSubNode) {
promises.push(this._execOnNodeClient(this.pubSubNode.client, fn)); promises.push(fn(this.pubSubNode.client));
this.pubSubNode = undefined; this.pubSubNode = undefined;
} }
@@ -450,19 +427,10 @@ export default class RedisClusterSlots<
await Promise.allSettled(promises); await Promise.allSettled(promises);
} }
private _execOnNodeClient<T>(
client: ClientOrPromise<M, F, S, RESP>,
fn: (client: RedisClientType<M, F, S, RESP>) => T
): T | Promise<T> {
return types.isPromise(client) ?
client.then(fn) :
fn(client);
}
getClient( getClient(
firstKey: RedisArgument | undefined, firstKey: RedisArgument | undefined,
isReadonly: boolean | undefined isReadonly: boolean | undefined
): ClientOrPromise<M, F, S, RESP> { ) {
if (!firstKey) { if (!firstKey) {
return this.nodeClient(this.getRandomNode()); return this.nodeClient(this.getRandomNode());
} }
@@ -503,14 +471,14 @@ export default class RedisClusterSlots<
} }
} }
_randomNodeIterator?: IterableIterator<ShardNode<M, F, S, RESP>>; _randomNodeIterator?: IterableIterator<ShardNode<M, F, S, RESP, TYPE_MAPPING>>;
getRandomNode() { getRandomNode() {
this._randomNodeIterator ??= this._iterateAllNodes(); this._randomNodeIterator ??= this._iterateAllNodes();
return this._randomNodeIterator.next().value as ShardNode<M, F, S, RESP>; return this._randomNodeIterator.next().value as ShardNode<M, F, S, RESP, TYPE_MAPPING>;
} }
private *_slotNodesIterator(slot: ShardWithReplicas<M, F, S, RESP>) { private *_slotNodesIterator(slot: ShardWithReplicas<M, F, S, RESP, TYPE_MAPPING>) {
let i = Math.floor(Math.random() * (1 + slot.replicas.length)); let i = Math.floor(Math.random() * (1 + slot.replicas.length));
if (i < slot.replicas.length) { if (i < slot.replicas.length) {
do { do {
@@ -533,8 +501,8 @@ export default class RedisClusterSlots<
return slot.master; return slot.master;
} }
slot.nodesIterator ??= this._slotNodesIterator(slot as ShardWithReplicas<M, F, S, RESP>); slot.nodesIterator ??= this._slotNodesIterator(slot as ShardWithReplicas<M, F, S, RESP, TYPE_MAPPING>);
return slot.nodesIterator.next().value as ShardNode<M, F, S, RESP>; return slot.nodesIterator.next().value as ShardNode<M, F, S, RESP, TYPE_MAPPING>;
} }
getMasterByAddress(address: string) { getMasterByAddress(address: string) {
@@ -545,20 +513,22 @@ export default class RedisClusterSlots<
} }
getPubSubClient() { getPubSubClient() {
return this.pubSubNode ? if (!this.pubSubNode) return this._initiatePubSubClient();
this.pubSubNode.client :
this._initiatePubSubClient(); return this.pubSubNode.connectPromise ?? this.pubSubNode.client;
} }
private async _initiatePubSubClient(toResubscribe?: PubSubToResubscribe) { private async _initiatePubSubClient(toResubscribe?: PubSubToResubscribe) {
const index = Math.floor(Math.random() * (this.masters.length + this.replicas.length)), const index = Math.floor(Math.random() * (this.masters.length + this.replicas.length)),
node = index < this.masters.length ? node = index < this.masters.length ?
this.masters[index] : this.masters[index] :
this.replicas[index - this.masters.length]; this.replicas[index - this.masters.length],
client = this._createClient(node, true);
this.pubSubNode = { this.pubSubNode = {
address: node.address, address: node.address,
client: this._createClient(node, true) client,
connectPromise: client.connect()
.then(async client => { .then(async client => {
if (toResubscribe) { if (toResubscribe) {
await Promise.all([ await Promise.all([
@@ -567,7 +537,7 @@ export default class RedisClusterSlots<
]); ]);
} }
this.pubSubNode!.client = client; this.pubSubNode!.connectPromise = undefined;
return client; return client;
}) })
.catch(err => { .catch(err => {
@@ -576,7 +546,7 @@ export default class RedisClusterSlots<
}) })
}; };
return this.pubSubNode.client as Promise<RedisClientType<M, F, S, RESP>>; return this.pubSubNode.connectPromise!;
} }
async executeUnsubscribeCommand( async executeUnsubscribeCommand(
@@ -593,52 +563,58 @@ export default class RedisClusterSlots<
getShardedPubSubClient(channel: string) { getShardedPubSubClient(channel: string) {
const { master } = this.slots[calculateSlot(channel)]; const { master } = this.slots[calculateSlot(channel)];
return master.pubSubClient ?? this.#initiateShardedPubSubClient(master); if (!master.pubSub) return this._initiateShardedPubSubClient(master);
return master.pubSub.connectPromise ?? master.pubSub.client;
} }
#initiateShardedPubSubClient(master: MasterNode<M, F, S, RESP>) { private async _initiateShardedPubSubClient(master: MasterNode<M, F, S, RESP, TYPE_MAPPING>) {
const promise = this._createClient(master, true) const client = this._createClient(master, true)
.then(client => { .on('server-sunsubscribe', async (channel, listeners) => {
client.on('server-sunsubscribe', async (channel, listeners) => { try {
try { await this.rediscover(client);
await this.rediscover(client); const redirectTo = await this.getShardedPubSubClient(channel);
const redirectTo = await this.getShardedPubSubClient(channel); await redirectTo.extendPubSubChannelListeners(
redirectTo.extendPubSubChannelListeners( PubSubType.SHARDED,
PubSubType.SHARDED, channel,
channel, listeners
listeners );
); } catch (err) {
} catch (err) { this._emit('sharded-shannel-moved-error', err, channel, listeners);
this._emit('sharded-shannel-moved-error', err, channel, listeners); }
}
});
master.pubSubClient = client;
return client;
})
.catch(err => {
master.pubSubClient = undefined;
throw err;
}); });
master.pubSubClient = promise; master.pubSub = {
client,
connectPromise: client.connect()
.then(client => {
master.pubSub!.connectPromise = undefined;
return client;
})
.catch(err => {
master.pubSub = undefined;
throw err;
})
};
return promise; return master.pubSub.connectPromise!;
} }
async executeShardedUnsubscribeCommand( async executeShardedUnsubscribeCommand(
channel: string, channel: string,
unsubscribe: (client: RedisClientType<M, F, S, RESP>) => Promise<void> unsubscribe: (client: RedisClientType<M, F, S, RESP, TYPE_MAPPING>) => Promise<void>
): Promise<void> { ) {
const { master } = this.slots[calculateSlot(channel)]; const { master } = this.slots[calculateSlot(channel)];
if (!master.pubSubClient) return Promise.resolve(); if (!master.pubSub) return;
const client = master.pubSub.connectPromise ?
await master.pubSub.connectPromise :
master.pubSub.client;
const client = await master.pubSubClient;
await unsubscribe(client); await unsubscribe(client);
if (!client.isPubSubActive) { if (!client.isPubSubActive) {
await client.disconnect(); client.destroy();
master.pubSubClient = undefined; master.pubSub = undefined;
} }
} }
} }

View File

@@ -1,11 +1,9 @@
import { strict as assert } from 'node:assert'; import { strict as assert } from 'node:assert';
import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils'; import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils';
import RedisCluster from '.'; import RedisCluster from '.';
// import { ClusterSlotStates } from '../commands/CLUSTER_SETSLOT';
import { SQUARE_SCRIPT } from '../client/index.spec'; import { SQUARE_SCRIPT } from '../client/index.spec';
import { RootNodesUnavailableError } from '../errors'; import { RootNodesUnavailableError } from '../errors';
import { spy } from 'sinon'; import { spy } from 'sinon';
// import { setTimeout } from 'node:timers/promises';
import RedisClient from '../client'; import RedisClient from '../client';
describe('Cluster', () => { describe('Cluster', () => {
@@ -69,58 +67,58 @@ describe('Cluster', () => {
} }
}); });
// testUtils.testWithCluster('should handle live resharding', async cluster => { testUtils.testWithCluster('should handle live resharding', async cluster => {
// const slot = 12539, const slot = 12539,
// key = 'key', key = 'key',
// value = 'value'; value = 'value';
// await cluster.set(key, value); await cluster.set(key, value);
// const importing = cluster.slots[0].master, const importing = cluster.slots[0].master,
// migrating = cluster.slots[slot].master, migrating = cluster.slots[slot].master,
// [importingClient, migratingClient] = await Promise.all([ [importingClient, migratingClient] = await Promise.all([
// cluster.nodeClient(importing), cluster.nodeClient(importing),
// cluster.nodeClient(migrating) cluster.nodeClient(migrating)
// ]); ]);
// await Promise.all([ await Promise.all([
// importingClient.clusterSetSlot(slot, ClusterSlotStates.IMPORTING, migrating.id), importingClient.clusterSetSlot(slot, 'IMPORTING', migrating.id),
// migratingClient.clusterSetSlot(slot, ClusterSlotStates.MIGRATING, importing.id) migratingClient.clusterSetSlot(slot, 'MIGRATING', importing.id)
// ]); ]);
// // should be able to get the key from the migrating node // should be able to get the key from the migrating node
// assert.equal( assert.equal(
// await cluster.get(key), await cluster.get(key),
// value value
// ); );
// await migratingClient.migrate( await migratingClient.migrate(
// importing.host, importing.host,
// importing.port, importing.port,
// key, key,
// 0, 0,
// 10 10
// ); );
// // should be able to get the key from the importing node using `ASKING` // should be able to get the key from the importing node using `ASKING`
// assert.equal( assert.equal(
// await cluster.get(key), await cluster.get(key),
// value value
// ); );
// await Promise.all([ await Promise.all([
// importingClient.clusterSetSlot(slot, ClusterSlotStates.NODE, importing.id), importingClient.clusterSetSlot(slot, 'NODE', importing.id),
// migratingClient.clusterSetSlot(slot, ClusterSlotStates.NODE, importing.id), migratingClient.clusterSetSlot(slot, 'NODE', importing.id),
// ]); ]);
// // should handle `MOVED` errors // should handle `MOVED` errors
// assert.equal( assert.equal(
// await cluster.get(key), await cluster.get(key),
// value value
// ); );
// }, { }, {
// serverArguments: [], serverArguments: [],
// numberOfMasters: 2 numberOfMasters: 2
// }); });
testUtils.testWithCluster('getRandomNode should spread the the load evenly', async cluster => { testUtils.testWithCluster('getRandomNode should spread the the load evenly', async cluster => {
const totalNodes = cluster.masters.length + cluster.replicas.length, const totalNodes = cluster.masters.length + cluster.replicas.length,
@@ -145,7 +143,6 @@ describe('Cluster', () => {
testUtils.testWithCluster('cluster topology', async cluster => { testUtils.testWithCluster('cluster topology', async cluster => {
assert.equal(cluster.slots.length, 16384); assert.equal(cluster.slots.length, 16384);
const { numberOfMasters, numberOfReplicas } = GLOBAL.CLUSTERS.WITH_REPLICAS; const { numberOfMasters, numberOfReplicas } = GLOBAL.CLUSTERS.WITH_REPLICAS;
assert.equal(cluster.shards.length, numberOfMasters);
assert.equal(cluster.masters.length, numberOfMasters); assert.equal(cluster.masters.length, numberOfMasters);
assert.equal(cluster.replicas.length, numberOfReplicas * numberOfMasters); assert.equal(cluster.replicas.length, numberOfReplicas * numberOfMasters);
assert.equal(cluster.nodeByAddress.size, numberOfMasters + numberOfMasters * numberOfReplicas); assert.equal(cluster.nodeByAddress.size, numberOfMasters + numberOfMasters * numberOfReplicas);
@@ -239,54 +236,53 @@ describe('Cluster', () => {
assert.equal(cluster.pubSubNode, undefined); assert.equal(cluster.pubSubNode, undefined);
}, GLOBAL.CLUSTERS.OPEN); }, GLOBAL.CLUSTERS.OPEN);
// testUtils.testWithCluster('should move listeners when PubSub node disconnects from the cluster', async cluster => { testUtils.testWithCluster('should move listeners when PubSub node disconnects from the cluster', async cluster => {
// const listener = spy(); const listener = spy();
// await cluster.subscribe('channel', listener); await cluster.subscribe('channel', listener);
// assert.ok(cluster.pubSubNode); assert.ok(cluster.pubSubNode);
// const [migrating, importing] = cluster.masters[0].address === cluster.pubSubNode.address ? const [migrating, importing] = cluster.masters[0].address === cluster.pubSubNode.address ?
// cluster.masters : cluster.masters :
// [cluster.masters[1], cluster.masters[0]], [cluster.masters[1], cluster.masters[0]],
// [migratingClient, importingClient] = await Promise.all([ [migratingClient, importingClient] = await Promise.all([
// cluster.nodeClient(migrating), cluster.nodeClient(migrating),
// cluster.nodeClient(importing) cluster.nodeClient(importing)
// ]); ]);
// const range = cluster.slots[0].master === migrating ? { const range = cluster.slots[0].master === migrating ? {
// key: 'bar', // 5061 key: 'bar', // 5061
// start: 0, start: 0,
// end: 8191 end: 8191
// } : { } : {
// key: 'foo', // 12182 key: 'foo', // 12182
// start: 8192, start: 8192,
// end: 16383 end: 16383
// }; };
// await Promise.all([ // TODO: is there a better way to migrate slots without causing CLUSTERDOWN?
// migratingClient.clusterDelSlotsRange(range), const promises: Array<Promise<unknown>> = [];
// importingClient.clusterDelSlotsRange(range), for (let i = range.start; i <= range.end; i++) {
// importingClient.clusterAddSlotsRange(range) promises.push(
// ]); migratingClient.clusterSetSlot(i, 'NODE', importing.id),
importingClient.clusterSetSlot(i, 'NODE', importing.id)
);
}
await Promise.all(promises);
// // wait for migrating node to be notified about the new topology // make sure to cause `MOVED` error
// while ((await migratingClient.clusterInfo()).state !== 'ok') { await cluster.get(range.key);
// await setTimeout(50);
// }
// // make sure to cause `MOVED` error await Promise.all([
// await cluster.get(range.key); cluster.publish('channel', 'message'),
waitTillBeenCalled(listener)
]);
// await Promise.all([ assert.ok(listener.calledOnceWithExactly('message', 'channel'));
// cluster.publish('channel', 'message'), }, {
// waitTillBeenCalled(listener) serverArguments: [],
// ]); numberOfMasters: 2,
minimumDockerVersion: [7]
// assert.ok(listener.calledOnceWithExactly('message', 'channel')); });
// }, {
// serverArguments: [],
// numberOfMasters: 2,
// minimumDockerVersion: [7]
// });
testUtils.testWithCluster('ssubscribe & sunsubscribe', async cluster => { testUtils.testWithCluster('ssubscribe & sunsubscribe', async cluster => {
const listener = spy(); const listener = spy();
@@ -303,46 +299,44 @@ describe('Cluster', () => {
await cluster.sUnsubscribe('channel', listener); await cluster.sUnsubscribe('channel', listener);
// 10328 is the slot of `channel` // 10328 is the slot of `channel`
assert.equal(cluster.slots[10328].master.pubSubClient, undefined); assert.equal(cluster.slots[10328].master.pubSub, undefined);
}, { }, {
...GLOBAL.CLUSTERS.OPEN, ...GLOBAL.CLUSTERS.OPEN,
minimumDockerVersion: [7] minimumDockerVersion: [7]
}); });
// testUtils.testWithCluster('should handle sharded-channel-moved events', async cluster => { testUtils.testWithCluster('should handle sharded-channel-moved events', async cluster => {
// const SLOT = 10328, const SLOT = 10328,
// migrating = cluster.slots[SLOT].master, migrating = cluster.slots[SLOT].master,
// importing = cluster.masters.find(master => master !== migrating)!, importing = cluster.masters.find(master => master !== migrating)!,
// [migratingClient, importingClient] = await Promise.all([ [migratingClient, importingClient] = await Promise.all([
// cluster.nodeClient(migrating), cluster.nodeClient(migrating),
// cluster.nodeClient(importing) cluster.nodeClient(importing)
// ]); ]);
// await Promise.all([ await Promise.all([
// migratingClient.clusterDelSlots(SLOT), migratingClient.clusterDelSlots(SLOT),
// importingClient.clusterDelSlots(SLOT), importingClient.clusterDelSlots(SLOT),
// importingClient.clusterAddSlots(SLOT) importingClient.clusterAddSlots(SLOT),
// ]); // cause "topology refresh" on both nodes
migratingClient.clusterSetSlot(SLOT, 'NODE', importing.id),
importingClient.clusterSetSlot(SLOT, 'NODE', importing.id)
]);
// // wait for migrating node to be notified about the new topology const listener = spy();
// while ((await migratingClient.clusterInfo()).state !== 'ok') {
// await setTimeout(50);
// }
// const listener = spy(); // will trigger `MOVED` error
await cluster.sSubscribe('channel', listener);
// // will trigger `MOVED` error await Promise.all([
// await cluster.sSubscribe('channel', listener); waitTillBeenCalled(listener),
cluster.sPublish('channel', 'message')
]);
// await Promise.all([ assert.ok(listener.calledOnceWithExactly('message', 'channel'));
// waitTillBeenCalled(listener), }, {
// cluster.sPublish('channel', 'message') serverArguments: [],
// ]); minimumDockerVersion: [7]
});
// assert.ok(listener.calledOnceWithExactly('message', 'channel'));
// }, {
// serverArguments: [],
// minimumDockerVersion: [7]
// });
}); });
}); });

View File

@@ -17,7 +17,7 @@ interface ClusterCommander<
RESP extends RespVersions, RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping, TYPE_MAPPING extends TypeMapping,
// POLICIES extends CommandPolicies // POLICIES extends CommandPolicies
> extends CommanderConfig<M, F, S, RESP>{ > extends CommanderConfig<M, F, S, RESP> {
commandOptions?: ClusterCommandOptions<TYPE_MAPPING/*, POLICIES*/>; commandOptions?: ClusterCommandOptions<TYPE_MAPPING/*, POLICIES*/>;
} }
@@ -303,7 +303,7 @@ export default class RedisCluster<
private readonly _options: RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/>; private readonly _options: RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/>;
private readonly _slots: RedisClusterSlots<M, F, S, RESP>; private readonly _slots: RedisClusterSlots<M, F, S, RESP, TYPE_MAPPING>;
private _commandOptions?: ClusterCommandOptions<TYPE_MAPPING/*, POLICIES*/>; private _commandOptions?: ClusterCommandOptions<TYPE_MAPPING/*, POLICIES*/>;
@@ -315,14 +315,6 @@ export default class RedisCluster<
return this._slots.slots; return this._slots.slots;
} }
/**
* An array of cluster shards, each shard contain its `master` and `replicas`.
* Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific node (master or replica).
*/
get shards() {
return this._slots.shards;
}
/** /**
* An array of the cluster masters. * An array of the cluster masters.
* Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific master node. * Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific master node.
@@ -442,7 +434,7 @@ export default class RedisCluster<
private async _execute<T>( private async _execute<T>(
firstKey: RedisArgument | undefined, firstKey: RedisArgument | undefined,
isReadonly: boolean | undefined, isReadonly: boolean | undefined,
fn: (client: RedisClientType<M, F, S, RESP>) => Promise<T> fn: (client: RedisClientType<M, F, S, RESP, TYPE_MAPPING>) => Promise<T>
): Promise<T> { ): Promise<T> {
const maxCommandRedirections = this._options.maxCommandRedirections ?? 16; const maxCommandRedirections = this._options.maxCommandRedirections ?? 16;
let client = await this._slots.getClient(firstKey, isReadonly), let client = await this._slots.getClient(firstKey, isReadonly),
@@ -655,7 +647,7 @@ export default class RedisCluster<
return this._slots.destroy(); return this._slots.destroy();
} }
nodeClient(node: ShardNode<M, F, S, RESP>) { nodeClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>) {
return this._slots.nodeClient(node); return this._slots.nodeClient(node);
} }

View File

@@ -1,4 +1,4 @@
import { SimpleStringReply, Command } from '@redis/client/dist/lib/RESP/types'; import { SimpleStringReply, Command } from '../RESP/types';
export default { export default {
FIRST_KEY_INDEX: undefined, FIRST_KEY_INDEX: undefined,

View File

@@ -1,4 +1,4 @@
import { BlobStringReply, Command } from '@redis/client/dist/lib/RESP/types'; import { BlobStringReply, Command } from '../RESP/types';
export default { export default {
FIRST_KEY_INDEX: undefined, FIRST_KEY_INDEX: undefined,

View File

@@ -11,10 +11,11 @@ describe('CLUSTER REPLICAS', () => {
}); });
testUtils.testWithCluster('clusterNode.clusterReplicas', async cluster => { testUtils.testWithCluster('clusterNode.clusterReplicas', async cluster => {
const client = await cluster.nodeClient(cluster.masters[0]); const client = await cluster.nodeClient(cluster.masters[0]),
assert.equal( reply = await client.clusterReplicas(cluster.masters[0].id);
typeof await client.clusterReplicas(cluster.masters[0].id), assert.ok(Array.isArray(reply));
'string' for (const replica of reply) {
); assert.equal(typeof replica, 'string');
}
}, GLOBAL.CLUSTERS.OPEN); }, GLOBAL.CLUSTERS.OPEN);
}); });

View File

@@ -1,4 +1,4 @@
import { RedisArgument, VerbatimStringReply, Command } from '../RESP/types'; import { RedisArgument, ArrayReply, BlobStringReply, Command } from '../RESP/types';
export default { export default {
FIRST_KEY_INDEX: undefined, FIRST_KEY_INDEX: undefined,
@@ -6,5 +6,5 @@ export default {
transformArguments(nodeId: RedisArgument) { transformArguments(nodeId: RedisArgument) {
return ['CLUSTER', 'REPLICAS', nodeId]; return ['CLUSTER', 'REPLICAS', nodeId];
}, },
transformReply: undefined as unknown as () => VerbatimStringReply transformReply: undefined as unknown as () => ArrayReply<BlobStringReply>
} as const satisfies Command; } as const satisfies Command;

View File

@@ -21,7 +21,7 @@ describe('FCALL', () => {
loadMathFunction(client), loadMathFunction(client),
client.set('key', '2'), client.set('key', '2'),
client.fCall(MATH_FUNCTION.library.square.NAME, { client.fCall(MATH_FUNCTION.library.square.NAME, {
arguments: ['key'] keys: ['key']
}) })
]); ]);

View File

@@ -21,7 +21,7 @@ describe('FCALL_RO', () => {
loadMathFunction(client), loadMathFunction(client),
client.set('key', '2'), client.set('key', '2'),
client.fCallRo(MATH_FUNCTION.library.square.NAME, { client.fCallRo(MATH_FUNCTION.library.square.NAME, {
arguments: ['key'] keys: ['key']
}) })
]); ]);

View File

@@ -4,6 +4,8 @@ import FUNCTION_LOAD from './FUNCTION_LOAD';
import { RedisClientType } from '../client'; import { RedisClientType } from '../client';
import { NumberReply, RedisFunctions, RedisModules, RedisScripts, RespVersions } from '../RESP/types'; import { NumberReply, RedisFunctions, RedisModules, RedisScripts, RespVersions } from '../RESP/types';
export const MATH_FUNCTION = { export const MATH_FUNCTION = {
name: 'math', name: 'math',
engine: 'LUA', engine: 'LUA',
@@ -11,10 +13,10 @@ export const MATH_FUNCTION = {
`#!LUA name=math `#!LUA name=math
redis.register_function { redis.register_function {
function_name = "square", function_name = "square",
callback = function(keys, args) { callback = function(keys, args)
local number = redis.call('GET', keys[1]) local number = redis.call('GET', keys[1])
return number * number return number * number
}, end,
flags = { "no-writes" } flags = { "no-writes" }
}`, }`,
library: { library: {

View File

@@ -1,76 +1,76 @@
import { strict as assert } from 'node:assert'; import { strict as assert } from 'node:assert';
import { transformArguments } from './MIGRATE'; import MIGRATE from './MIGRATE';
describe('MIGRATE', () => { describe('MIGRATE', () => {
describe('transformArguments', () => { describe('transformArguments', () => {
it('single key', () => { it('single key', () => {
assert.deepEqual( assert.deepEqual(
transformArguments('127.0.0.1', 6379, 'key', 0, 10), MIGRATE.transformArguments('127.0.0.1', 6379, 'key', 0, 10),
['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10'] ['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10']
); );
});
it('multiple keys', () => {
assert.deepEqual(
transformArguments('127.0.0.1', 6379, ['1', '2'], 0, 10),
['MIGRATE', '127.0.0.1', '6379', '', '0', '10', 'KEYS', '1', '2']
);
});
it('with COPY', () => {
assert.deepEqual(
transformArguments('127.0.0.1', 6379, 'key', 0, 10, {
COPY: true
}),
['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'COPY']
);
});
it('with REPLACE', () => {
assert.deepEqual(
transformArguments('127.0.0.1', 6379, 'key', 0, 10, {
REPLACE: true
}),
['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'REPLACE']
);
});
describe('with AUTH', () => {
it('password only', () => {
assert.deepEqual(
transformArguments('127.0.0.1', 6379, 'key', 0, 10, {
AUTH: {
password: 'password'
}
}),
['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'AUTH', 'password']
);
});
it('username & password', () => {
assert.deepEqual(
transformArguments('127.0.0.1', 6379, 'key', 0, 10, {
AUTH: {
username: 'username',
password: 'password'
}
}),
['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'AUTH2', 'username', 'password']
);
});
});
it('with COPY, REPLACE, AUTH', () => {
assert.deepEqual(
transformArguments('127.0.0.1', 6379, 'key', 0, 10, {
COPY: true,
REPLACE: true,
AUTH: {
password: 'password'
}
}),
['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'COPY', 'REPLACE', 'AUTH', 'password']
);
});
}); });
it('multiple keys', () => {
assert.deepEqual(
MIGRATE.transformArguments('127.0.0.1', 6379, ['1', '2'], 0, 10),
['MIGRATE', '127.0.0.1', '6379', '', '0', '10', 'KEYS', '1', '2']
);
});
it('with COPY', () => {
assert.deepEqual(
MIGRATE.transformArguments('127.0.0.1', 6379, 'key', 0, 10, {
COPY: true
}),
['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'COPY']
);
});
it('with REPLACE', () => {
assert.deepEqual(
MIGRATE.transformArguments('127.0.0.1', 6379, 'key', 0, 10, {
REPLACE: true
}),
['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'REPLACE']
);
});
describe('with AUTH', () => {
it('password only', () => {
assert.deepEqual(
MIGRATE.transformArguments('127.0.0.1', 6379, 'key', 0, 10, {
AUTH: {
password: 'password'
}
}),
['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'AUTH', 'password']
);
});
it('username & password', () => {
assert.deepEqual(
MIGRATE.transformArguments('127.0.0.1', 6379, 'key', 0, 10, {
AUTH: {
username: 'username',
password: 'password'
}
}),
['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'AUTH2', 'username', 'password']
);
});
});
it('with COPY, REPLACE, AUTH', () => {
assert.deepEqual(
MIGRATE.transformArguments('127.0.0.1', 6379, 'key', 0, 10, {
COPY: true,
REPLACE: true,
AUTH: {
password: 'password'
}
}),
['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'COPY', 'REPLACE', 'AUTH', 'password']
);
});
});
}); });

View File

@@ -1,13 +1,14 @@
import { RedisArgument, SimpleStringReply, Command } from '../RESP/types'; import { RedisArgument, SimpleStringReply, Command } from '../RESP/types';
import { AuthOptions } from './AUTH'; import { AuthOptions } from './AUTH';
interface MigrateOptions { export interface MigrateOptions {
COPY?: true; COPY?: true;
REPLACE?: true; REPLACE?: true;
AUTH?: AuthOptions; AUTH?: AuthOptions;
} }
export default { export default {
IS_READ_ONLY: false,
transformArguments( transformArguments(
host: RedisArgument, host: RedisArgument,
port: number, port: number,
@@ -62,5 +63,5 @@ export default {
return args; return args;
}, },
transformReply: undefined as unknown as () => SimpleStringReply transformReply: undefined as unknown as () => SimpleStringReply<'OK'>
} as const satisfies Command; } as const satisfies Command;

View File

@@ -1,7 +1,7 @@
import { RedisArgument, SimpleStringReply, Command } from '../RESP/types'; import { RedisArgument, SimpleStringReply, Command } from '../RESP/types';
export default { export default {
FIRST_KEY_INDEX: undefined, FIRST_KEY_INDEX: 1,
IS_READ_ONLY: true, IS_READ_ONLY: true,
transformArguments(key: RedisArgument, newKey: RedisArgument) { transformArguments(key: RedisArgument, newKey: RedisArgument) {
return ['RENAME', key, newKey]; return ['RENAME', key, newKey];

View File

@@ -1,7 +1,7 @@
import { RedisArgument, NumberReply, Command } from '../RESP/types'; import { RedisArgument, NumberReply, Command } from '../RESP/types';
export default { export default {
FIRST_KEY_INDEX: undefined, FIRST_KEY_INDEX: 1,
IS_READ_ONLY: true, IS_READ_ONLY: true,
transformArguments(key: RedisArgument, newKey: RedisArgument) { transformArguments(key: RedisArgument, newKey: RedisArgument) {
return ['RENAMENX', key, newKey]; return ['RENAMENX', key, newKey];

View File

@@ -2,7 +2,7 @@ import { RedisArgument, Command } from '../RESP/types';
import { transformSortedSetReply } from './generic-transformers'; import { transformSortedSetReply } from './generic-transformers';
export default { export default {
FIRST_KEY_INDEX: undefined, FIRST_KEY_INDEX: 1,
IS_READ_ONLY: false, IS_READ_ONLY: false,
transformArguments(key: RedisArgument, count: number) { transformArguments(key: RedisArgument, count: number) {
return ['ZPOPMAX', key, count.toString()]; return ['ZPOPMAX', key, count.toString()];

View File

@@ -184,6 +184,7 @@ import MEMORY_PURGE from './MEMORY_PURGE';
import MEMORY_STATS from './MEMORY_STATS'; import MEMORY_STATS from './MEMORY_STATS';
import MEMORY_USAGE from './MEMORY_USAGE'; import MEMORY_USAGE from './MEMORY_USAGE';
import MGET from './MGET'; import MGET from './MGET';
import MIGRATE from './MIGRATE';
import MODULE_LIST from './MODULE_LIST'; import MODULE_LIST from './MODULE_LIST';
import MODULE_LOAD from './MODULE_LOAD'; import MODULE_LOAD from './MODULE_LOAD';
import MODULE_UNLOAD from './MODULE_UNLOAD'; import MODULE_UNLOAD from './MODULE_UNLOAD';
@@ -703,6 +704,8 @@ export default {
memoryUsage: MEMORY_USAGE, memoryUsage: MEMORY_USAGE,
MGET, MGET,
mGet: MGET, mGet: MGET,
MIGRATE,
migrate: MIGRATE,
MODULE_LIST, MODULE_LIST,
moduleList: MODULE_LIST, moduleList: MODULE_LIST,
MODULE_LOAD, MODULE_LOAD,

View File

@@ -20,7 +20,7 @@ describe('Multi Command', () => {
multi.addScript(SQUARE_SCRIPT, ['1']); multi.addScript(SQUARE_SCRIPT, ['1']);
assert.deepEqual( assert.deepEqual(
Array.from(multi.queue.at(-1).args), Array.from(multi.queue.at(-1).args),
['EVAL', SQUARE_SCRIPT.SCRIPT, '0', '1'] ['EVAL', SQUARE_SCRIPT.SCRIPT, '1', '1']
); );
}); });
@@ -28,7 +28,7 @@ describe('Multi Command', () => {
multi.addScript(SQUARE_SCRIPT, ['2']); multi.addScript(SQUARE_SCRIPT, ['2']);
assert.deepEqual( assert.deepEqual(
Array.from(multi.queue.at(-1).args), Array.from(multi.queue.at(-1).args),
['EVALSHA', SQUARE_SCRIPT.SHA1, '0', '2'] ['EVALSHA', SQUARE_SCRIPT.SHA1, '1', '2']
); );
}); });

View File

@@ -200,9 +200,9 @@ export default class TestUtils {
// POLICIES extends CommandPolicies // POLICIES extends CommandPolicies
>(cluster: RedisClusterType<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/>): Promise<unknown> { >(cluster: RedisClusterType<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/>): Promise<unknown> {
return Promise.all( return Promise.all(
cluster.masters.map(async ({ client }) => { cluster.masters.map(async master => {
if (client) { if (master.client) {
await (await client).flushAll(); (await cluster.nodeClient(master)).flushAll();
} }
}) })
); );
@@ -256,7 +256,7 @@ export default class TestUtils {
await fn(cluster); await fn(cluster);
} finally { } finally {
await TestUtils.#clusterFlushAll(cluster); await TestUtils.#clusterFlushAll(cluster);
await cluster.disconnect(); cluster.destroy();
} }
}); });
} }