You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-09-11 18:50:46 +03:00
handle ASK errors, add some commands and tests
This commit is contained in:
@@ -6,16 +6,21 @@ import { RedisClusterOptions } from './cluster';
|
||||
import { RedisModules } from './commands';
|
||||
import { RedisLuaScripts } from './lua-script';
|
||||
|
||||
interface SlotClients<M extends RedisModules, S extends RedisLuaScripts> {
|
||||
master: RedisClientType<M, S>;
|
||||
replicas: Array<RedisClientType<M, S>>;
|
||||
iterator: IterableIterator<RedisClientType<M, S>> | undefined;
|
||||
export interface ClusterNode<M extends RedisModules, S extends RedisLuaScripts> {
|
||||
id: string;
|
||||
client: RedisClientType<M, S>;
|
||||
}
|
||||
|
||||
interface SlotNodes<M extends RedisModules, S extends RedisLuaScripts> {
|
||||
master: ClusterNode<M, S>;
|
||||
replicas: Array<ClusterNode<M, S>>;
|
||||
clientIterator: IterableIterator<RedisClientType<M, S>> | undefined;
|
||||
}
|
||||
|
||||
export default class RedisClusterSlots<M extends RedisModules, S extends RedisLuaScripts> {
|
||||
readonly #options: RedisClusterOptions;
|
||||
readonly #clientByKey = new Map<string, RedisClientType<M, S>>();
|
||||
readonly #slots: Array<SlotClients<M, S>> = [];
|
||||
readonly #nodeByUrl = new Map<string, ClusterNode<M, S>>();
|
||||
readonly #slots: Array<SlotNodes<M, S>> = [];
|
||||
|
||||
constructor(options: RedisClusterOptions) {
|
||||
this.#options = options;
|
||||
@@ -27,6 +32,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisLu
|
||||
await this.#discoverNodes(rootNode);
|
||||
return;
|
||||
} catch (err) {
|
||||
console.error(err);
|
||||
// this.emit('error', err);
|
||||
}
|
||||
}
|
||||
@@ -39,16 +45,18 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisLu
|
||||
await this.#discoverNodes(startWith.options?.socket);
|
||||
return;
|
||||
} catch (err) {
|
||||
console.error(err);
|
||||
// this.emit('error', err);
|
||||
}
|
||||
|
||||
for (const client of this.#clientByKey.values()) {
|
||||
for (const { client } of this.#nodeByUrl.values()) {
|
||||
if (client === startWith) continue;
|
||||
|
||||
try {
|
||||
await this.#discoverNodes(client.options?.socket);
|
||||
return;
|
||||
} catch (err) {
|
||||
console.error(err);
|
||||
// this.emit('error', err);
|
||||
}
|
||||
}
|
||||
@@ -80,7 +88,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisLu
|
||||
replicas: this.#options.useReplicas ?
|
||||
master.replicas.map(replica => this.#initiateClientForNode(replica, true, clientsInUse, promises)) :
|
||||
[],
|
||||
iterator: undefined // will be initiated in use
|
||||
clientIterator: undefined // will be initiated in use
|
||||
};
|
||||
|
||||
for (const { from, to } of master.slots) {
|
||||
@@ -91,73 +99,77 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisLu
|
||||
}
|
||||
|
||||
// Remove unused clients from this.#clientBykey using clientsInUse
|
||||
for (const [key, client] of this.#clientByKey.entries()) {
|
||||
if (clientsInUse.has(key)) continue;
|
||||
for (const [url, { client }] of this.#nodeByUrl.entries()) {
|
||||
if (clientsInUse.has(url)) continue;
|
||||
|
||||
// TODO: ignore error from `.disconnect`?
|
||||
promises.push(client.disconnect());
|
||||
this.#clientByKey.delete(key);
|
||||
this.#nodeByUrl.delete(url);
|
||||
}
|
||||
|
||||
await Promise.all(promises);
|
||||
}
|
||||
|
||||
#initiateClientForNode(node: RedisClusterMasterNode | RedisClusterReplicaNode, readonly: boolean, clientsInUse: Set<string>, promises: Array<Promise<void>>): RedisClientType<M, S> {
|
||||
clientsInUse.add(node.url);
|
||||
#initiateClientForNode(nodeData: RedisClusterMasterNode | RedisClusterReplicaNode, readonly: boolean, clientsInUse: Set<string>, promises: Array<Promise<void>>): ClusterNode<M, S> {
|
||||
const url = `${nodeData.host}:${nodeData.port}`;
|
||||
clientsInUse.add(url);
|
||||
|
||||
let client = this.#clientByKey.get(node.url);
|
||||
if (!client) {
|
||||
client = RedisClient.create({
|
||||
socket: {
|
||||
host: node.host,
|
||||
port: node.port
|
||||
},
|
||||
readonly
|
||||
});
|
||||
promises.push(client.connect());
|
||||
this.#clientByKey.set(node.url, client);
|
||||
let node = this.#nodeByUrl.get(url);
|
||||
if (!node) {
|
||||
node = {
|
||||
id: nodeData.id,
|
||||
client: RedisClient.create({
|
||||
socket: {
|
||||
host: nodeData.host,
|
||||
port: nodeData.port
|
||||
},
|
||||
readonly
|
||||
})
|
||||
};
|
||||
promises.push(node.client.connect());
|
||||
this.#nodeByUrl.set(url, node);
|
||||
}
|
||||
|
||||
return client;
|
||||
return node;
|
||||
}
|
||||
|
||||
#getSlotMaster(slot: number): RedisClientType<M, S> {
|
||||
getSlotMaster(slot: number): ClusterNode<M, S> {
|
||||
return this.#slots[slot].master;
|
||||
}
|
||||
|
||||
*#slotIterator(slotNumber: number): IterableIterator<RedisClientType<M, S>> {
|
||||
*#slotClientIterator(slotNumber: number): IterableIterator<RedisClientType<M, S>> {
|
||||
const slot = this.#slots[slotNumber];
|
||||
yield slot.master;
|
||||
yield slot.master.client;
|
||||
|
||||
for (const replica of slot.replicas) {
|
||||
yield replica;
|
||||
yield replica.client;
|
||||
}
|
||||
}
|
||||
|
||||
#getSlotClient(slotNumber: number): RedisClientType<M, S> {
|
||||
const slot = this.#slots[slotNumber];
|
||||
if (!slot.iterator) {
|
||||
slot.iterator = this.#slotIterator(slotNumber);
|
||||
if (!slot.clientIterator) {
|
||||
slot.clientIterator = this.#slotClientIterator(slotNumber);
|
||||
}
|
||||
|
||||
const {done, value} = slot.iterator.next();
|
||||
const {done, value} = slot.clientIterator.next();
|
||||
if (done) {
|
||||
slot.iterator = undefined;
|
||||
slot.clientIterator = undefined;
|
||||
return this.#getSlotClient(slotNumber);
|
||||
}
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
#randomClientIterator?: IterableIterator<RedisClientType<M, S>>;
|
||||
#randomClientIterator?: IterableIterator<ClusterNode<M, S>>;
|
||||
|
||||
#getRandomClient(): RedisClientType<M, S> {
|
||||
if (!this.#clientByKey.size) {
|
||||
if (!this.#nodeByUrl.size) {
|
||||
throw new Error('Cluster is not connected');
|
||||
}
|
||||
|
||||
if (!this.#randomClientIterator) {
|
||||
this.#randomClientIterator = this.#clientByKey.values();
|
||||
this.#randomClientIterator = this.#nodeByUrl.values();
|
||||
}
|
||||
|
||||
const {done, value} = this.#randomClientIterator.next();
|
||||
@@ -166,7 +178,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisLu
|
||||
return this.#getRandomClient();
|
||||
}
|
||||
|
||||
return value;
|
||||
return value.client;
|
||||
}
|
||||
|
||||
getClient(firstKey?: string, isReadonly?: boolean): RedisClientType<M, S> {
|
||||
@@ -176,30 +188,34 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisLu
|
||||
|
||||
const slot = calculateSlot(firstKey);
|
||||
if (!isReadonly || !this.#options.useReplicas) {
|
||||
return this.#getSlotMaster(slot);
|
||||
return this.getSlotMaster(slot).client;
|
||||
}
|
||||
|
||||
return this.#getSlotClient(slot);
|
||||
}
|
||||
|
||||
getMasters(): Array<RedisClientType<M, S>> {
|
||||
getMasters(): Array<ClusterNode<M, S>> {
|
||||
const masters = [];
|
||||
|
||||
for (const client of this.#clientByKey.values()) {
|
||||
if (client.options?.readonly) continue;
|
||||
for (const node of this.#nodeByUrl.values()) {
|
||||
if (node.client.options?.readonly) continue;
|
||||
|
||||
masters.push(client);
|
||||
masters.push(node);
|
||||
}
|
||||
|
||||
return masters;
|
||||
}
|
||||
|
||||
getNodeByUrl(url: string): ClusterNode<M, S> | undefined {
|
||||
return this.#nodeByUrl.get(url);
|
||||
}
|
||||
|
||||
async disconnect(): Promise<void> {
|
||||
await Promise.all(
|
||||
[...this.#clientByKey.values()].map(client => client.disconnect())
|
||||
[...this.#nodeByUrl.values()].map(({ client }) => client.disconnect())
|
||||
);
|
||||
|
||||
this.#clientByKey.clear();
|
||||
this.#nodeByUrl.clear();
|
||||
this.#slots.splice(0);
|
||||
}
|
||||
}
|
||||
|
@@ -1,7 +1,9 @@
|
||||
import { strict as assert } from 'assert';
|
||||
import RedisCluster from './cluster';
|
||||
import { defineScript } from './lua-script';
|
||||
import { TestRedisClusters, TEST_REDIS_CLUSTERES } from './test-utils';
|
||||
import { itWithDedicatedCluster, TestRedisClusters, TEST_REDIS_CLUSTERES } from './test-utils';
|
||||
import calculateSlot from 'cluster-key-slot';
|
||||
import { ClusterSlotStates } from './commands/CLUSTER_SETSLOT';
|
||||
|
||||
describe('Cluster', () => {
|
||||
it('sendCommand', async () => {
|
||||
@@ -12,16 +14,18 @@ describe('Cluster', () => {
|
||||
|
||||
await cluster.connect();
|
||||
|
||||
await cluster.ping();
|
||||
await cluster.set('a', 'b');
|
||||
await cluster.set('a{a}', 'bb');
|
||||
await cluster.set('aa', 'bb');
|
||||
await cluster.get('aa');
|
||||
await cluster.get('aa');
|
||||
await cluster.get('aa');
|
||||
await cluster.get('aa');
|
||||
|
||||
await cluster.disconnect();
|
||||
try {
|
||||
await cluster.ping();
|
||||
await cluster.set('a', 'b');
|
||||
await cluster.set('a{a}', 'bb');
|
||||
await cluster.set('aa', 'bb');
|
||||
await cluster.get('aa');
|
||||
await cluster.get('aa');
|
||||
await cluster.get('aa');
|
||||
await cluster.get('aa');
|
||||
} finally {
|
||||
await cluster.disconnect();
|
||||
}
|
||||
});
|
||||
|
||||
it('scripts', async () => {
|
||||
@@ -54,4 +58,46 @@ describe('Cluster', () => {
|
||||
await cluster.disconnect();
|
||||
}
|
||||
});
|
||||
|
||||
itWithDedicatedCluster('should handle live resharding', async cluster => {
|
||||
const key = 'key',
|
||||
value = 'value';
|
||||
await cluster.set(key, value);
|
||||
|
||||
const slot = calculateSlot(key),
|
||||
from = cluster.getSlotMaster(slot),
|
||||
to = cluster.getMasters().find(node => node.id !== from.id);
|
||||
|
||||
await to!.client.clusterSetSlot(slot, ClusterSlotStates.IMPORTING, from.id);
|
||||
|
||||
// should be able to get the key from the original node before it was migrated
|
||||
assert.equal(
|
||||
await cluster.get(key),
|
||||
value
|
||||
);
|
||||
|
||||
await from.client.clusterSetSlot(slot, ClusterSlotStates.MIGRATING, to!.id);
|
||||
|
||||
// should be able to get the key from the original node using the "ASKING" command
|
||||
assert.equal(
|
||||
await cluster.get(key),
|
||||
value
|
||||
);
|
||||
|
||||
const { port: toPort } = <any>to!.client.options!.socket;
|
||||
|
||||
await from.client.migrate(
|
||||
'127.0.0.1',
|
||||
toPort,
|
||||
key,
|
||||
0,
|
||||
10
|
||||
);
|
||||
|
||||
// should be able to get the key from the new node
|
||||
assert.equal(
|
||||
await cluster.get(key),
|
||||
value
|
||||
);
|
||||
});
|
||||
});
|
||||
|
@@ -1,10 +1,11 @@
|
||||
import COMMANDS from './commands';
|
||||
import { RedisCommand, RedisModules } from './commands';
|
||||
import { ClientCommandOptions, RedisClientType, WithPlugins } from './client';
|
||||
import RedisClient, { ClientCommandOptions, RedisClientType, WithPlugins } from './client';
|
||||
import { RedisSocketOptions } from './socket';
|
||||
import RedisClusterSlots from './cluster-slots';
|
||||
import RedisClusterSlots, { ClusterNode } from './cluster-slots';
|
||||
import { RedisLuaScript, RedisLuaScripts } from './lua-script';
|
||||
import { commandOptions, CommandOptions, isCommandOptions } from './command-options';
|
||||
import { Console } from 'console';
|
||||
|
||||
export interface RedisClusterOptions<M = RedisModules, S = RedisLuaScripts> {
|
||||
rootNodes: Array<RedisSocketOptions>;
|
||||
@@ -105,8 +106,11 @@ export default class RedisCluster<M extends RedisModules = RedisModules, S exten
|
||||
try {
|
||||
return await client.sendCommand(args, options);
|
||||
} catch (err) {
|
||||
if (await this.#handleCommandError(err, client, redirections)) {
|
||||
const shouldRetry = await this.#handleCommandError(err, client, redirections);
|
||||
if (shouldRetry === true) {
|
||||
return this.sendCommand(firstKey, isReadonly, args, options, redirections + 1);
|
||||
} else if (shouldRetry) {
|
||||
return shouldRetry.sendCommand(args, options);
|
||||
}
|
||||
|
||||
throw err;
|
||||
@@ -128,32 +132,52 @@ export default class RedisCluster<M extends RedisModules = RedisModules, S exten
|
||||
try {
|
||||
return await client.executeScript(script, redisArgs, options);
|
||||
} catch (err) {
|
||||
if (await this.#handleCommandError(err, client, redirections)) {
|
||||
const shouldRetry = await this.#handleCommandError(err, client, redirections);
|
||||
if (shouldRetry === true) {
|
||||
return this.executeScript(script, originalArgs, redisArgs, options, redirections + 1);
|
||||
} else if (shouldRetry) {
|
||||
return shouldRetry.executeScript(script, redisArgs, options);
|
||||
}
|
||||
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
async #handleCommandError(err: Error, client: RedisClientType<M, S>, redirections = 0): Promise<boolean> {
|
||||
if (redirections < (this.#options.maxCommandRedirections ?? 16)) {
|
||||
async #handleCommandError(err: Error, client: RedisClientType<M, S>, redirections: number): Promise<boolean | RedisClientType<M, S>> {
|
||||
if (redirections > (this.#options.maxCommandRedirections ?? 16)) {
|
||||
throw err;
|
||||
}
|
||||
|
||||
if (err.message.startsWith('ASK')) {
|
||||
// TODO
|
||||
const url = err.message.substring(err.message.lastIndexOf(' ') + 1);
|
||||
let node = this.#slots.getNodeByUrl(url);
|
||||
if (!node) {
|
||||
await this.#slots.discover(client);
|
||||
node = this.#slots.getNodeByUrl(url);
|
||||
|
||||
if (!node) {
|
||||
throw new Error(`Cannot find node ${url}`);
|
||||
}
|
||||
}
|
||||
|
||||
await node.client.asking();
|
||||
return node.client;
|
||||
} else if (err.message.startsWith('MOVED')) {
|
||||
await this.#slots.discover(client);
|
||||
return client;
|
||||
}
|
||||
|
||||
throw err;
|
||||
}
|
||||
|
||||
getMasters(): Array<RedisClientType<M, S>> {
|
||||
getMasters(): Array<ClusterNode<M, S>> {
|
||||
return this.#slots.getMasters();
|
||||
}
|
||||
|
||||
getSlotMaster(slot: number): ClusterNode<M, S> {
|
||||
return this.#slots.getSlotMaster(slot);
|
||||
}
|
||||
|
||||
disconnect(): Promise<void> {
|
||||
return this.#slots.disconnect();
|
||||
}
|
||||
|
11
lib/commands/ASKING.spec.ts
Normal file
11
lib/commands/ASKING.spec.ts
Normal file
@@ -0,0 +1,11 @@
|
||||
import { strict as assert } from 'assert';
|
||||
import { transformArguments } from './ASKING';
|
||||
|
||||
describe('ASKING', () => {
|
||||
it('transformArguments', () => {
|
||||
assert.deepEqual(
|
||||
transformArguments(),
|
||||
['ASKING']
|
||||
);
|
||||
});
|
||||
});
|
7
lib/commands/ASKING.ts
Normal file
7
lib/commands/ASKING.ts
Normal file
@@ -0,0 +1,7 @@
|
||||
import { transformReplyString } from './generic-transformers';
|
||||
|
||||
export function transformArguments(): Array<string> {
|
||||
return ['ASKING'];
|
||||
}
|
||||
|
||||
export const transformReply = transformReplyString;
|
11
lib/commands/CLUSTER_GETKEYSINSLOT.spec.ts
Normal file
11
lib/commands/CLUSTER_GETKEYSINSLOT.spec.ts
Normal file
@@ -0,0 +1,11 @@
|
||||
import { strict as assert } from 'assert';
|
||||
import { transformArguments } from './CLUSTER_GETKEYSINSLOT';
|
||||
|
||||
describe('CLUSTER GETKEYSINSLOT', () => {
|
||||
it('transformArguments', () => {
|
||||
assert.deepEqual(
|
||||
transformArguments(0, 10),
|
||||
['CLUSTER', 'GETKEYSINSLOT', '0', '10']
|
||||
);
|
||||
});
|
||||
});
|
7
lib/commands/CLUSTER_GETKEYSINSLOT.ts
Normal file
7
lib/commands/CLUSTER_GETKEYSINSLOT.ts
Normal file
@@ -0,0 +1,7 @@
|
||||
import { transformReplyString } from './generic-transformers';
|
||||
|
||||
export function transformArguments(slot: number, count: number): Array<string> {
|
||||
return ['CLUSTER', 'GETKEYSINSLOT', slot.toString(), count.toString()];
|
||||
}
|
||||
|
||||
export const transformReply = transformReplyString;
|
@@ -10,14 +10,14 @@ describe('CLUSTER RESET', () => {
|
||||
);
|
||||
});
|
||||
|
||||
it('simple', () => {
|
||||
it('HARD', () => {
|
||||
assert.deepEqual(
|
||||
transformArguments('HARD'),
|
||||
['CLUSTER', 'RESET', 'HARD']
|
||||
);
|
||||
});
|
||||
|
||||
it('simple', () => {
|
||||
it('SOFT', () => {
|
||||
assert.deepEqual(
|
||||
transformArguments('SOFT'),
|
||||
['CLUSTER', 'RESET', 'SOFT']
|
||||
|
20
lib/commands/CLUSTER_SETSLOT.spec.ts
Normal file
20
lib/commands/CLUSTER_SETSLOT.spec.ts
Normal file
@@ -0,0 +1,20 @@
|
||||
import { strict as assert } from 'assert';
|
||||
import { ClusterSlotStates, transformArguments } from './CLUSTER_SETSLOT';
|
||||
|
||||
describe('CLUSTER SETSLOT', () => {
|
||||
describe('transformArguments', () => {
|
||||
it('simple', () => {
|
||||
assert.deepEqual(
|
||||
transformArguments(0, ClusterSlotStates.IMPORTING),
|
||||
['CLUSTER', 'SETSLOT', '0', 'IMPORTING']
|
||||
);
|
||||
});
|
||||
|
||||
it('with nodeId', () => {
|
||||
assert.deepEqual(
|
||||
transformArguments(0, ClusterSlotStates.IMPORTING, 'nodeId'),
|
||||
['CLUSTER', 'SETSLOT', '0', 'IMPORTING', 'nodeId']
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
20
lib/commands/CLUSTER_SETSLOT.ts
Normal file
20
lib/commands/CLUSTER_SETSLOT.ts
Normal file
@@ -0,0 +1,20 @@
|
||||
import { transformReplyString } from './generic-transformers';
|
||||
|
||||
export enum ClusterSlotStates {
|
||||
IMPORTING = 'IMPORTING',
|
||||
MIGRATING = 'MIGRATING',
|
||||
STABLE = 'STABLE',
|
||||
NODE = 'NODE'
|
||||
}
|
||||
|
||||
export function transformArguments(slot: number, state: ClusterSlotStates, nodeId?: string): Array<string> {
|
||||
const args = ['CLUSTER', 'SETSLOT', slot.toString(), state];
|
||||
|
||||
if (nodeId) {
|
||||
args.push(nodeId);
|
||||
}
|
||||
|
||||
return args;
|
||||
}
|
||||
|
||||
export const transformReply = transformReplyString;
|
76
lib/commands/MIGRATE.spec.ts
Normal file
76
lib/commands/MIGRATE.spec.ts
Normal file
@@ -0,0 +1,76 @@
|
||||
import { strict as assert } from 'assert';
|
||||
import { transformArguments } from './MIGRATE';
|
||||
|
||||
describe('MIGRATE', () => {
|
||||
describe('transformArguments', () => {
|
||||
it('single key', () => {
|
||||
assert.deepEqual(
|
||||
transformArguments('127.0.0.1', 6379, 'key', 0, 10),
|
||||
['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10']
|
||||
);
|
||||
});
|
||||
|
||||
it('multiple keys', () => {
|
||||
assert.deepEqual(
|
||||
transformArguments('127.0.0.1', 6379, ['1', '2'], 0, 10),
|
||||
['MIGRATE', '127.0.0.1', '6379', '""', '0', '10', 'KEYS', '1', '2']
|
||||
);
|
||||
});
|
||||
|
||||
it('with COPY', () => {
|
||||
assert.deepEqual(
|
||||
transformArguments('127.0.0.1', 6379, 'key', 0, 10, {
|
||||
COPY: true
|
||||
}),
|
||||
['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'COPY']
|
||||
);
|
||||
});
|
||||
|
||||
it('with REPLACE', () => {
|
||||
assert.deepEqual(
|
||||
transformArguments('127.0.0.1', 6379, 'key', 0, 10, {
|
||||
REPLACE: true
|
||||
}),
|
||||
['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'REPLACE']
|
||||
);
|
||||
});
|
||||
|
||||
describe('with AUTH', () => {
|
||||
it('password only', () => {
|
||||
assert.deepEqual(
|
||||
transformArguments('127.0.0.1', 6379, 'key', 0, 10, {
|
||||
AUTH: {
|
||||
password: 'password'
|
||||
}
|
||||
}),
|
||||
['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'AUTH', 'password']
|
||||
);
|
||||
});
|
||||
|
||||
it('username & password', () => {
|
||||
assert.deepEqual(
|
||||
transformArguments('127.0.0.1', 6379, 'key', 0, 10, {
|
||||
AUTH: {
|
||||
username: 'username',
|
||||
password: 'password'
|
||||
}
|
||||
}),
|
||||
['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'AUTH2', 'username', 'password']
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
it('with COPY, REPLACE, AUTH', () => {
|
||||
assert.deepEqual(
|
||||
transformArguments('127.0.0.1', 6379, 'key', 0, 10, {
|
||||
COPY: true,
|
||||
REPLACE: true,
|
||||
AUTH: {
|
||||
password: 'password'
|
||||
}
|
||||
}),
|
||||
['MIGRATE', '127.0.0.1', '6379', 'key', '0', '10', 'COPY', 'REPLACE', 'AUTH', 'password']
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
65
lib/commands/MIGRATE.ts
Normal file
65
lib/commands/MIGRATE.ts
Normal file
@@ -0,0 +1,65 @@
|
||||
import { AuthOptions } from './AUTH';
|
||||
import { transformReplyString } from './generic-transformers';
|
||||
|
||||
interface MigrateOptions {
|
||||
COPY?: true;
|
||||
REPLACE?: true;
|
||||
AUTH?: AuthOptions;
|
||||
}
|
||||
|
||||
export function transformArguments(
|
||||
host: string,
|
||||
port: number,
|
||||
key: string | Array<string>,
|
||||
destinationDb: number,
|
||||
timeout: number,
|
||||
options?: MigrateOptions
|
||||
): Array<string> {
|
||||
const args = ['MIGRATE', host, port.toString()],
|
||||
isKeyString = typeof key === 'string';
|
||||
|
||||
if (isKeyString) {
|
||||
args.push(key as string);
|
||||
} else {
|
||||
args.push('""');
|
||||
}
|
||||
|
||||
args.push(
|
||||
destinationDb.toString(),
|
||||
timeout.toString()
|
||||
);
|
||||
|
||||
if (options?.COPY) {
|
||||
args.push('COPY');
|
||||
}
|
||||
|
||||
if (options?.REPLACE) {
|
||||
args.push('REPLACE');
|
||||
}
|
||||
|
||||
if (options?.AUTH) {
|
||||
if (options.AUTH.username) {
|
||||
args.push(
|
||||
'AUTH2',
|
||||
options.AUTH.username,
|
||||
options.AUTH.password
|
||||
);
|
||||
} else {
|
||||
args.push(
|
||||
'AUTH',
|
||||
options.AUTH.password
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if (!isKeyString) {
|
||||
args.push(
|
||||
'KEYS',
|
||||
...key
|
||||
);
|
||||
}
|
||||
|
||||
return args;
|
||||
}
|
||||
|
||||
export const transformReply = transformReplyString;
|
@@ -1,4 +1,5 @@
|
||||
import * as APPEND from './APPEND';
|
||||
import * as ASKING from './ASKING';
|
||||
import * as AUTH from './AUTH';
|
||||
import * as BITCOUNT from './BITCOUNT';
|
||||
import * as BITFIELD from './BITFIELD';
|
||||
@@ -15,6 +16,7 @@ import * as CLUSTER_INFO from './CLUSTER_INFO';
|
||||
import * as CLUSTER_NODES from './CLUSTER_NODES';
|
||||
import * as CLUSTER_MEET from './CLUSTER_MEET';
|
||||
import * as CLUSTER_RESET from './CLUSTER_RESET';
|
||||
import * as CLUSTER_SETSLOT from './CLUSTER_SETSLOT';
|
||||
import * as COPY from './COPY';
|
||||
import * as DECR from './DECR';
|
||||
import * as DECRBY from './DECRBY';
|
||||
@@ -61,6 +63,7 @@ import * as LRANGE from './LRANGE';
|
||||
import * as LREM from './LREM';
|
||||
import * as LSET from './LSET';
|
||||
import * as LTRIM from './LTRIM';
|
||||
import * as MIGRATE from './MIGRATE';
|
||||
import * as MOVE from './MOVE';
|
||||
import * as PERSIST from './PERSIST';
|
||||
import * as PEXPIRE from './PEXPIRE';
|
||||
@@ -167,6 +170,8 @@ import * as ZUNIONSTORE from './ZUNIONSTORE';
|
||||
export default {
|
||||
APPEND,
|
||||
append: APPEND,
|
||||
ASKING,
|
||||
asking: ASKING,
|
||||
AUTH,
|
||||
auth: AUTH,
|
||||
BITCOUNT,
|
||||
@@ -199,6 +204,8 @@ export default {
|
||||
clusterMeet: CLUSTER_MEET,
|
||||
CLUSTER_RESET,
|
||||
clusterReset: CLUSTER_RESET,
|
||||
CLUSTER_SETSLOT,
|
||||
clusterSetSlot: CLUSTER_SETSLOT,
|
||||
COPY,
|
||||
copy: COPY,
|
||||
DECR,
|
||||
@@ -290,6 +297,8 @@ export default {
|
||||
lSet: LSET,
|
||||
LTRIM,
|
||||
lTrim: LTRIM,
|
||||
MIGRATE,
|
||||
migrate: MIGRATE,
|
||||
MOVE,
|
||||
move: MOVE,
|
||||
PERSIST,
|
||||
|
@@ -4,7 +4,6 @@ import { RedisModules } from './commands';
|
||||
import { RedisLuaScripts } from './lua-script';
|
||||
import { spawn } from 'child_process';
|
||||
import { once } from 'events';
|
||||
import tcpPortUsed from 'tcp-port-used';
|
||||
import { RedisSocketOptions } from './socket';
|
||||
import which from 'which';
|
||||
import { SinonSpy } from 'sinon';
|
||||
@@ -37,20 +36,20 @@ before(function () {
|
||||
|
||||
async function spawnOpenServer(): Promise<void> {
|
||||
TEST_REDIS_SERVERS[TestRedisServers.OPEN] = {
|
||||
port: await spawnRedisServer()
|
||||
port: await spawnGlobalRedisServer()
|
||||
};
|
||||
}
|
||||
|
||||
async function spawnPasswordServer(): Promise<void> {
|
||||
TEST_REDIS_SERVERS[TestRedisServers.PASSWORD] = {
|
||||
port: await spawnRedisServer(['--requirepass', 'password']),
|
||||
port: await spawnGlobalRedisServer(['--requirepass', 'password']),
|
||||
username: 'default',
|
||||
password: 'password'
|
||||
};
|
||||
}
|
||||
|
||||
async function spawnOpenCluster(): Promise<void> {
|
||||
TEST_REDIS_CLUSTERES[TestRedisClusters.OPEN] = (await spawnRedisCluster(TestRedisClusters.OPEN, 3)).map(port => ({
|
||||
TEST_REDIS_CLUSTERES[TestRedisClusters.OPEN] = (await spawnGlobalRedisCluster(TestRedisClusters.OPEN, 3)).map(port => ({
|
||||
port
|
||||
}));
|
||||
}
|
||||
@@ -91,9 +90,34 @@ export function itWithCluster(type: TestRedisClusters, title: string, fn: (clust
|
||||
});
|
||||
}
|
||||
|
||||
export function itWithDedicatedCluster(title: string, fn: (cluster: RedisClusterType<RedisModules, RedisLuaScripts>) => Promise<void>): void {
|
||||
it(title, async function () {
|
||||
this.timeout(10000);
|
||||
|
||||
const spawnResults = await spawnRedisCluster(null, 3),
|
||||
cluster = RedisCluster.create({
|
||||
rootNodes: [{
|
||||
port: spawnResults[0].port
|
||||
}]
|
||||
});
|
||||
|
||||
await cluster.connect();
|
||||
|
||||
try {
|
||||
await fn(cluster);
|
||||
} finally {
|
||||
await cluster.disconnect();
|
||||
|
||||
for (const { cleanup } of spawnResults) {
|
||||
await cleanup();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async function clusterFlushAll(cluster: RedisCluster): Promise<void> {
|
||||
await Promise.all(
|
||||
cluster.getMasters().map(master => master.flushAll())
|
||||
cluster.getMasters().map(({ client }) => client.flushAll())
|
||||
);
|
||||
}
|
||||
|
||||
@@ -101,7 +125,12 @@ const REDIS_PATH = which.sync('redis-server');
|
||||
|
||||
let port = 6379;
|
||||
|
||||
async function spawnRedisServer(args?: Array<string>): Promise<number> {
|
||||
interface SpawnRedisServerResult {
|
||||
port: number;
|
||||
cleanup: () => Promise<void>;
|
||||
}
|
||||
|
||||
async function spawnRedisServer(args?: Array<string>): Promise<SpawnRedisServerResult> {
|
||||
const currentPort = port++,
|
||||
process = spawn(REDIS_PATH, [
|
||||
'--save',
|
||||
@@ -110,23 +139,41 @@ async function spawnRedisServer(args?: Array<string>): Promise<number> {
|
||||
currentPort.toString(),
|
||||
...(args ?? [])
|
||||
]);
|
||||
|
||||
process
|
||||
.on('error', err => console.error('Redis process error', err))
|
||||
.on('close', code => console.error(`Redis process closed unexpectedly with code ${code}`));
|
||||
|
||||
// TODO: catch process exit
|
||||
for await (const chunk of process.stdout) {
|
||||
if (chunk.toString().includes('Ready to accept connections')) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
await tcpPortUsed.waitForStatus(currentPort, '127.0.0.1', true, 10, 10000);
|
||||
if (process.exitCode !== null) {
|
||||
throw new Error('Error while spawning redis server');
|
||||
}
|
||||
|
||||
after(() => {
|
||||
assert.ok(process.kill());
|
||||
return once(process, 'close');
|
||||
});
|
||||
return {
|
||||
port: currentPort,
|
||||
async cleanup(): Promise<void> {
|
||||
process.removeAllListeners('close');
|
||||
assert.ok(process.kill());
|
||||
await once(process, 'close');
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
return currentPort;
|
||||
async function spawnGlobalRedisServer(args?: Array<string>): Promise<number> {
|
||||
const { port, cleanup } = await spawnRedisServer(args);
|
||||
after(cleanup);
|
||||
return port;
|
||||
}
|
||||
|
||||
const SLOTS = 16384,
|
||||
CLUSTER_NODE_TIMEOUT = 2000;
|
||||
|
||||
async function spawnRedisCluster(type: TestRedisClusters, numberOfNodes: number, args?: Array<string>): Promise<Array<number>> {
|
||||
export async function spawnRedisCluster(type: TestRedisClusters | null, numberOfNodes: number, args?: Array<string>): Promise<Array<SpawnRedisServerResult>> {
|
||||
const spawnPromises = [],
|
||||
slotsPerNode = Math.floor(SLOTS / numberOfNodes);
|
||||
for (let i = 0; i < numberOfNodes; i++) {
|
||||
@@ -157,30 +204,41 @@ async function spawnRedisCluster(type: TestRedisClusters, numberOfNodes: number,
|
||||
await setTimeout(CLUSTER_NODE_TIMEOUT);
|
||||
}
|
||||
|
||||
const ports = [],
|
||||
disconnectPromises = [];
|
||||
for (const { port, client } of spawnResults) {
|
||||
ports.push(port);
|
||||
disconnectPromises.push(client.disconnect());
|
||||
const disconnectPromises = [];
|
||||
for (const result of spawnResults) {
|
||||
disconnectPromises.push(result.client.disconnect());
|
||||
}
|
||||
|
||||
await Promise.all(disconnectPromises);
|
||||
|
||||
return ports;
|
||||
return spawnResults;
|
||||
}
|
||||
|
||||
export async function spawnGlobalRedisCluster(type: TestRedisClusters | null, numberOfNodes: number, args?: Array<string>): Promise<Array<number>> {
|
||||
const results = await spawnRedisCluster(type, numberOfNodes, args);
|
||||
|
||||
after(() => {
|
||||
for (const { cleanup } of results) {
|
||||
cleanup();
|
||||
}
|
||||
});
|
||||
|
||||
return results.map(({ port }) => port);
|
||||
}
|
||||
|
||||
interface SpawnRedisClusterNodeResult extends SpawnRedisServerResult {
|
||||
client: RedisClientType<RedisModules, RedisLuaScripts>
|
||||
}
|
||||
|
||||
async function spawnRedisClusterNode(
|
||||
type: TestRedisClusters,
|
||||
type: TestRedisClusters | null,
|
||||
nodeIndex: number,
|
||||
fromSlot: number,
|
||||
toSlot: number,
|
||||
args?: Array<string>
|
||||
): Promise<{
|
||||
port: number;
|
||||
client: RedisClientType<RedisModules, RedisLuaScripts>;
|
||||
}> {
|
||||
): Promise<SpawnRedisClusterNodeResult> {
|
||||
const clusterConfigFile = `/tmp/${type}-${nodeIndex}.conf`,
|
||||
port = await spawnRedisServer([
|
||||
{ port, cleanup: originalCleanup } = await spawnRedisServer([
|
||||
'--cluster-enabled',
|
||||
'yes',
|
||||
'--cluster-node-timeout',
|
||||
@@ -190,16 +248,6 @@ async function spawnRedisClusterNode(
|
||||
...(args ?? [])
|
||||
]);
|
||||
|
||||
after(async () => {
|
||||
try {
|
||||
await unlink(clusterConfigFile);
|
||||
} catch (err) {
|
||||
if (err.code == 'ENOENT') return;
|
||||
|
||||
throw err;
|
||||
}
|
||||
});
|
||||
|
||||
const client = RedisClient.create({
|
||||
socket: {
|
||||
port
|
||||
@@ -220,6 +268,17 @@ async function spawnRedisClusterNode(
|
||||
|
||||
return {
|
||||
port,
|
||||
async cleanup(): Promise<void> {
|
||||
await originalCleanup();
|
||||
|
||||
try {
|
||||
await unlink(clusterConfigFile);
|
||||
} catch (err) {
|
||||
if (err.code == 'ENOENT') return;
|
||||
|
||||
throw err;
|
||||
}
|
||||
},
|
||||
client
|
||||
};
|
||||
}
|
||||
|
Reference in New Issue
Block a user