mirror of https://github.com/redis/node-redis.git, synced 2025-08-07 13:22:56 +03:00
comment cluster request & response policies (keep v4 behavior)
@@ -1,362 +1,348 @@
// import { strict as assert } from 'node:assert';
// import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils';
// import RedisCluster from '.';
import { strict as assert } from 'node:assert';
import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils';
import RedisCluster from '.';
// import { ClusterSlotStates } from '../commands/CLUSTER_SETSLOT';
// import { commandOptions } from '../command-options';
// import { SQUARE_SCRIPT } from '../client/index.spec';
// import { RootNodesUnavailableError } from '../errors';
// import { spy } from 'sinon';
// import { setTimeout } from 'timers/promises';
// import RedisClient from '../client';
import { SQUARE_SCRIPT } from '../client/index.spec';
import { RootNodesUnavailableError } from '../errors';
import { spy } from 'sinon';
// import { setTimeout } from 'node:timers/promises';
import RedisClient from '../client';

// describe('Cluster', () => {
// testUtils.testWithCluster('sendCommand', async cluster => {
// assert.equal(
// await cluster.sendCommand(undefined, true, ['PING']),
// 'PONG'
// );
// }, GLOBAL.CLUSTERS.OPEN);
describe('Cluster', () => {
testUtils.testWithCluster('sendCommand', async cluster => {
assert.equal(
await cluster.sendCommand(undefined, true, ['PING']),
'PONG'
);
}, GLOBAL.CLUSTERS.OPEN);

// testUtils.testWithCluster('isOpen', async cluster => {
// assert.equal(cluster.isOpen, true);
// await cluster.disconnect();
// assert.equal(cluster.isOpen, false);
// }, GLOBAL.CLUSTERS.OPEN);
testUtils.testWithCluster('isOpen', async cluster => {
assert.equal(cluster.isOpen, true);
await cluster.destroy();
assert.equal(cluster.isOpen, false);
}, GLOBAL.CLUSTERS.OPEN);

// testUtils.testWithCluster('connect should throw if already connected', async cluster => {
// await assert.rejects(cluster.connect());
// }, GLOBAL.CLUSTERS.OPEN);
testUtils.testWithCluster('connect should throw if already connected', async cluster => {
await assert.rejects(cluster.connect());
}, GLOBAL.CLUSTERS.OPEN);

// testUtils.testWithCluster('multi', async cluster => {
// const key = 'key';
// assert.deepEqual(
// await cluster.multi()
// .set(key, 'value')
// .get(key)
// .exec(),
// ['OK', 'value']
// );
// }, GLOBAL.CLUSTERS.OPEN);
testUtils.testWithCluster('multi', async cluster => {
const key = 'key';
assert.deepEqual(
await cluster.multi()
.set(key, 'value')
.get(key)
.exec(),
['OK', 'value']
);
}, GLOBAL.CLUSTERS.OPEN);

// testUtils.testWithCluster('scripts', async cluster => {
// assert.equal(
// await cluster.square(2),
// 4
// );
// }, {
// ...GLOBAL.CLUSTERS.OPEN,
// clusterConfiguration: {
// scripts: {
// square: SQUARE_SCRIPT
// }
// }
// });
testUtils.testWithCluster('scripts', async cluster => {
const [, reply] = await Promise.all([
cluster.set('key', '2'),
cluster.square('key')
]);

// it('should throw RootNodesUnavailableError', async () => {
// const cluster = RedisCluster.create({
// rootNodes: []
// });
assert.equal(reply, 4);
}, {
...GLOBAL.CLUSTERS.OPEN,
clusterConfiguration: {
scripts: {
square: SQUARE_SCRIPT
}
}
});

// try {
// await assert.rejects(
// cluster.connect(),
// RootNodesUnavailableError
// );
// } catch (err) {
// await cluster.disconnect();
// throw err;
// }
// });
it('should throw RootNodesUnavailableError', async () => {
const cluster = RedisCluster.create({
rootNodes: []
});

// testUtils.testWithCluster('should handle live resharding', async cluster => {
// const slot = 12539,
// key = 'key',
// value = 'value';
// await cluster.set(key, value);
try {
await assert.rejects(
cluster.connect(),
RootNodesUnavailableError
);
} catch (err) {
await cluster.disconnect();
throw err;
}
});

// const importing = cluster.slots[0].master,
// migrating = cluster.slots[slot].master,
// [ importingClient, migratingClient ] = await Promise.all([
// cluster.nodeClient(importing),
// cluster.nodeClient(migrating)
// ]);
// testUtils.testWithCluster('should handle live resharding', async cluster => {
// const slot = 12539,
// key = 'key',
// value = 'value';
// await cluster.set(key, value);

// await Promise.all([
// importingClient.clusterSetSlot(slot, ClusterSlotStates.IMPORTING, migrating.id),
// migratingClient.clusterSetSlot(slot, ClusterSlotStates.MIGRATING, importing.id)
// ]);
// const importing = cluster.slots[0].master,
// migrating = cluster.slots[slot].master,
// [importingClient, migratingClient] = await Promise.all([
// cluster.nodeClient(importing),
// cluster.nodeClient(migrating)
// ]);

// // should be able to get the key from the migrating node
// assert.equal(
// await cluster.get(key),
// value
// );
// await Promise.all([
// importingClient.clusterSetSlot(slot, ClusterSlotStates.IMPORTING, migrating.id),
// migratingClient.clusterSetSlot(slot, ClusterSlotStates.MIGRATING, importing.id)
// ]);

// await migratingClient.migrate(
// importing.host,
// importing.port,
// key,
// 0,
// 10
// );
// // should be able to get the key from the migrating node
// assert.equal(
// await cluster.get(key),
// value
// );

// // should be able to get the key from the importing node using `ASKING`
// assert.equal(
// await cluster.get(key),
// value
// );
// await migratingClient.migrate(
// importing.host,
// importing.port,
// key,
// 0,
// 10
// );

// await Promise.all([
// importingClient.clusterSetSlot(slot, ClusterSlotStates.NODE, importing.id),
// migratingClient.clusterSetSlot(slot, ClusterSlotStates.NODE, importing.id),
// ]);
// // should be able to get the key from the importing node using `ASKING`
// assert.equal(
// await cluster.get(key),
// value
// );

// // should handle `MOVED` errors
// assert.equal(
// await cluster.get(key),
// value
// );
// }, {
// serverArguments: [],
// numberOfMasters: 2
// });
// await Promise.all([
// importingClient.clusterSetSlot(slot, ClusterSlotStates.NODE, importing.id),
// migratingClient.clusterSetSlot(slot, ClusterSlotStates.NODE, importing.id),
// ]);
// testUtils.testWithCluster('getRandomNode should spread the the load evenly', async cluster => {
// const totalNodes = cluster.masters.length + cluster.replicas.length,
// ids = new Set<string>();
// for (let i = 0; i < totalNodes; i++) {
// ids.add(cluster.getRandomNode().id);
// }

// assert.equal(ids.size, totalNodes);
// }, GLOBAL.CLUSTERS.WITH_REPLICAS);
// // should handle `MOVED` errors
// assert.equal(
// await cluster.get(key),
// value
// );
// }, {
// serverArguments: [],
// numberOfMasters: 2
// });

// testUtils.testWithCluster('getSlotRandomNode should spread the the load evenly', async cluster => {
// const totalNodes = 1 + cluster.slots[0].replicas!.length,
// ids = new Set<string>();
// for (let i = 0; i < totalNodes; i++) {
// ids.add(cluster.getSlotRandomNode(0).id);
// }

// assert.equal(ids.size, totalNodes);
// }, GLOBAL.CLUSTERS.WITH_REPLICAS);
testUtils.testWithCluster('getRandomNode should spread the the load evenly', async cluster => {
const totalNodes = cluster.masters.length + cluster.replicas.length,
ids = new Set<string>();
for (let i = 0; i < totalNodes; i++) {
ids.add(cluster.getRandomNode().id);
}

// testUtils.testWithCluster('cluster topology', async cluster => {
// assert.equal(cluster.slots.length, 16384);
// const { numberOfMasters, numberOfReplicas } = GLOBAL.CLUSTERS.WITH_REPLICAS;
// assert.equal(cluster.shards.length, numberOfMasters);
// assert.equal(cluster.masters.length, numberOfMasters);
// assert.equal(cluster.replicas.length, numberOfReplicas * numberOfMasters);
// assert.equal(cluster.nodeByAddress.size, numberOfMasters + numberOfMasters * numberOfReplicas);
// }, GLOBAL.CLUSTERS.WITH_REPLICAS);
assert.equal(ids.size, totalNodes);
}, GLOBAL.CLUSTERS.WITH_REPLICAS);

// testUtils.testWithCluster('getMasters should be backwards competiable (without `minimizeConnections`)', async cluster => {
// const masters = cluster.getMasters();
// assert.ok(Array.isArray(masters));
// for (const master of masters) {
// assert.equal(typeof master.id, 'string');
// assert.ok(master.client instanceof RedisClient);
// }
// }, {
// ...GLOBAL.CLUSTERS.OPEN,
// clusterConfiguration: {
// minimizeConnections: undefined // reset to default
// }
// });
testUtils.testWithCluster('getSlotRandomNode should spread the the load evenly', async cluster => {
const totalNodes = 1 + cluster.slots[0].replicas!.length,
ids = new Set<string>();
for (let i = 0; i < totalNodes; i++) {
ids.add(cluster.getSlotRandomNode(0).id);
}

// testUtils.testWithCluster('getSlotMaster should be backwards competiable (without `minimizeConnections`)', async cluster => {
// const master = cluster.getSlotMaster(0);
// assert.equal(typeof master.id, 'string');
// assert.ok(master.client instanceof RedisClient);
// }, {
// ...GLOBAL.CLUSTERS.OPEN,
// clusterConfiguration: {
// minimizeConnections: undefined // reset to default
// }
// });
assert.equal(ids.size, totalNodes);
}, GLOBAL.CLUSTERS.WITH_REPLICAS);

// testUtils.testWithCluster('should throw CROSSSLOT error', async cluster => {
// await assert.rejects(cluster.mGet(['a', 'b']));
// }, GLOBAL.CLUSTERS.OPEN);
testUtils.testWithCluster('cluster topology', async cluster => {
assert.equal(cluster.slots.length, 16384);
const { numberOfMasters, numberOfReplicas } = GLOBAL.CLUSTERS.WITH_REPLICAS;
assert.equal(cluster.shards.length, numberOfMasters);
assert.equal(cluster.masters.length, numberOfMasters);
assert.equal(cluster.replicas.length, numberOfReplicas * numberOfMasters);
assert.equal(cluster.nodeByAddress.size, numberOfMasters + numberOfMasters * numberOfReplicas);
}, GLOBAL.CLUSTERS.WITH_REPLICAS);

// testUtils.testWithCluster('should send commands with commandOptions to correct cluster slot (without redirections)', async cluster => {
// // 'a' and 'b' hash to different cluster slots (see previous unit test)
// // -> maxCommandRedirections 0: rejects on MOVED/ASK reply
// await cluster.set(commandOptions({ isolated: true }), 'a', '1'),
// await cluster.set(commandOptions({ isolated: true }), 'b', '2'),
testUtils.testWithCluster('getMasters should be backwards competiable (without `minimizeConnections`)', async cluster => {
const masters = cluster.getMasters();
assert.ok(Array.isArray(masters));
for (const master of masters) {
assert.equal(typeof master.id, 'string');
assert.ok(master.client instanceof RedisClient);
}
}, {
...GLOBAL.CLUSTERS.OPEN,
clusterConfiguration: {
minimizeConnections: undefined // reset to default
}
});

// assert.equal(await cluster.get('a'), '1');
// assert.equal(await cluster.get('b'), '2');
// }, {
// ...GLOBAL.CLUSTERS.OPEN,
// clusterConfiguration: {
// maxCommandRedirections: 0
// }
// });
testUtils.testWithCluster('getSlotMaster should be backwards competiable (without `minimizeConnections`)', async cluster => {
const master = cluster.getSlotMaster(0);
assert.equal(typeof master.id, 'string');
assert.ok(master.client instanceof RedisClient);
}, {
...GLOBAL.CLUSTERS.OPEN,
clusterConfiguration: {
minimizeConnections: undefined // reset to default
}
});

// describe('minimizeConnections', () => {
// testUtils.testWithCluster('false', async cluster => {
// for (const master of cluster.masters) {
// assert.ok(master.client instanceof RedisClient);
// }
// }, {
// ...GLOBAL.CLUSTERS.OPEN,
// clusterConfiguration: {
// minimizeConnections: false
// }
// });
testUtils.testWithCluster('should throw CROSSSLOT error', async cluster => {
await assert.rejects(cluster.mGet(['a', 'b']));
}, GLOBAL.CLUSTERS.OPEN);

// testUtils.testWithCluster('true', async cluster => {
// for (const master of cluster.masters) {
// assert.equal(master.client, undefined);
// }
// }, {
// ...GLOBAL.CLUSTERS.OPEN,
// clusterConfiguration: {
// minimizeConnections: true
// }
// });
// });
describe('minimizeConnections', () => {
testUtils.testWithCluster('false', async cluster => {
for (const master of cluster.masters) {
assert.ok(master.client instanceof RedisClient);
}
}, {
...GLOBAL.CLUSTERS.OPEN,
clusterConfiguration: {
minimizeConnections: false
}
});

// describe('PubSub', () => {
// testUtils.testWithCluster('subscribe & unsubscribe', async cluster => {
// const listener = spy();
testUtils.testWithCluster('true', async cluster => {
for (const master of cluster.masters) {
assert.equal(master.client, undefined);
}
}, {
...GLOBAL.CLUSTERS.OPEN,
clusterConfiguration: {
minimizeConnections: true
}
});
});

// await cluster.subscribe('channel', listener);
describe('PubSub', () => {
testUtils.testWithCluster('subscribe & unsubscribe', async cluster => {
const listener = spy();

// await Promise.all([
// waitTillBeenCalled(listener),
// cluster.publish('channel', 'message')
// ]);

// assert.ok(listener.calledOnceWithExactly('message', 'channel'));
await cluster.subscribe('channel', listener);

// await cluster.unsubscribe('channel', listener);
await Promise.all([
waitTillBeenCalled(listener),
cluster.publish('channel', 'message')
]);

// assert.equal(cluster.pubSubNode, undefined);
// }, GLOBAL.CLUSTERS.OPEN);
assert.ok(listener.calledOnceWithExactly('message', 'channel'));

// testUtils.testWithCluster('psubscribe & punsubscribe', async cluster => {
// const listener = spy();
await cluster.unsubscribe('channel', listener);

// await cluster.pSubscribe('channe*', listener);
assert.equal(cluster.pubSubNode, undefined);
}, GLOBAL.CLUSTERS.OPEN);

// await Promise.all([
// waitTillBeenCalled(listener),
// cluster.publish('channel', 'message')
// ]);

// assert.ok(listener.calledOnceWithExactly('message', 'channel'));
testUtils.testWithCluster('psubscribe & punsubscribe', async cluster => {
const listener = spy();

// await cluster.pUnsubscribe('channe*', listener);
await cluster.pSubscribe('channe*', listener);

// assert.equal(cluster.pubSubNode, undefined);
// }, GLOBAL.CLUSTERS.OPEN);
await Promise.all([
waitTillBeenCalled(listener),
cluster.publish('channel', 'message')
]);

// testUtils.testWithCluster('should move listeners when PubSub node disconnects from the cluster', async cluster => {
// const listener = spy();
// await cluster.subscribe('channel', listener);
assert.ok(listener.calledOnceWithExactly('message', 'channel'));

// assert.ok(cluster.pubSubNode);
// const [ migrating, importing ] = cluster.masters[0].address === cluster.pubSubNode.address ?
// cluster.masters :
// [cluster.masters[1], cluster.masters[0]],
// [ migratingClient, importingClient ] = await Promise.all([
// cluster.nodeClient(migrating),
// cluster.nodeClient(importing)
// ]);
await cluster.pUnsubscribe('channe*', listener);

// const range = cluster.slots[0].master === migrating ? {
// key: 'bar', // 5061
// start: 0,
// end: 8191
// } : {
// key: 'foo', // 12182
// start: 8192,
// end: 16383
// };
assert.equal(cluster.pubSubNode, undefined);
}, GLOBAL.CLUSTERS.OPEN);

// await Promise.all([
// migratingClient.clusterDelSlotsRange(range),
// importingClient.clusterDelSlotsRange(range),
// importingClient.clusterAddSlotsRange(range)
// ]);
// testUtils.testWithCluster('should move listeners when PubSub node disconnects from the cluster', async cluster => {
// const listener = spy();
// await cluster.subscribe('channel', listener);

// // wait for migrating node to be notified about the new topology
// while ((await migratingClient.clusterInfo()).state !== 'ok') {
// await setTimeout(50);
// }
// assert.ok(cluster.pubSubNode);
// const [migrating, importing] = cluster.masters[0].address === cluster.pubSubNode.address ?
// cluster.masters :
// [cluster.masters[1], cluster.masters[0]],
// [migratingClient, importingClient] = await Promise.all([
// cluster.nodeClient(migrating),
// cluster.nodeClient(importing)
// ]);

// // make sure to cause `MOVED` error
// await cluster.get(range.key);
// const range = cluster.slots[0].master === migrating ? {
// key: 'bar', // 5061
// start: 0,
// end: 8191
// } : {
// key: 'foo', // 12182
// start: 8192,
// end: 16383
// };

// await Promise.all([
// cluster.publish('channel', 'message'),
// waitTillBeenCalled(listener)
// ]);

// assert.ok(listener.calledOnceWithExactly('message', 'channel'));
// }, {
// serverArguments: [],
// numberOfMasters: 2,
// minimumDockerVersion: [7]
// });
// await Promise.all([
// migratingClient.clusterDelSlotsRange(range),
// importingClient.clusterDelSlotsRange(range),
// importingClient.clusterAddSlotsRange(range)
// ]);
// testUtils.testWithCluster('ssubscribe & sunsubscribe', async cluster => {
// const listener = spy();
// // wait for migrating node to be notified about the new topology
// while ((await migratingClient.clusterInfo()).state !== 'ok') {
// await setTimeout(50);
// }

// await cluster.sSubscribe('channel', listener);
// // make sure to cause `MOVED` error
// await cluster.get(range.key);

// await Promise.all([
// waitTillBeenCalled(listener),
// cluster.sPublish('channel', 'message')
// ]);

// assert.ok(listener.calledOnceWithExactly('message', 'channel'));
// await Promise.all([
// cluster.publish('channel', 'message'),
// waitTillBeenCalled(listener)
// ]);

// await cluster.sUnsubscribe('channel', listener);
// assert.ok(listener.calledOnceWithExactly('message', 'channel'));
// }, {
// serverArguments: [],
// numberOfMasters: 2,
// minimumDockerVersion: [7]
// });

// // 10328 is the slot of `channel`
// assert.equal(cluster.slots[10328].master.pubSubClient, undefined);
// }, {
// ...GLOBAL.CLUSTERS.OPEN,
// minimumDockerVersion: [7]
// });
testUtils.testWithCluster('ssubscribe & sunsubscribe', async cluster => {
const listener = spy();

// testUtils.testWithCluster('should handle sharded-channel-moved events', async cluster => {
// const SLOT = 10328,
// migrating = cluster.slots[SLOT].master,
// importing = cluster.masters.find(master => master !== migrating)!,
// [ migratingClient, importingClient ] = await Promise.all([
// cluster.nodeClient(migrating),
// cluster.nodeClient(importing)
// ]);
await cluster.sSubscribe('channel', listener);

// await Promise.all([
// migratingClient.clusterDelSlots(SLOT),
// importingClient.clusterDelSlots(SLOT),
// importingClient.clusterAddSlots(SLOT)
// ]);
await Promise.all([
waitTillBeenCalled(listener),
cluster.sPublish('channel', 'message')
]);

// // wait for migrating node to be notified about the new topology
// while ((await migratingClient.clusterInfo()).state !== 'ok') {
// await setTimeout(50);
// }
assert.ok(listener.calledOnceWithExactly('message', 'channel'));

// const listener = spy();
await cluster.sUnsubscribe('channel', listener);

// // will trigger `MOVED` error
// await cluster.sSubscribe('channel', listener);
// 10328 is the slot of `channel`
assert.equal(cluster.slots[10328].master.pubSubClient, undefined);
}, {
...GLOBAL.CLUSTERS.OPEN,
minimumDockerVersion: [7]
});

// await Promise.all([
// waitTillBeenCalled(listener),
// cluster.sPublish('channel', 'message')
// ]);

// assert.ok(listener.calledOnceWithExactly('message', 'channel'));
// }, {
// serverArguments: [],
// minimumDockerVersion: [7]
// });
// });
// });
// testUtils.testWithCluster('should handle sharded-channel-moved events', async cluster => {
// const SLOT = 10328,
// migrating = cluster.slots[SLOT].master,
// importing = cluster.masters.find(master => master !== migrating)!,
// [migratingClient, importingClient] = await Promise.all([
// cluster.nodeClient(migrating),
// cluster.nodeClient(importing)
// ]);

// await Promise.all([
// migratingClient.clusterDelSlots(SLOT),
// importingClient.clusterDelSlots(SLOT),
// importingClient.clusterAddSlots(SLOT)
// ]);

// // wait for migrating node to be notified about the new topology
// while ((await migratingClient.clusterInfo()).state !== 'ok') {
// await setTimeout(50);
// }

// const listener = spy();

// // will trigger `MOVED` error
// await cluster.sSubscribe('channel', listener);

// await Promise.all([
// waitTillBeenCalled(listener),
// cluster.sPublish('channel', 'message')
// ]);

// assert.ok(listener.calledOnceWithExactly('message', 'channel'));
// }, {
// serverArguments: [],
// minimumDockerVersion: [7]
// });
});
});
@@ -1,6 +1,6 @@
import { RedisClientOptions, RedisClientType } from '../client';
import { CommandOptions } from '../client/commands-queue';
import { Command, CommandArguments, CommanderConfig, CommandPolicies, CommandWithPoliciesSignature, TypeMapping, RedisArgument, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions } from '../RESP/types';
import { Command, CommandArguments, CommanderConfig, CommandSignature, /*CommandPolicies, CommandWithPoliciesSignature,*/ TypeMapping, RedisArgument, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions } from '../RESP/types';
import COMMANDS from '../commands';
import { EventEmitter } from 'node:events';
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
@@ -65,12 +65,48 @@ export interface RedisClusterOptions<
nodeAddressMap?: NodeAddressMap;
}

// remove once request & response policies are ready
type ClusterCommand<
NAME extends PropertyKey,
COMMAND extends Command
> = COMMAND['FIRST_KEY_INDEX'] extends undefined ? (
COMMAND['IS_FORWARD_COMMAND'] extends true ? NAME : never
) : NAME;

// CommandWithPoliciesSignature<(typeof COMMANDS)[P], RESP, TYPE_MAPPING, POLICIES>
type WithCommands<
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping,
// POLICIES extends CommandPolicies
TYPE_MAPPING extends TypeMapping
> = {
[P in keyof typeof COMMANDS]: CommandWithPoliciesSignature<(typeof COMMANDS)[P], RESP, TYPE_MAPPING, POLICIES>;
[P in keyof typeof COMMANDS as ClusterCommand<P, (typeof COMMANDS)[P]>]: CommandSignature<(typeof COMMANDS)[P], RESP, TYPE_MAPPING>;
};

type WithModules<
M extends RedisModules,
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping
> = {
[P in keyof M]: {
[C in keyof M[P] as ClusterCommand<C, M[P][C]>]: CommandSignature<M[P][C], RESP, TYPE_MAPPING>;
};
};

type WithFunctions<
F extends RedisFunctions,
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping
> = {
[L in keyof F]: {
[C in keyof F[L] as ClusterCommand<C, F[L][C]>]: CommandSignature<F[L][C], RESP, TYPE_MAPPING>;
};
};

type WithScripts<
S extends RedisScripts,
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping
> = {
[P in keyof S as ClusterCommand<P, S[P]>]: CommandSignature<S[P], RESP, TYPE_MAPPING>;
};

export type RedisClusterType<
@@ -80,17 +116,22 @@ export type RedisClusterType<
RESP extends RespVersions = 2,
TYPE_MAPPING extends TypeMapping = {},
// POLICIES extends CommandPolicies = {}
> = RedisCluster<M, F, S, RESP, TYPE_MAPPING, POLICIES> & WithCommands<RESP, TYPE_MAPPING/*, POLICIES*/>;
// & WithModules<M> & WithFunctions<F> & WithScripts<S>
> = (
RedisCluster<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/> &
WithCommands<RESP, TYPE_MAPPING> &
WithModules<M, RESP, TYPE_MAPPING> &
WithFunctions<F, RESP, TYPE_MAPPING> &
WithScripts<S, RESP, TYPE_MAPPING>
);

export interface ClusterCommandOptions<
TYPE_MAPPING extends TypeMapping = TypeMapping,
POLICIES extends CommandPolicies = CommandPolicies
TYPE_MAPPING extends TypeMapping = TypeMapping
// POLICIES extends CommandPolicies = CommandPolicies
> extends CommandOptions<TYPE_MAPPING> {
policies?: POLICIES;
// policies?: POLICIES;
}

type ProxyCluster = RedisCluster<any, any, any, any, any, any>;
type ProxyCluster = RedisCluster<any, any, any, any, any/*, any*/>;

type NamespaceProxyCluster = { self: ProxyCluster };

@@ -100,20 +141,30 @@ export default class RedisCluster<
S extends RedisScripts,
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping,
POLICIES extends CommandPolicies
// POLICIES extends CommandPolicies
> extends EventEmitter {
static extractFirstKey<C extends Command>(
command: C,
args: Parameters<C['transformArguments']>,
redisArgs: Array<RedisArgument>
): RedisArgument | undefined {
if (command.FIRST_KEY_INDEX === undefined) {
return undefined;
} else if (typeof command.FIRST_KEY_INDEX === 'number') {
return redisArgs[command.FIRST_KEY_INDEX];
) {
let key: RedisArgument | undefined;
switch (typeof command.FIRST_KEY_INDEX) {
case 'number':
key = redisArgs[command.FIRST_KEY_INDEX];
break;

case 'function':
key = command.FIRST_KEY_INDEX(...args);
break;
}

return command.FIRST_KEY_INDEX(...args);
// TODO: remove once request & response policies are ready
if (key === undefined && !command.IS_FORWARD_COMMAND) {
throw new Error('TODO');
}

return key;
}

private static _createCommand(command: Command, resp: RespVersions) {
@@ -130,7 +181,7 @@ export default class RedisCluster<
command.IS_READ_ONLY,
redisArgs,
this._commandOptions,
command.POLICIES
// command.POLICIES
);

return transformReply ?
@@ -153,7 +204,7 @@ export default class RedisCluster<
command.IS_READ_ONLY,
redisArgs,
this.self._commandOptions,
command.POLICIES
// command.POLICIES
);

return transformReply ?
@@ -167,18 +218,18 @@ export default class RedisCluster<
transformReply = getTransformReply(fn, resp);
return async function (this: NamespaceProxyCluster, ...args: Array<unknown>) {
const fnArgs = fn.transformArguments(...args),
redisArgs = prefix.concat(fnArgs),
firstKey = RedisCluster.extractFirstKey(
fn,
fnArgs,
redisArgs
args,
fnArgs
),
redisArgs = prefix.concat(fnArgs),
reply = await this.self.sendCommand(
firstKey,
fn.IS_READ_ONLY,
redisArgs,
this.self._commandOptions,
fn.POLICIES
// fn.POLICIES
);

return transformReply ?
@@ -192,18 +243,19 @@ export default class RedisCluster<
transformReply = getTransformReply(script, resp);
return async function (this: ProxyCluster, ...args: Array<unknown>) {
const scriptArgs = script.transformArguments(...args),
redisArgs = prefix.concat(scriptArgs),
firstKey = RedisCluster.extractFirstKey(
script,
scriptArgs,
redisArgs
args,
scriptArgs
),
reply = await this.sendCommand(
redisArgs = prefix.concat(scriptArgs),
reply = await this.executeScript(
script,
firstKey,
script.IS_READ_ONLY,
redisArgs,
this._commandOptions,
script.POLICIES
// script.POLICIES
);

return transformReply ?
@@ -218,8 +270,8 @@ export default class RedisCluster<
S extends RedisScripts = {},
RESP extends RespVersions = 2,
TYPE_MAPPING extends TypeMapping = {},
POLICIES extends CommandPolicies = {}
>(config?: ClusterCommander<M, F, S, RESP, TYPE_MAPPING, POLICIES>) {
// POLICIES extends CommandPolicies = {}
>(config?: ClusterCommander<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/>) {
const Cluster = attachConfig({
BaseClass: RedisCluster,
commands: COMMANDS,
@@ -234,7 +286,7 @@ export default class RedisCluster<

return (options?: Omit<RedisClusterOptions, keyof Exclude<typeof config, undefined>>) => {
// returning a "proxy" to prevent the namespaces.self to leak between "proxies"
return Object.create(new Cluster(options)) as RedisClusterType<M, F, S, RESP, TYPE_MAPPING, POLICIES>;
return Object.create(new Cluster(options)) as RedisClusterType<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/>;
};
}

@@ -244,16 +296,16 @@ export default class RedisCluster<
S extends RedisScripts = {},
RESP extends RespVersions = 2,
TYPE_MAPPING extends TypeMapping = {},
POLICIES extends CommandPolicies = {}
>(options?: RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING, POLICIES>) {
// POLICIES extends CommandPolicies = {}
>(options?: RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/>) {
return RedisCluster.factory(options)(options);
}

private readonly _options: RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING, POLICIES>;
private readonly _options: RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/>;

private readonly _slots: RedisClusterSlots<M, F, S, RESP>;

private _commandOptions?: ClusterCommandOptions<TYPE_MAPPING, POLICIES>;
private _commandOptions?: ClusterCommandOptions<TYPE_MAPPING/*, POLICIES*/>;

/**
* An array of the cluster slots, each slot contain its `master` and `replicas`.
@@ -306,7 +358,7 @@ export default class RedisCluster<
return this._slots.isOpen;
}

constructor(options: RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING, POLICIES>) {
constructor(options: RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/>) {
super();

this._options = options;
@@ -336,9 +388,9 @@ export default class RedisCluster<
}

withCommandOptions<
OPTIONS extends ClusterCommandOptions<TYPE_MAPPING, CommandPolicies>,
OPTIONS extends ClusterCommandOptions<TYPE_MAPPING/*, CommandPolicies*/>,
TYPE_MAPPING extends TypeMapping,
POLICIES extends CommandPolicies
// POLICIES extends CommandPolicies
>(options: OPTIONS) {
const proxy = Object.create(this);
proxy._commandOptions = options;
@@ -347,8 +399,8 @@ export default class RedisCluster<
F,
S,
RESP,
TYPE_MAPPING extends TypeMapping ? TYPE_MAPPING : {},
POLICIES extends CommandPolicies ? POLICIES : {}
TYPE_MAPPING extends TypeMapping ? TYPE_MAPPING : {}
// POLICIES extends CommandPolicies ? POLICIES : {}
>;
}

@@ -367,8 +419,8 @@ export default class RedisCluster<
F,
S,
RESP,
K extends 'typeMapping' ? V extends TypeMapping ? V : {} : TYPE_MAPPING,
K extends 'policies' ? V extends CommandPolicies ? V : {} : POLICIES
K extends 'typeMapping' ? V extends TypeMapping ? V : {} : TYPE_MAPPING
// K extends 'policies' ? V extends CommandPolicies ? V : {} : POLICIES
>;
}

@@ -379,15 +431,15 @@ export default class RedisCluster<
return this._commandOptionsProxy('typeMapping', typeMapping);
}

/**
* Override the `policies` command option
* TODO
*/
withPolicies<POLICIES extends CommandPolicies> (policies: POLICIES) {
return this._commandOptionsProxy('policies', policies);
}
// /**
// * Override the `policies` command option
// * TODO
// */
// withPolicies<POLICIES extends CommandPolicies> (policies: POLICIES) {
// return this._commandOptionsProxy('policies', policies);
// }

async #execute<T>(
private async _execute<T>(
firstKey: RedisArgument | undefined,
isReadonly: boolean | undefined,
fn: (client: RedisClientType<M, F, S, RESP>) => Promise<T>
@@ -437,9 +489,9 @@ export default class RedisCluster<
isReadonly: boolean | undefined,
args: CommandArguments,
options?: ClusterCommandOptions,
defaultPolicies?: CommandPolicies
// defaultPolicies?: CommandPolicies
): Promise<T> {
return this.#execute(
return this._execute(
firstKey,
isReadonly,
client => client.sendCommand(args, options)
@@ -453,7 +505,7 @@
args: Array<RedisArgument>,
options?: CommandOptions
) {
return this.#execute(
return this._execute(
firstKey,
isReadonly,
client => client.executeScript(script, args, options)
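Note on the `ClusterCommand` helper type introduced in the second file: it keeps a command on the cluster's typed interface only when the command either declares a `FIRST_KEY_INDEX` (so it can be routed to a hash slot) or is flagged as a forward command, which is what preserves the v4 behaviour while request & response policies stay commented out. Below is a minimal, type-only sketch of that filtering; the `Command` interface and the `GetCommand`/`PingCommand`/`ClusterInfoCommand` shapes are simplified, hypothetical stand-ins for illustration, not the library's real definitions.

// Illustrative stand-ins only; the real Command type lives in '../RESP/types'.
interface Command {
  FIRST_KEY_INDEX: number | ((...args: Array<unknown>) => string | undefined) | undefined;
  IS_FORWARD_COMMAND: boolean;
}

// Same shape as the ClusterCommand conditional type in the diff: a command with
// no first key is only kept when it is a forward command (e.g. PING).
type ClusterCommand<NAME extends PropertyKey, COMMAND extends Command> =
  COMMAND['FIRST_KEY_INDEX'] extends undefined
    ? (COMMAND['IS_FORWARD_COMMAND'] extends true ? NAME : never)
    : NAME;

// Hypothetical command definitions used only for the illustration.
interface GetCommand extends Command { FIRST_KEY_INDEX: 0; IS_FORWARD_COMMAND: false }
interface PingCommand extends Command { FIRST_KEY_INDEX: undefined; IS_FORWARD_COMMAND: true }
interface ClusterInfoCommand extends Command { FIRST_KEY_INDEX: undefined; IS_FORWARD_COMMAND: false }

type Commands = { get: GetCommand; ping: PingCommand; clusterInfo: ClusterInfoCommand };

// Commands that can neither be routed by key nor forwarded are dropped from the interface.
type Exposed = { [P in keyof Commands as ClusterCommand<P, Commands[P]>]: Commands[P] };
// Exposed keeps `get` and `ping`, but drops `clusterInfo`.

The runtime counterpart is the reworked `extractFirstKey` in the diff, which throws for a keyless, non-forward command until per-command request & response policies are implemented.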