1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-07 13:22:56 +03:00
This commit is contained in:
Leibale
2023-04-26 18:04:27 -04:00
parent c78f3354e0
commit 1254ff7ebd
5 changed files with 1343 additions and 1343 deletions

View File

@@ -1,195 +1,195 @@
import { strict as assert } from 'assert';
import { SinonSpy, spy } from 'sinon';
import RESP2Decoder from './decoder';
import { ErrorReply } from '../../errors';
// import { strict as assert } from 'assert';
// import { SinonSpy, spy } from 'sinon';
// import RESP2Decoder from './decoder';
// import { ErrorReply } from '../../errors';
interface DecoderAndSpies {
decoder: RESP2Decoder;
returnStringsAsBuffersSpy: SinonSpy;
onReplySpy: SinonSpy;
}
// interface DecoderAndSpies {
// decoder: RESP2Decoder;
// returnStringsAsBuffersSpy: SinonSpy;
// onReplySpy: SinonSpy;
// }
function createDecoderAndSpies(returnStringsAsBuffers: boolean): DecoderAndSpies {
const returnStringsAsBuffersSpy = spy(() => returnStringsAsBuffers),
onReplySpy = spy();
// function createDecoderAndSpies(returnStringsAsBuffers: boolean): DecoderAndSpies {
// const returnStringsAsBuffersSpy = spy(() => returnStringsAsBuffers),
// onReplySpy = spy();
return {
decoder: new RESP2Decoder({
returnStringsAsBuffers: returnStringsAsBuffersSpy,
onReply: onReplySpy
}),
returnStringsAsBuffersSpy,
onReplySpy
};
}
// return {
// decoder: new RESP2Decoder({
// returnStringsAsBuffers: returnStringsAsBuffersSpy,
// onReply: onReplySpy
// }),
// returnStringsAsBuffersSpy,
// onReplySpy
// };
// }
function writeChunks(stream: RESP2Decoder, buffer: Buffer) {
let i = 0;
while (i < buffer.length) {
stream.write(buffer.slice(i, ++i));
}
}
// function writeChunks(stream: RESP2Decoder, buffer: Buffer) {
// let i = 0;
// while (i < buffer.length) {
// stream.write(buffer.slice(i, ++i));
// }
// }
type Replies = Array<Array<unknown>>;
// type Replies = Array<Array<unknown>>;
interface TestsOptions {
toWrite: Buffer;
returnStringsAsBuffers: boolean;
replies: Replies;
}
// interface TestsOptions {
// toWrite: Buffer;
// returnStringsAsBuffers: boolean;
// replies: Replies;
// }
function generateTests({
toWrite,
returnStringsAsBuffers,
replies
}: TestsOptions): void {
it('single chunk', () => {
const { decoder, returnStringsAsBuffersSpy, onReplySpy } =
createDecoderAndSpies(returnStringsAsBuffers);
decoder.write(toWrite);
assert.equal(returnStringsAsBuffersSpy.callCount, replies.length);
testReplies(onReplySpy, replies);
});
// function generateTests({
// toWrite,
// returnStringsAsBuffers,
// replies
// }: TestsOptions): void {
// it('single chunk', () => {
// const { decoder, returnStringsAsBuffersSpy, onReplySpy } =
// createDecoderAndSpies(returnStringsAsBuffers);
// decoder.write(toWrite);
// assert.equal(returnStringsAsBuffersSpy.callCount, replies.length);
// testReplies(onReplySpy, replies);
// });
it('multiple chunks', () => {
const { decoder, returnStringsAsBuffersSpy, onReplySpy } =
createDecoderAndSpies(returnStringsAsBuffers);
writeChunks(decoder, toWrite);
assert.equal(returnStringsAsBuffersSpy.callCount, replies.length);
testReplies(onReplySpy, replies);
});
}
// it('multiple chunks', () => {
// const { decoder, returnStringsAsBuffersSpy, onReplySpy } =
// createDecoderAndSpies(returnStringsAsBuffers);
// writeChunks(decoder, toWrite);
// assert.equal(returnStringsAsBuffersSpy.callCount, replies.length);
// testReplies(onReplySpy, replies);
// });
// }
function testReplies(spy: SinonSpy, replies: Replies): void {
if (!replies) {
assert.equal(spy.callCount, 0);
return;
}
// function testReplies(spy: SinonSpy, replies: Replies): void {
// if (!replies) {
// assert.equal(spy.callCount, 0);
// return;
// }
assert.equal(spy.callCount, replies.length);
for (const [i, reply] of replies.entries()) {
assert.deepEqual(
spy.getCall(i).args,
reply
);
}
}
// assert.equal(spy.callCount, replies.length);
// for (const [i, reply] of replies.entries()) {
// assert.deepEqual(
// spy.getCall(i).args,
// reply
// );
// }
// }
describe('RESP2Parser', () => {
describe('Simple String', () => {
describe('as strings', () => {
generateTests({
toWrite: Buffer.from('+OK\r\n'),
returnStringsAsBuffers: false,
replies: [['OK']]
});
});
// describe('RESP2Parser', () => {
// describe('Simple String', () => {
// describe('as strings', () => {
// generateTests({
// toWrite: Buffer.from('+OK\r\n'),
// returnStringsAsBuffers: false,
// replies: [['OK']]
// });
// });
describe('as buffers', () => {
generateTests({
toWrite: Buffer.from('+OK\r\n'),
returnStringsAsBuffers: true,
replies: [[Buffer.from('OK')]]
});
});
});
// describe('as buffers', () => {
// generateTests({
// toWrite: Buffer.from('+OK\r\n'),
// returnStringsAsBuffers: true,
// replies: [[Buffer.from('OK')]]
// });
// });
// });
describe('Error', () => {
generateTests({
toWrite: Buffer.from('-ERR\r\n'),
returnStringsAsBuffers: false,
replies: [[new ErrorReply('ERR')]]
});
});
// describe('Error', () => {
// generateTests({
// toWrite: Buffer.from('-ERR\r\n'),
// returnStringsAsBuffers: false,
// replies: [[new ErrorReply('ERR')]]
// });
// });
describe('Integer', () => {
describe('-1', () => {
generateTests({
toWrite: Buffer.from(':-1\r\n'),
returnStringsAsBuffers: false,
replies: [[-1]]
});
});
// describe('Integer', () => {
// describe('-1', () => {
// generateTests({
// toWrite: Buffer.from(':-1\r\n'),
// returnStringsAsBuffers: false,
// replies: [[-1]]
// });
// });
describe('0', () => {
generateTests({
toWrite: Buffer.from(':0\r\n'),
returnStringsAsBuffers: false,
replies: [[0]]
});
});
});
// describe('0', () => {
// generateTests({
// toWrite: Buffer.from(':0\r\n'),
// returnStringsAsBuffers: false,
// replies: [[0]]
// });
// });
// });
describe('Bulk String', () => {
describe('null', () => {
generateTests({
toWrite: Buffer.from('$-1\r\n'),
returnStringsAsBuffers: false,
replies: [[null]]
});
});
// describe('Bulk String', () => {
// describe('null', () => {
// generateTests({
// toWrite: Buffer.from('$-1\r\n'),
// returnStringsAsBuffers: false,
// replies: [[null]]
// });
// });
describe('as strings', () => {
generateTests({
toWrite: Buffer.from('$2\r\naa\r\n'),
returnStringsAsBuffers: false,
replies: [['aa']]
});
});
// describe('as strings', () => {
// generateTests({
// toWrite: Buffer.from('$2\r\naa\r\n'),
// returnStringsAsBuffers: false,
// replies: [['aa']]
// });
// });
describe('as buffers', () => {
generateTests({
toWrite: Buffer.from('$2\r\naa\r\n'),
returnStringsAsBuffers: true,
replies: [[Buffer.from('aa')]]
});
});
});
// describe('as buffers', () => {
// generateTests({
// toWrite: Buffer.from('$2\r\naa\r\n'),
// returnStringsAsBuffers: true,
// replies: [[Buffer.from('aa')]]
// });
// });
// });
describe('Array', () => {
describe('null', () => {
generateTests({
toWrite: Buffer.from('*-1\r\n'),
returnStringsAsBuffers: false,
replies: [[null]]
});
});
// describe('Array', () => {
// describe('null', () => {
// generateTests({
// toWrite: Buffer.from('*-1\r\n'),
// returnStringsAsBuffers: false,
// replies: [[null]]
// });
// });
const arrayBuffer = Buffer.from(
'*5\r\n' +
'+OK\r\n' +
'-ERR\r\n' +
':0\r\n' +
'$1\r\na\r\n' +
'*0\r\n'
);
// const arrayBuffer = Buffer.from(
// '*5\r\n' +
// '+OK\r\n' +
// '-ERR\r\n' +
// ':0\r\n' +
// '$1\r\na\r\n' +
// '*0\r\n'
// );
describe('as strings', () => {
generateTests({
toWrite: arrayBuffer,
returnStringsAsBuffers: false,
replies: [[[
'OK',
new ErrorReply('ERR'),
0,
'a',
[]
]]]
});
});
// describe('as strings', () => {
// generateTests({
// toWrite: arrayBuffer,
// returnStringsAsBuffers: false,
// replies: [[[
// 'OK',
// new ErrorReply('ERR'),
// 0,
// 'a',
// []
// ]]]
// });
// });
describe('as buffers', () => {
generateTests({
toWrite: arrayBuffer,
returnStringsAsBuffers: true,
replies: [[[
Buffer.from('OK'),
new ErrorReply('ERR'),
0,
Buffer.from('a'),
[]
]]]
});
});
});
});
// describe('as buffers', () => {
// generateTests({
// toWrite: arrayBuffer,
// returnStringsAsBuffers: true,
// replies: [[[
// Buffer.from('OK'),
// new ErrorReply('ERR'),
// 0,
// Buffer.from('a'),
// []
// ]]]
// });
// });
// });
// });

File diff suppressed because it is too large Load Diff

View File

@@ -1,362 +1,362 @@
import { strict as assert } from 'assert';
import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils';
import RedisCluster from '.';
import { ClusterSlotStates } from '../commands/CLUSTER_SETSLOT';
import { commandOptions } from '../command-options';
import { SQUARE_SCRIPT } from '../client/index.spec';
import { RootNodesUnavailableError } from '../errors';
import { spy } from 'sinon';
import { promiseTimeout } from '../utils';
import RedisClient from '../client';
// import { strict as assert } from 'assert';
// import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils';
// import RedisCluster from '.';
// import { ClusterSlotStates } from '../commands/CLUSTER_SETSLOT';
// import { commandOptions } from '../command-options';
// import { SQUARE_SCRIPT } from '../client/index.spec';
// import { RootNodesUnavailableError } from '../errors';
// import { spy } from 'sinon';
// import { promiseTimeout } from '../utils';
// import RedisClient from '../client';
describe('Cluster', () => {
testUtils.testWithCluster('sendCommand', async cluster => {
assert.equal(
await cluster.sendCommand(undefined, true, ['PING']),
'PONG'
);
}, GLOBAL.CLUSTERS.OPEN);
// describe('Cluster', () => {
// testUtils.testWithCluster('sendCommand', async cluster => {
// assert.equal(
// await cluster.sendCommand(undefined, true, ['PING']),
// 'PONG'
// );
// }, GLOBAL.CLUSTERS.OPEN);
testUtils.testWithCluster('isOpen', async cluster => {
assert.equal(cluster.isOpen, true);
await cluster.disconnect();
assert.equal(cluster.isOpen, false);
}, GLOBAL.CLUSTERS.OPEN);
// testUtils.testWithCluster('isOpen', async cluster => {
// assert.equal(cluster.isOpen, true);
// await cluster.disconnect();
// assert.equal(cluster.isOpen, false);
// }, GLOBAL.CLUSTERS.OPEN);
testUtils.testWithCluster('connect should throw if already connected', async cluster => {
await assert.rejects(cluster.connect());
}, GLOBAL.CLUSTERS.OPEN);
// testUtils.testWithCluster('connect should throw if already connected', async cluster => {
// await assert.rejects(cluster.connect());
// }, GLOBAL.CLUSTERS.OPEN);
testUtils.testWithCluster('multi', async cluster => {
const key = 'key';
assert.deepEqual(
await cluster.multi()
.set(key, 'value')
.get(key)
.exec(),
['OK', 'value']
);
}, GLOBAL.CLUSTERS.OPEN);
// testUtils.testWithCluster('multi', async cluster => {
// const key = 'key';
// assert.deepEqual(
// await cluster.multi()
// .set(key, 'value')
// .get(key)
// .exec(),
// ['OK', 'value']
// );
// }, GLOBAL.CLUSTERS.OPEN);
testUtils.testWithCluster('scripts', async cluster => {
assert.equal(
await cluster.square(2),
4
);
}, {
...GLOBAL.CLUSTERS.OPEN,
clusterConfiguration: {
scripts: {
square: SQUARE_SCRIPT
}
}
});
// testUtils.testWithCluster('scripts', async cluster => {
// assert.equal(
// await cluster.square(2),
// 4
// );
// }, {
// ...GLOBAL.CLUSTERS.OPEN,
// clusterConfiguration: {
// scripts: {
// square: SQUARE_SCRIPT
// }
// }
// });
it('should throw RootNodesUnavailableError', async () => {
const cluster = RedisCluster.create({
rootNodes: []
});
// it('should throw RootNodesUnavailableError', async () => {
// const cluster = RedisCluster.create({
// rootNodes: []
// });
try {
await assert.rejects(
cluster.connect(),
RootNodesUnavailableError
);
} catch (err) {
await cluster.disconnect();
throw err;
}
});
// try {
// await assert.rejects(
// cluster.connect(),
// RootNodesUnavailableError
// );
// } catch (err) {
// await cluster.disconnect();
// throw err;
// }
// });
testUtils.testWithCluster('should handle live resharding', async cluster => {
const slot = 12539,
key = 'key',
value = 'value';
await cluster.set(key, value);
// testUtils.testWithCluster('should handle live resharding', async cluster => {
// const slot = 12539,
// key = 'key',
// value = 'value';
// await cluster.set(key, value);
const importing = cluster.slots[0].master,
migrating = cluster.slots[slot].master,
[ importingClient, migratingClient ] = await Promise.all([
cluster.nodeClient(importing),
cluster.nodeClient(migrating)
]);
// const importing = cluster.slots[0].master,
// migrating = cluster.slots[slot].master,
// [ importingClient, migratingClient ] = await Promise.all([
// cluster.nodeClient(importing),
// cluster.nodeClient(migrating)
// ]);
await Promise.all([
importingClient.clusterSetSlot(slot, ClusterSlotStates.IMPORTING, migrating.id),
migratingClient.clusterSetSlot(slot, ClusterSlotStates.MIGRATING, importing.id)
]);
// await Promise.all([
// importingClient.clusterSetSlot(slot, ClusterSlotStates.IMPORTING, migrating.id),
// migratingClient.clusterSetSlot(slot, ClusterSlotStates.MIGRATING, importing.id)
// ]);
// should be able to get the key from the migrating node
assert.equal(
await cluster.get(key),
value
);
// // should be able to get the key from the migrating node
// assert.equal(
// await cluster.get(key),
// value
// );
await migratingClient.migrate(
importing.host,
importing.port,
key,
0,
10
);
// await migratingClient.migrate(
// importing.host,
// importing.port,
// key,
// 0,
// 10
// );
// should be able to get the key from the importing node using `ASKING`
assert.equal(
await cluster.get(key),
value
);
// // should be able to get the key from the importing node using `ASKING`
// assert.equal(
// await cluster.get(key),
// value
// );
await Promise.all([
importingClient.clusterSetSlot(slot, ClusterSlotStates.NODE, importing.id),
migratingClient.clusterSetSlot(slot, ClusterSlotStates.NODE, importing.id),
]);
// await Promise.all([
// importingClient.clusterSetSlot(slot, ClusterSlotStates.NODE, importing.id),
// migratingClient.clusterSetSlot(slot, ClusterSlotStates.NODE, importing.id),
// ]);
// should handle `MOVED` errors
assert.equal(
await cluster.get(key),
value
);
}, {
serverArguments: [],
numberOfMasters: 2
});
// // should handle `MOVED` errors
// assert.equal(
// await cluster.get(key),
// value
// );
// }, {
// serverArguments: [],
// numberOfMasters: 2
// });
testUtils.testWithCluster('getRandomNode should spread the the load evenly', async cluster => {
const totalNodes = cluster.masters.length + cluster.replicas.length,
ids = new Set<string>();
for (let i = 0; i < totalNodes; i++) {
ids.add(cluster.getRandomNode().id);
}
// testUtils.testWithCluster('getRandomNode should spread the the load evenly', async cluster => {
// const totalNodes = cluster.masters.length + cluster.replicas.length,
// ids = new Set<string>();
// for (let i = 0; i < totalNodes; i++) {
// ids.add(cluster.getRandomNode().id);
// }
assert.equal(ids.size, totalNodes);
}, GLOBAL.CLUSTERS.WITH_REPLICAS);
// assert.equal(ids.size, totalNodes);
// }, GLOBAL.CLUSTERS.WITH_REPLICAS);
testUtils.testWithCluster('getSlotRandomNode should spread the the load evenly', async cluster => {
const totalNodes = 1 + cluster.slots[0].replicas!.length,
ids = new Set<string>();
for (let i = 0; i < totalNodes; i++) {
ids.add(cluster.getSlotRandomNode(0).id);
}
// testUtils.testWithCluster('getSlotRandomNode should spread the the load evenly', async cluster => {
// const totalNodes = 1 + cluster.slots[0].replicas!.length,
// ids = new Set<string>();
// for (let i = 0; i < totalNodes; i++) {
// ids.add(cluster.getSlotRandomNode(0).id);
// }
assert.equal(ids.size, totalNodes);
}, GLOBAL.CLUSTERS.WITH_REPLICAS);
// assert.equal(ids.size, totalNodes);
// }, GLOBAL.CLUSTERS.WITH_REPLICAS);
testUtils.testWithCluster('cluster topology', async cluster => {
assert.equal(cluster.slots.length, 16384);
const { numberOfMasters, numberOfReplicas } = GLOBAL.CLUSTERS.WITH_REPLICAS;
assert.equal(cluster.shards.length, numberOfMasters);
assert.equal(cluster.masters.length, numberOfMasters);
assert.equal(cluster.replicas.length, numberOfReplicas * numberOfMasters);
assert.equal(cluster.nodeByAddress.size, numberOfMasters + numberOfMasters * numberOfReplicas);
}, GLOBAL.CLUSTERS.WITH_REPLICAS);
// testUtils.testWithCluster('cluster topology', async cluster => {
// assert.equal(cluster.slots.length, 16384);
// const { numberOfMasters, numberOfReplicas } = GLOBAL.CLUSTERS.WITH_REPLICAS;
// assert.equal(cluster.shards.length, numberOfMasters);
// assert.equal(cluster.masters.length, numberOfMasters);
// assert.equal(cluster.replicas.length, numberOfReplicas * numberOfMasters);
// assert.equal(cluster.nodeByAddress.size, numberOfMasters + numberOfMasters * numberOfReplicas);
// }, GLOBAL.CLUSTERS.WITH_REPLICAS);
testUtils.testWithCluster('getMasters should be backwards competiable (without `minimizeConnections`)', async cluster => {
const masters = cluster.getMasters();
assert.ok(Array.isArray(masters));
for (const master of masters) {
assert.equal(typeof master.id, 'string');
assert.ok(master.client instanceof RedisClient);
}
}, {
...GLOBAL.CLUSTERS.OPEN,
clusterConfiguration: {
minimizeConnections: undefined // reset to default
}
});
// testUtils.testWithCluster('getMasters should be backwards competiable (without `minimizeConnections`)', async cluster => {
// const masters = cluster.getMasters();
// assert.ok(Array.isArray(masters));
// for (const master of masters) {
// assert.equal(typeof master.id, 'string');
// assert.ok(master.client instanceof RedisClient);
// }
// }, {
// ...GLOBAL.CLUSTERS.OPEN,
// clusterConfiguration: {
// minimizeConnections: undefined // reset to default
// }
// });
testUtils.testWithCluster('getSlotMaster should be backwards competiable (without `minimizeConnections`)', async cluster => {
const master = cluster.getSlotMaster(0);
assert.equal(typeof master.id, 'string');
assert.ok(master.client instanceof RedisClient);
}, {
...GLOBAL.CLUSTERS.OPEN,
clusterConfiguration: {
minimizeConnections: undefined // reset to default
}
});
// testUtils.testWithCluster('getSlotMaster should be backwards competiable (without `minimizeConnections`)', async cluster => {
// const master = cluster.getSlotMaster(0);
// assert.equal(typeof master.id, 'string');
// assert.ok(master.client instanceof RedisClient);
// }, {
// ...GLOBAL.CLUSTERS.OPEN,
// clusterConfiguration: {
// minimizeConnections: undefined // reset to default
// }
// });
testUtils.testWithCluster('should throw CROSSSLOT error', async cluster => {
await assert.rejects(cluster.mGet(['a', 'b']));
}, GLOBAL.CLUSTERS.OPEN);
// testUtils.testWithCluster('should throw CROSSSLOT error', async cluster => {
// await assert.rejects(cluster.mGet(['a', 'b']));
// }, GLOBAL.CLUSTERS.OPEN);
testUtils.testWithCluster('should send commands with commandOptions to correct cluster slot (without redirections)', async cluster => {
// 'a' and 'b' hash to different cluster slots (see previous unit test)
// -> maxCommandRedirections 0: rejects on MOVED/ASK reply
await cluster.set(commandOptions({ isolated: true }), 'a', '1'),
await cluster.set(commandOptions({ isolated: true }), 'b', '2'),
// testUtils.testWithCluster('should send commands with commandOptions to correct cluster slot (without redirections)', async cluster => {
// // 'a' and 'b' hash to different cluster slots (see previous unit test)
// // -> maxCommandRedirections 0: rejects on MOVED/ASK reply
// await cluster.set(commandOptions({ isolated: true }), 'a', '1'),
// await cluster.set(commandOptions({ isolated: true }), 'b', '2'),
assert.equal(await cluster.get('a'), '1');
assert.equal(await cluster.get('b'), '2');
}, {
...GLOBAL.CLUSTERS.OPEN,
clusterConfiguration: {
maxCommandRedirections: 0
}
});
// assert.equal(await cluster.get('a'), '1');
// assert.equal(await cluster.get('b'), '2');
// }, {
// ...GLOBAL.CLUSTERS.OPEN,
// clusterConfiguration: {
// maxCommandRedirections: 0
// }
// });
describe('minimizeConnections', () => {
testUtils.testWithCluster('false', async cluster => {
for (const master of cluster.masters) {
assert.ok(master.client instanceof RedisClient);
}
}, {
...GLOBAL.CLUSTERS.OPEN,
clusterConfiguration: {
minimizeConnections: false
}
});
// describe('minimizeConnections', () => {
// testUtils.testWithCluster('false', async cluster => {
// for (const master of cluster.masters) {
// assert.ok(master.client instanceof RedisClient);
// }
// }, {
// ...GLOBAL.CLUSTERS.OPEN,
// clusterConfiguration: {
// minimizeConnections: false
// }
// });
testUtils.testWithCluster('true', async cluster => {
for (const master of cluster.masters) {
assert.equal(master.client, undefined);
}
}, {
...GLOBAL.CLUSTERS.OPEN,
clusterConfiguration: {
minimizeConnections: true
}
});
});
// testUtils.testWithCluster('true', async cluster => {
// for (const master of cluster.masters) {
// assert.equal(master.client, undefined);
// }
// }, {
// ...GLOBAL.CLUSTERS.OPEN,
// clusterConfiguration: {
// minimizeConnections: true
// }
// });
// });
describe('PubSub', () => {
testUtils.testWithCluster('subscribe & unsubscribe', async cluster => {
const listener = spy();
// describe('PubSub', () => {
// testUtils.testWithCluster('subscribe & unsubscribe', async cluster => {
// const listener = spy();
await cluster.subscribe('channel', listener);
// await cluster.subscribe('channel', listener);
await Promise.all([
waitTillBeenCalled(listener),
cluster.publish('channel', 'message')
]);
// await Promise.all([
// waitTillBeenCalled(listener),
// cluster.publish('channel', 'message')
// ]);
assert.ok(listener.calledOnceWithExactly('message', 'channel'));
// assert.ok(listener.calledOnceWithExactly('message', 'channel'));
await cluster.unsubscribe('channel', listener);
// await cluster.unsubscribe('channel', listener);
assert.equal(cluster.pubSubNode, undefined);
}, GLOBAL.CLUSTERS.OPEN);
// assert.equal(cluster.pubSubNode, undefined);
// }, GLOBAL.CLUSTERS.OPEN);
testUtils.testWithCluster('psubscribe & punsubscribe', async cluster => {
const listener = spy();
// testUtils.testWithCluster('psubscribe & punsubscribe', async cluster => {
// const listener = spy();
await cluster.pSubscribe('channe*', listener);
// await cluster.pSubscribe('channe*', listener);
await Promise.all([
waitTillBeenCalled(listener),
cluster.publish('channel', 'message')
]);
// await Promise.all([
// waitTillBeenCalled(listener),
// cluster.publish('channel', 'message')
// ]);
assert.ok(listener.calledOnceWithExactly('message', 'channel'));
// assert.ok(listener.calledOnceWithExactly('message', 'channel'));
await cluster.pUnsubscribe('channe*', listener);
// await cluster.pUnsubscribe('channe*', listener);
assert.equal(cluster.pubSubNode, undefined);
}, GLOBAL.CLUSTERS.OPEN);
// assert.equal(cluster.pubSubNode, undefined);
// }, GLOBAL.CLUSTERS.OPEN);
testUtils.testWithCluster('should move listeners when PubSub node disconnects from the cluster', async cluster => {
const listener = spy();
await cluster.subscribe('channel', listener);
// testUtils.testWithCluster('should move listeners when PubSub node disconnects from the cluster', async cluster => {
// const listener = spy();
// await cluster.subscribe('channel', listener);
assert.ok(cluster.pubSubNode);
const [ migrating, importing ] = cluster.masters[0].address === cluster.pubSubNode.address ?
cluster.masters :
[cluster.masters[1], cluster.masters[0]],
[ migratingClient, importingClient ] = await Promise.all([
cluster.nodeClient(migrating),
cluster.nodeClient(importing)
]);
// assert.ok(cluster.pubSubNode);
// const [ migrating, importing ] = cluster.masters[0].address === cluster.pubSubNode.address ?
// cluster.masters :
// [cluster.masters[1], cluster.masters[0]],
// [ migratingClient, importingClient ] = await Promise.all([
// cluster.nodeClient(migrating),
// cluster.nodeClient(importing)
// ]);
const range = cluster.slots[0].master === migrating ? {
key: 'bar', // 5061
start: 0,
end: 8191
} : {
key: 'foo', // 12182
start: 8192,
end: 16383
};
// const range = cluster.slots[0].master === migrating ? {
// key: 'bar', // 5061
// start: 0,
// end: 8191
// } : {
// key: 'foo', // 12182
// start: 8192,
// end: 16383
// };
await Promise.all([
migratingClient.clusterDelSlotsRange(range),
importingClient.clusterDelSlotsRange(range),
importingClient.clusterAddSlotsRange(range)
]);
// await Promise.all([
// migratingClient.clusterDelSlotsRange(range),
// importingClient.clusterDelSlotsRange(range),
// importingClient.clusterAddSlotsRange(range)
// ]);
// wait for migrating node to be notified about the new topology
while ((await migratingClient.clusterInfo()).state !== 'ok') {
await promiseTimeout(50);
}
// // wait for migrating node to be notified about the new topology
// while ((await migratingClient.clusterInfo()).state !== 'ok') {
// await promiseTimeout(50);
// }
// make sure to cause `MOVED` error
await cluster.get(range.key);
// // make sure to cause `MOVED` error
// await cluster.get(range.key);
await Promise.all([
cluster.publish('channel', 'message'),
waitTillBeenCalled(listener)
]);
// await Promise.all([
// cluster.publish('channel', 'message'),
// waitTillBeenCalled(listener)
// ]);
assert.ok(listener.calledOnceWithExactly('message', 'channel'));
}, {
serverArguments: [],
numberOfMasters: 2,
minimumDockerVersion: [7]
});
// assert.ok(listener.calledOnceWithExactly('message', 'channel'));
// }, {
// serverArguments: [],
// numberOfMasters: 2,
// minimumDockerVersion: [7]
// });
testUtils.testWithCluster('ssubscribe & sunsubscribe', async cluster => {
const listener = spy();
// testUtils.testWithCluster('ssubscribe & sunsubscribe', async cluster => {
// const listener = spy();
await cluster.sSubscribe('channel', listener);
// await cluster.sSubscribe('channel', listener);
await Promise.all([
waitTillBeenCalled(listener),
cluster.sPublish('channel', 'message')
]);
// await Promise.all([
// waitTillBeenCalled(listener),
// cluster.sPublish('channel', 'message')
// ]);
assert.ok(listener.calledOnceWithExactly('message', 'channel'));
// assert.ok(listener.calledOnceWithExactly('message', 'channel'));
await cluster.sUnsubscribe('channel', listener);
// await cluster.sUnsubscribe('channel', listener);
// 10328 is the slot of `channel`
assert.equal(cluster.slots[10328].master.pubSubClient, undefined);
}, {
...GLOBAL.CLUSTERS.OPEN,
minimumDockerVersion: [7]
});
// // 10328 is the slot of `channel`
// assert.equal(cluster.slots[10328].master.pubSubClient, undefined);
// }, {
// ...GLOBAL.CLUSTERS.OPEN,
// minimumDockerVersion: [7]
// });
testUtils.testWithCluster('should handle sharded-channel-moved events', async cluster => {
const SLOT = 10328,
migrating = cluster.slots[SLOT].master,
importing = cluster.masters.find(master => master !== migrating)!,
[ migratingClient, importingClient ] = await Promise.all([
cluster.nodeClient(migrating),
cluster.nodeClient(importing)
]);
// testUtils.testWithCluster('should handle sharded-channel-moved events', async cluster => {
// const SLOT = 10328,
// migrating = cluster.slots[SLOT].master,
// importing = cluster.masters.find(master => master !== migrating)!,
// [ migratingClient, importingClient ] = await Promise.all([
// cluster.nodeClient(migrating),
// cluster.nodeClient(importing)
// ]);
await Promise.all([
migratingClient.clusterDelSlots(SLOT),
importingClient.clusterDelSlots(SLOT),
importingClient.clusterAddSlots(SLOT)
]);
// await Promise.all([
// migratingClient.clusterDelSlots(SLOT),
// importingClient.clusterDelSlots(SLOT),
// importingClient.clusterAddSlots(SLOT)
// ]);
// wait for migrating node to be notified about the new topology
while ((await migratingClient.clusterInfo()).state !== 'ok') {
await promiseTimeout(50);
}
// // wait for migrating node to be notified about the new topology
// while ((await migratingClient.clusterInfo()).state !== 'ok') {
// await promiseTimeout(50);
// }
const listener = spy();
// const listener = spy();
// will trigger `MOVED` error
await cluster.sSubscribe('channel', listener);
// // will trigger `MOVED` error
// await cluster.sSubscribe('channel', listener);
await Promise.all([
waitTillBeenCalled(listener),
cluster.sPublish('channel', 'message')
]);
// await Promise.all([
// waitTillBeenCalled(listener),
// cluster.sPublish('channel', 'message')
// ]);
assert.ok(listener.calledOnceWithExactly('message', 'channel'));
}, {
serverArguments: [],
minimumDockerVersion: [7]
});
});
});
// assert.ok(listener.calledOnceWithExactly('message', 'channel'));
// }, {
// serverArguments: [],
// minimumDockerVersion: [7]
// });
// });
// });

View File

@@ -2,7 +2,7 @@ import { strict as assert } from 'assert';
import testUtils, { GLOBAL } from '../test-utils';
import APPEND from './APPEND';
describe.only('APPEND', () => {
describe('APPEND', () => {
it('transformArguments', () => {
assert.deepEqual(
APPEND.transformArguments('key', 'value'),

View File

@@ -2,7 +2,7 @@ import { strict as assert } from 'assert';
import testUtils, { GLOBAL } from '../test-utils';
import PING from './PING';
describe.only('PING', () => {
describe('PING', () => {
describe('transformArguments', () => {
it('default', () => {
assert.deepEqual(