You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-06 02:15:48 +03:00
1296 lines
44 KiB
TypeScript
1296 lines
44 KiB
TypeScript
import { strict as assert } from 'node:assert';
|
|
import { setTimeout } from 'node:timers/promises';
|
|
import { WatchError } from "../errors";
|
|
import { RedisSentinelConfig, SentinelFramework } from "./test-util";
|
|
import { RedisNode, RedisSentinelClientType, RedisSentinelEvent, RedisSentinelType } from "./types";
|
|
import { RedisSentinelFactory } from '.';
|
|
import { RedisClientType } from '../client';
|
|
import { RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping, NumberReply } from '../RESP/types';
|
|
|
|
import { promisify } from 'node:util';
|
|
import { exec } from 'node:child_process';
|
|
import { RESP_TYPES } from '../RESP/decoder';
|
|
import { defineScript } from '../lua-script';
|
|
import { MATH_FUNCTION } from '../commands/FUNCTION_LOAD.spec';
|
|
import RedisBloomModules from '@redis/bloom';
|
|
import { RedisTcpSocketOptions } from '../client/socket';
|
|
import { SQUARE_SCRIPT } from '../client/index.spec';
|
|
|
|
const execAsync = promisify(exec);
|
|
|
|
/* used to ensure test environment resets to normal state
|
|
i.e.
|
|
- all redis nodes are active and are part of the topology
|
|
before allowing things to continue.
|
|
*/
|
|
|
|
/**
 * Waits until the test environment is back in its normal state before
 * letting the caller continue.
 *
 * Two phases:
 *  1. Poll the frame until sentinel reports the master with flags exactly
 *     `master` and every replica with flags exactly `slave`.
 *  2. Connect a throw-away sentinel client and wait until its
 *     `topology-change` events (MASTER_CHANGE / REPLICA_ADD) have covered as
 *     many distinct ports as `frame.getAllNodesPort()` returns, then run
 *     FLUSHALL through it.
 *
 * The throw-away client is always destroyed, and the destroy is awaited so
 * it has finished before the caller proceeds.
 *
 * @param frame the sentinel test framework that owns the nodes under test
 */
async function steadyState(frame: SentinelFramework) {
  let checkedMaster = false;
  let checkedReplicas = false;
  while (!checkedMaster || !checkedReplicas) {
    if (!checkedMaster) {
      const master = await frame.sentinelMaster();
      if (master?.flags === 'master') {
        checkedMaster = true;
      }
    }

    if (!checkedReplicas) {
      const replicas = (await frame.sentinelReplicas());
      // assume success, then veto if any replica still carries extra flags
      checkedReplicas = true;
      for (const replica of replicas!) {
        checkedReplicas &&= (replica.flags === 'slave');
      }
    }
  }

  // resolved by the topology listener once every node port has been seen
  let nodeResolve!: () => void;
  const nodePromise = new Promise<void>(res => {
    nodeResolve = res;
  });

  const seenNodes = new Set<number>();
  let sentinel: RedisSentinelType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping> | undefined;
  const tracer: Array<string> = [];

  try {
    sentinel = frame.getSentinelClient({ replicaPoolSize: 1, scanInterval: 2000 }, false)
      .on('topology-change', (event: RedisSentinelEvent) => {
        if (event.type == "MASTER_CHANGE" || event.type == "REPLICA_ADD") {
          seenNodes.add(event.node.port);
          if (seenNodes.size == frame.getAllNodesPort().length) {
            nodeResolve();
          }
        }
      }).on('error', () => { });
    sentinel.setTracer(tracer);
    await sentinel.connect();

    await nodePromise;

    await sentinel.flushAll();
  } finally {
    if (sentinel !== undefined) {
      // awaited (as afterEach does) so teardown cannot overlap the next test
      await sentinel.destroy();
    }
  }
}
|
|
|
|
["redis-sentinel-test-password", undefined].forEach(function (password) {
|
|
describe(`Sentinel - password = ${password}`, () => {
|
|
const config: RedisSentinelConfig = { sentinelName: "test", numberOfNodes: 3, password: password };
|
|
const frame = new SentinelFramework(config);
|
|
let tracer = new Array<string>();
|
|
let stopMeasuringBlocking = false;
|
|
let longestDelta = 0;
|
|
let longestTestDelta = 0;
|
|
let last: number;
|
|
|
|
before(async function () {
  this.timeout(15000);

  last = Date.now();

  // Re-schedules itself with setImmediate and records the largest gap seen
  // between two consecutive runs, both overall and for the current test.
  const measureLoopGap = (): void => {
    const gap = Date.now() - last;
    longestDelta = Math.max(longestDelta, gap);
    longestTestDelta = Math.max(longestTestDelta, gap);

    if (stopMeasuringBlocking) {
      return;
    }
    last = Date.now();
    setImmediate(measureLoopGap);
  };

  setImmediate(measureLoopGap);

  await frame.spawnRedisSentinel();
});
|
|
|
|
after(async function () {
  this.timeout(15000);

  // tells the self-rescheduling measurer started in before() to stop
  stopMeasuringBlocking = true;

  // tear down everything the frame spawned
  await frame.cleanup();
});
|
|
|
|
describe('Sentinel Client', function () {
|
|
let sentinel: RedisSentinelType< RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping> | undefined;
|
|
|
|
beforeEach(async function () {
  // a timeout of 0 disables mocha's timeout for this hook
  this.timeout(0);

  await frame.getAllRunning();
  await steadyState(frame);

  // the per-test maximum starts from scratch for every test
  longestTestDelta = 0;
});
|
|
|
|
afterEach(async function () {
  this.timeout(30000);

  // avoid errors in afterEach that end testing
  if (sentinel !== undefined) {
    sentinel.on('error', () => { });
  }

  try {
    // on failure, dump everything that may help diagnose it
    if (this!.currentTest!.state === 'failed') {
      console.log(`longest event loop blocked delta: ${longestDelta}`);
      console.log(`longest event loop blocked in failing test: ${longestTestDelta}`);
      console.log("trace:");
      for (const line of tracer) {
        console.log(line);
      }
      console.log(`sentinel object state:`)
      console.log(`master: ${JSON.stringify(sentinel?.getMasterNode())}`)
      // `.entries` is a method; it has to be called and materialised, otherwise
      // JSON.stringify receives a function and yields `undefined`
      const replicaNodes = sentinel?.getReplicaNodes();
      const replicaEntries = replicaNodes === undefined ? undefined : Array.from(replicaNodes.entries());
      console.log(`replicas: ${JSON.stringify(replicaEntries)}`)
      const results = await Promise.all([
        frame.sentinelSentinels(),
        frame.sentinelMaster(),
        frame.sentinelReplicas()
      ])
      console.log(`sentinel sentinels:\n${JSON.stringify(results[0], undefined, '\t')}`);
      console.log(`sentinel master:\n${JSON.stringify(results[1], undefined, '\t')}`);
      console.log(`sentinel replicas:\n${JSON.stringify(results[2], undefined, '\t')}`);
      const { stdout } = await execAsync("docker ps -a");
      console.log(`docker stdout:\n${stdout}`);

      const ids = frame.getAllDockerIds();
      console.log("docker logs");

      for (const [id, port] of ids) {
        console.log(`${id}/${port}\n`);
        const { stdout: containerLog } = await execAsync(`docker logs ${id}`, {maxBuffer: 8192 * 8192 * 4});
        console.log(containerLog);
      }
    }
  } catch (err) {
    // diagnostics are best effort: a failure here must not abort the run
    console.log(`failed to collect failure diagnostics: ${err}`);
  } finally {
    // always reset shared state, even if the diagnostics above threw
    tracer.length = 0;

    if (sentinel !== undefined) {
      try {
        await sentinel.destroy();
      } finally {
        sentinel = undefined;
      }
    }
  }
})
|
|
|
|
it('basic bootstrap', async function () {
  // a client built with default options must connect and accept a write
  sentinel = frame.getSentinelClient();
  await sentinel.connect();

  const write = sentinel.set('x', 1);
  await assert.doesNotReject(write);
});
|
|
|
|
it('basic teardown worked', async function () {
  // the frame must still know about its nodes and sentinels
  const nodeCount = frame.getAllNodesPort().length;
  const sentinelCount = frame.getAllSentinelsPort().length;

  assert.notEqual(nodeCount, 0);
  assert.notEqual(sentinelCount, 0);

  // and a fresh client must be able to connect and read
  sentinel = frame.getSentinelClient();
  await sentinel.connect();

  const read = sentinel.get('x');
  await assert.doesNotReject(read);
});
|
|
|
|
it('failed to connect', async function() {
  // presumably nothing is listening on this port, so connect() has to fail
  const unreachableRoot = { host: "127.0.0.1", port: 1010 };
  sentinel = frame.getSentinelClient({ sentinelRootNodes: [unreachableRoot], maxCommandRediscovers: 0 });

  const connecting = sentinel.connect();
  await assert.rejects(connecting);
});
|
|
|
|
it('try to connect multiple times', async function () {
  sentinel = frame.getSentinelClient();

  const firstConnect = sentinel.connect();
  // a second connect() issued while the first is still pending must reject
  const secondConnect = sentinel.connect();
  await assert.rejects(secondConnect);

  // the first attempt itself still completes
  await firstConnect;
});
|
|
|
|
it('with type mapping', async function () {
  // map RESP simple strings to Buffer for every command of this client
  const commandOptions = {
    typeMapping: {
      [RESP_TYPES.SIMPLE_STRING]: Buffer
    }
  };
  sentinel = frame.getSentinelClient({ commandOptions });
  await sentinel.connect();

  const pong = await sentinel.ping();
  assert.deepEqual(pong, Buffer.from('PONG'));
});
|
|
|
|
it('with a script', async function () {
  const scriptOptions = {
    scripts: {
      square: SQUARE_SCRIPT
    }
  };

  sentinel = frame.getSentinelClient(scriptOptions);
  await sentinel.connect();

  // both commands are issued before either reply is awaited
  const replies = await Promise.all([
    sentinel.set('key', '2'),
    sentinel.square('key')
  ]);

  assert.equal(replies[1], 4);
});
|
|
|
|
it('multi with a script', async function () {
  const scriptOptions = {
    scripts: {
      square: SQUARE_SCRIPT
    }
  };

  sentinel = frame.getSentinelClient(scriptOptions);
  await sentinel.connect();

  // the script is queued inside the same MULTI as the SET it depends on
  const transaction = sentinel.multi().set('key', 2).square('key');
  const reply = await transaction.exec();

  assert.deepEqual(reply, ['OK', 4]);
});
|
|
|
|
it('with a function', async function () {
  const functionOptions = {
    functions: {
      math: MATH_FUNCTION.library
    }
  };
  sentinel = frame.getSentinelClient(functionOptions);
  await sentinel.connect();

  // (re)load the library so the function exists on the server
  await sentinel.functionLoad(MATH_FUNCTION.code, { REPLACE: true });

  await sentinel.set('key', '2');
  const squared = await sentinel.math.square('key');

  assert.equal(squared, 4);
});
|
|
|
|
it('multi with a function', async function () {
  const functionOptions = {
    functions: {
      math: MATH_FUNCTION.library
    }
  };
  sentinel = frame.getSentinelClient(functionOptions);
  await sentinel.connect();

  // (re)load the library so the function exists on the server
  await sentinel.functionLoad(MATH_FUNCTION.code, { REPLACE: true });

  const transaction = sentinel.multi().set('key', 2).math.square('key');
  const reply = await transaction.exec();
  assert.deepEqual(reply, ['OK', 4]);
});
|
|
|
|
it('with a module', async function () {
  const moduleOptions = {
    modules: RedisBloomModules
  };
  sentinel = frame.getSentinelClient(moduleOptions);
  await sentinel.connect();

  // module commands are reached through the module namespace (`bf`)
  const added = await sentinel.bf.add('key', 'item');
  assert.equal(added, true);
});
|
|
|
|
it('multi with a module', async function () {
  const moduleOptions = {
    modules: RedisBloomModules
  };
  sentinel = frame.getSentinelClient(moduleOptions);
  await sentinel.connect();

  // same module command, but queued inside a MULTI
  const transaction = sentinel.multi().bf.add('key', 'item');
  const replies = await transaction.exec();
  assert.deepEqual(replies, [true]);
});
|
|
|
|
it('many readers', async function () {
  this.timeout(10000);

  sentinel = frame.getSentinelClient({ replicaPoolSize: 8 });
  await sentinel.connect();

  await sentinel.set("x", 1);

  // poll (at most 10 times, 1s apart) until the written value can be read back
  let attempt = 0;
  while (attempt < 10) {
    const current = await sentinel.get("x");
    if (current == "1") {
      break;
    }
    await setTimeout(1000);
    attempt++;
  }

  // fire 500 reads without awaiting in between, then collect them all
  const reads: Array<Promise<string | null>> = [];
  while (reads.length < 500) {
    reads.push(sentinel.get("x"));
  }

  const resp = await Promise.all(reads);
  assert.equal(resp.length, 500);
  resp.forEach((value, i) => {
    assert.equal(value, "1", `failed on match at ${i}`);
  });
});
|
|
|
|
it('use', async function () {
  this.timeout(30000);

  sentinel = frame.getSentinelClient({ replicaPoolSize: 1 });
  // errors are deliberately ignored; the master is stopped below
  sentinel.on("error", () => { });
  await sentinel.connect();

  // stop the master while holding a client from use(); a read on that
  // client must still succeed
  const stopMasterThenRead = async (client: RedisSentinelClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping>) => {
    const currentMaster = sentinel!.getMasterNode();
    console.log("stopping")
    const portToStop = currentMaster!.port.toString();
    await frame.stopNode(portToStop);
    console.log("stopped")
    await assert.doesNotReject(client.get('x'));
    console.log("final await")
  };

  await sentinel.use("asd", stopMasterThenRead);
});
|
|
|
|
// TODO: figure out why it fails
|
|
it('use with script', async function () {
  this.timeout(10000);

  const scriptOptions = {
    scripts: {
      square: SQUARE_SCRIPT
    }
  };

  sentinel = frame.getSentinelClient(scriptOptions);
  await sentinel.connect();

  // write, read back, then run the script, all on the client handed out by use()
  const setGetSquare = async (client: RedisSentinelClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping>) => {
    console.log("set")
    const setReply = await client.set('key', '2');
    assert.equal(setReply, 'OK');
    console.log("get")
    const stored = await client.get('key');
    assert.equal(stored, '2');
    console.log("square")
    return client.square('key')
  };

  const squared = await sentinel.use("dsf", setGetSquare);

  assert.equal(squared, 4);
});
|
|
|
|
it('use with a function', async function () {
  this.timeout(10000);

  const functionOptions = {
    functions: {
      math: MATH_FUNCTION.library
    }
  };
  sentinel = frame.getSentinelClient(functionOptions);
  await sentinel.connect();

  // (re)load the library so the function exists on the server
  await sentinel.functionLoad(MATH_FUNCTION.code, { REPLACE: true });

  // write the key, then call the function, on the client handed out by use()
  const setThenSquare = async (client: RedisSentinelClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping>) => {
    await client.set('key', '2');
    return client.math.square('key');
  };

  const squared = await sentinel.use("asdf", setThenSquare);

  assert.equal(squared, 4);
});
|
|
|
|
it('use with a module', async function () {
  const moduleOptions = {
    modules: RedisBloomModules
  };
  sentinel = frame.getSentinelClient(moduleOptions);
  await sentinel.connect();

  // module commands are also available on the client handed out by use()
  const added = await sentinel.use(
    "asdf",
    async (client: RedisSentinelClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping>) => client.bf.add('key', 'item')
  );

  assert.equal(added, true);
});
|
|
|
|
it('block on pool', async function () {
  this.timeout(30000);

  sentinel = frame.getSentinelClient({ replicaPoolSize: 1 });
  sentinel.on("error", () => { });
  await sentinel.connect();

  // started but not awaited yet: holds a client for a second, then reads x
  const delayedRead = sentinel.use("asdf", async client => {
    await setTimeout(1000);
    const value = await client.get("x");
    return value;
  });

  await sentinel.set("x", 1);

  const observed = await delayedRead;
  assert.equal(observed, null);
});
|
|
|
|
// TODO: figure out why it fails
|
|
it('reserve client, takes a client out of pool', async function () {
  this.timeout(60000);

  sentinel = frame.getSentinelClient({ masterPoolSize: 2, reserveClient: true });
  await sentinel.connect();

  // first use(): read x, overwrite it with 2, hand back what was read
  const readThenOverwrite = sentinel.use("asdf", async client => {
    const previous = await client.get("x");
    await client.set("x", 2);
    return previous;
  });

  // second use(): a plain read of x
  const plainRead = sentinel.use("asdg", async client => client.get("x"));

  await sentinel.set("x", 1);

  const firstResult = await readThenOverwrite;
  assert.equal(firstResult, "1");
  const secondResult = await plainRead;
  assert.equal(secondResult, "2");
});
|
|
|
|
it('multiple clients', async function () {
  this.timeout(30000);

  sentinel = frame.getSentinelClient({ masterPoolSize: 2 });
  sentinel.on("error", () => { });
  await sentinel.connect();

  // While one client is held by use(), a command issued through the
  // sentinel object itself must still complete (master pool size is 2).
  const promise = sentinel.use(
    "asdf",
    async client => {
      await sentinel!.set("x", 1);
      await client.get("x");
    }
  )

  await assert.doesNotReject(promise);
});
|
|
|
|
// TODO: figure out why it fails
|
|
// by taking a lease, no master client is left available, so a command sent to the master would block; since the read below still completes, it must have been served by a replica
|
|
it('replica reads', async function () {
  this.timeout(60000);

  sentinel = frame.getSentinelClient({ replicaPoolSize: 1 });
  sentinel.on("error", () => { });
  await sentinel.connect();

  const clientLease = await sentinel.aquire();

  let matched = false;
  try {
    // awaited so a failed write surfaces here instead of as an unhandled
    // rejection, and so the polling below starts after the write was sent
    await clientLease.set('x', 456);

    /* waits for replication */
    for (let i = 0; i < 15; i++) {
      try {
        assert.equal(await sentinel.get("x"), '456');
        matched = true;
        break;
      } catch (err) {
        // not readable yet (or the read failed): retry after a second
        await setTimeout(1000);
      }
    }
  } finally {
    // hand the lease back even if something above threw
    clientLease.release();
  }

  assert.equal(matched, true);
});
|
|
|
|
it('pipeline', async function () {
  this.timeout(30000);

  sentinel = frame.getSentinelClient({ replicaPoolSize: 1 });
  await sentinel.connect();

  // the queued commands are sent with execAsPipeline() instead of exec()
  const queued = sentinel.multi().set('x', 1).get('x');
  const replies = await queued.execAsPipeline();

  assert.deepEqual(replies, ['OK', '1']);
});
|
|
|
|
it('use - watch - clean', async function () {
  this.timeout(30000);

  sentinel = frame.getSentinelClient({ masterPoolSize: 2 });
  await sentinel.connect();

  // nothing touches x between WATCH and EXEC, so the transaction goes through
  const transactionResult = sentinel.use("asdf", async (client) => {
    await client.set("x", 1);
    await client.watch("x");
    const transaction = client.multi().get("x");
    return transaction.exec();
  });

  assert.deepEqual(await transactionResult, ['1']);
});
|
|
|
|
it('use - watch - dirty', async function () {
  this.timeout(30000);

  sentinel = frame.getSentinelClient({ masterPoolSize: 2 });
  await sentinel.connect();

  const transactionResult = sentinel.use("asdg", async (client) => {
    await client.set('x', 1);
    await client.watch('x');
    // x is modified through the sentinel object after the WATCH
    await sentinel!.set('x', 2);
    const transaction = client.multi().get('x');
    return transaction.exec();
  });

  // the watched key changed, so EXEC must fail with a WatchError
  await assert.rejects(transactionResult, new WatchError());
});
|
|
|
|
it('lease - watch - clean', async function () {
  sentinel = frame.getSentinelClient({ masterPoolSize: 2 });
  await sentinel.connect();

  const lease = await sentinel.aquire();
  await lease.set('x', 1);
  await lease.watch('x');

  // x is untouched after the WATCH, so the transaction succeeds
  const replies = await lease.multi().get('x').exec();
  assert.deepEqual(replies, ['1']);
});
|
|
|
|
it('lease - watch - dirty', async function () {
  sentinel = frame.getSentinelClient({ masterPoolSize: 2 });
  await sentinel.connect();

  const lease = await sentinel.aquire();
  await lease.set('x', 1);
  await lease.watch('x');
  // x is modified after the WATCH
  await lease.set('x', 2);

  const execution = lease.multi().get('x').exec();
  await assert.rejects(execution, new WatchError());
});
|
|
|
|
|
|
it('watch does not carry through leases', async function () {
  this.timeout(10000);
  sentinel = frame.getSentinelClient();
  await sentinel.connect();

  // each use() call below is its own independent lease
  const watchReply = await sentinel.use("asdf", client => client.watch("x"));
  assert.equal(watchReply, 'OK');

  const setReply = await sentinel.use("asdg", client => client.set('x', 1));
  assert.equal(setReply, 'OK');

  // the transaction is expected to succeed despite the earlier WATCH and SET
  const execReply = await sentinel.use("asdh", client => client.multi().get('x').exec());
  assert.deepEqual(execReply, ['1']);
});
|
|
|
|
// stops master to force sentinel to update
|
|
it('stop master', async function () {
  this.timeout(30000);

  sentinel = frame.getSentinelClient();
  sentinel.setTracer(tracer);
  sentinel.on("error", () => { });
  await sentinel.connect();

  tracer.push(`connected`);

  // resolved with the new master node once a MASTER_CHANGE for a different port arrives
  let masterChangeResolve!: (node: RedisNode) => void;
  const masterChangePromise = new Promise<RedisNode>((res) => {
    masterChangeResolve = res;
  })

  // getMasterNode() is synchronous (every other test uses it that way), so no await
  const masterNode = sentinel.getMasterNode();
  sentinel.on('topology-change', (event: RedisSentinelEvent) => {
    tracer.push(`got topology-change event: ${JSON.stringify(event)}`);
    if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) {
      tracer.push(`got expected master change event`);
      masterChangeResolve(event.node);
    }
  });

  tracer.push(`stopping master node`);
  await frame.stopNode(masterNode!.port.toString());
  tracer.push(`stopped master node`);

  tracer.push(`waiting on master change promise`);
  const newMaster = await masterChangePromise;
  tracer.push(`got new master node of ${newMaster.port}`);
  assert.notEqual(masterNode!.port, newMaster.port);
});
|
|
|
|
// if master changes, client should make sure user knows watches are invalid
|
|
it('watch across master change', async function () {
  this.timeout(30000);

  sentinel = frame.getSentinelClient({ masterPoolSize: 2 });
  sentinel.setTracer(tracer);
  sentinel.on("error", () => { });
  await sentinel.connect();

  tracer.push("connected");

  const lease = await sentinel.aquire();
  tracer.push("aquired lease");

  await lease.set("x", 1);
  await lease.watch("x");

  tracer.push("did a watch on lease");

  const originalMaster = sentinel.getMasterNode();
  tracer.push(`got masterPort as ${originalMaster!.port}`);

  // settles with the new master once a MASTER_CHANGE for another port is seen
  const masterChanged = new Promise<RedisNode>((settle) => {
    sentinel!.on('topology-change', (event: RedisSentinelEvent) => {
      tracer.push(`got topology-change event: ${JSON.stringify(event)}`);
      if (event.type === "MASTER_CHANGE" && event.node.port != originalMaster!.port) {
        tracer.push("resolving promise");
        settle(event.node);
      }
    });
  });

  tracer.push("stopping master node");
  await frame.stopNode(originalMaster!.port.toString());
  tracer.push("stopped master node and waiting on promise");

  const newMaster = await masterChanged;
  tracer.push(`promise returned, newMaster = ${JSON.stringify(newMaster)}`);
  assert.notEqual(originalMaster!.port, newMaster.port);
  tracer.push(`newMaster does not equal old master`);

  // the WATCH was taken before the master changed, so the transaction must be refused
  tracer.push(`waiting to assert that a multi/exec now fails`);
  const runTransaction = async () => {
    await lease.multi().get("x").exec();
  };
  await assert.rejects(runTransaction, new Error("sentinel config changed in middle of a WATCH Transaction"));
  tracer.push(`asserted that a multi/exec now fails`);
});
|
|
|
|
// same as above, but set a watch before and after master change, shouldn't change the fact that watches are invalid
|
|
it('watch before and after master change', async function () {
  this.timeout(30000);

  sentinel = frame.getSentinelClient({ masterPoolSize: 2 });
  sentinel.setTracer(tracer);
  sentinel.on("error", () => { });
  await sentinel.connect();
  tracer.push("connected");

  const lease = await sentinel.aquire();
  tracer.push("got leased client");
  await lease.set("x", 1);
  await lease.watch("x");

  tracer.push("set and watched x");

  const originalMaster = sentinel.getMasterNode();
  tracer.push(`initial masterPort = ${originalMaster!.port} `);

  // settles with the new master once a MASTER_CHANGE for another port is seen
  const masterChanged = new Promise<RedisNode>((settle) => {
    sentinel!.on('topology-change', (event: RedisSentinelEvent) => {
      tracer.push(`got topology-change event: ${JSON.stringify(event)}`);
      if (event.type === "MASTER_CHANGE" && event.node.port != originalMaster!.port) {
        tracer.push("got a master change event that is not the same as before");
        settle(event.node);
      }
    });
  });

  tracer.push("stopping master");
  await frame.stopNode(originalMaster!.port.toString());
  tracer.push("stopped master");

  tracer.push("waiting on master change promise");
  const newMaster = await masterChanged;
  tracer.push(`got master change port as ${newMaster.port}`);
  assert.notEqual(originalMaster!.port, newMaster.port);

  // a second WATCH after the change must not make the transaction valid again
  tracer.push("watching again, shouldn't matter");
  await lease.watch("y");

  tracer.push("expecting multi to be rejected");
  const runTransaction = async () => {
    await lease.multi().get("x").exec();
  };
  await assert.rejects(runTransaction, new Error("sentinel config changed in middle of a WATCH Transaction"));
  tracer.push("multi was rejected");
});
|
|
|
|
// TODO: figure out why it fails
|
|
it('plain pubsub - channel', async function () {
  this.timeout(60000);

  sentinel = frame.getSentinelClient();
  sentinel.setTracer(tracer);
  await sentinel.connect();
  tracer.push(`connected`);

  // settled by the subscription listener when a message arrives
  let signalMessage!: (value: number) => void;
  const messageArrived = new Promise<number>((settle) => {
    signalMessage = settle;
  });

  let received = false;
  await sentinel.subscribe('test', () => {
    tracer.push(`got pubsub message`);
    received = true;
    signalMessage(1);
  });

  tracer.push(`publishing pubsub message`);
  await sentinel.publish('test', 'hello world');
  tracer.push(`waiting on pubsub promise`);
  await messageArrived;
  tracer.push(`got pubsub promise`);
  assert.equal(received, true);

  // after unsubscribing, a further publish must not reach the listener
  received = false;
  tracer.push(`unsubscribing pubsub listener`);
  await sentinel.unsubscribe('test');
  tracer.push(`pubishing pubsub message`);
  await sentinel.publish('test', 'hello world');
  // give a stray message a second to show up
  await setTimeout(1000);

  tracer.push(`ensuring pubsub was unsubscribed via an assert`);
  assert.equal(received, false);
});
|
|
|
|
it('plain pubsub - pattern', async function () {
  this.timeout(30000);

  sentinel = frame.getSentinelClient();
  sentinel.setTracer(tracer);
  await sentinel.connect();
  tracer.push(`connected`);

  // settled by the pattern listener when a message arrives
  let signalMessage!: (value: number) => void;
  const messageArrived = new Promise<number>((settle) => {
    signalMessage = settle;
  });

  let received = false;
  await sentinel.pSubscribe('test*', () => {
    tracer.push(`got pubsub message`);
    received = true;
    signalMessage(1);
  });

  // 'testy' is published to reach the 'test*' pattern subscription
  tracer.push(`publishing pubsub message`);
  await sentinel.publish('testy', 'hello world');
  tracer.push(`waiting on pubsub promise`);
  await messageArrived;
  tracer.push(`got pubsub promise`);
  assert.equal(received, true);

  // after unsubscribing, a further publish must not reach the listener
  received = false;
  tracer.push(`unsubscribing pubsub listener`);
  await sentinel.pUnsubscribe('test*');
  tracer.push(`pubishing pubsub message`);
  await sentinel.publish('testy', 'hello world');
  // give a stray message a second to show up
  await setTimeout(1000);

  tracer.push(`ensuring pubsub was unsubscribed via an assert`);
  assert.equal(received, false);
});
|
|
|
|
// pubsub continues to work, even with a master change
|
|
it('pubsub - channel - with master change', async function () {
  this.timeout(30000);

  sentinel = frame.getSentinelClient();
  sentinel.setTracer(tracer);
  sentinel.on("error", () => { });
  await sentinel.connect();
  tracer.push(`connected`);

  // settled by the subscription listener when a message arrives
  let signalMessage!: (value: number) => void;
  const messageArrived = new Promise<number>((settle) => {
    signalMessage = settle;
  });

  let received = false;
  await sentinel.subscribe('test', () => {
    tracer.push(`got pubsub message`);
    received = true;
    signalMessage(1);
  });

  const originalMaster = sentinel.getMasterNode();
  tracer.push(`got masterPort as ${originalMaster!.port}`);

  // settles with the new master once a MASTER_CHANGE for another port is seen
  const masterChanged = new Promise<RedisNode>((settle) => {
    sentinel!.on('topology-change', (event: RedisSentinelEvent) => {
      tracer.push(`got topology-change event: ${JSON.stringify(event)}`);
      if (event.type === "MASTER_CHANGE" && event.node.port != originalMaster!.port) {
        tracer.push("got a master change event that is not the same as before");
        settle(event.node);
      }
    });
  });

  tracer.push("stopping master");
  await frame.stopNode(originalMaster!.port.toString());
  tracer.push("stopped master and waiting on change promise");

  const newMaster = await masterChanged;
  tracer.push(`got master change port as ${newMaster.port}`);
  assert.notEqual(originalMaster!.port, newMaster.port);

  // the subscription made before the failover must still deliver
  tracer.push(`publishing pubsub message`);
  await sentinel.publish('test', 'hello world');
  tracer.push(`published pubsub message and waiting pn pubsub promise`);
  await messageArrived;
  tracer.push(`got pubsub promise`);

  assert.equal(received, true);

  // after unsubscribing, a further publish must not reach the listener
  received = false;
  await sentinel.unsubscribe('test');
  await sentinel.publish('test', 'hello world');
  await setTimeout(1000);

  assert.equal(received, false);
});
|
|
|
|
it('pubsub - pattern - with master change', async function () {
  this.timeout(30000);

  sentinel = frame.getSentinelClient();
  sentinel.setTracer(tracer);
  sentinel.on("error", () => { });
  await sentinel.connect();
  tracer.push(`connected`);

  // settled by the pattern listener when a message arrives
  let signalMessage!: (value: number) => void;
  const messageArrived = new Promise<number>((settle) => {
    signalMessage = settle;
  });

  let received = false;
  await sentinel.pSubscribe('test*', () => {
    tracer.push(`got pubsub message`);
    received = true;
    signalMessage(1);
  });

  const originalMaster = sentinel.getMasterNode();
  tracer.push(`got masterPort as ${originalMaster!.port}`);

  // settles with the new master once a MASTER_CHANGE for another port is seen
  const masterChanged = new Promise<RedisNode>((settle) => {
    sentinel!.on('topology-change', (event: RedisSentinelEvent) => {
      tracer.push(`got topology-change event: ${JSON.stringify(event)}`);
      if (event.type === "MASTER_CHANGE" && event.node.port != originalMaster!.port) {
        tracer.push("got a master change event that is not the same as before");
        settle(event.node);
      }
    });
  });

  tracer.push("stopping master");
  await frame.stopNode(originalMaster!.port.toString());
  tracer.push("stopped master and waiting on master change promise");

  const newMaster = await masterChanged;
  tracer.push(`got master change port as ${newMaster.port}`);
  assert.notEqual(originalMaster!.port, newMaster.port);

  // the pattern subscription made before the failover must still deliver
  tracer.push(`publishing pubsub message`);
  await sentinel.publish('testy', 'hello world');
  tracer.push(`published pubsub message and waiting on pubsub promise`);
  await messageArrived;
  tracer.push(`got pubsub promise`);
  assert.equal(received, true);

  // after unsubscribing, a further publish must not reach the listener
  received = false;
  await sentinel.pUnsubscribe('test*');
  await sentinel.publish('testy', 'hello world');
  await setTimeout(1000);

  assert.equal(received, false);
});
|
|
|
|
// if we stop a node, the command should "retry" until we reconfigure the topology, and then execute on the new topology
|
|
it('command immeaditely after stopping master', async function () {
  this.timeout(30000);

  sentinel = frame.getSentinelClient();
  sentinel.setTracer(tracer);
  sentinel.on("error", () => { });
  await sentinel.connect();

  tracer.push("connected");

  const originalMaster = sentinel.getMasterNode();
  tracer.push(`original master port = ${originalMaster!.port}`);

  // counts every MASTER_CHANGE that points at a port other than the original master
  let masterChanges = 0;
  const masterChanged = new Promise<RedisNode>((settle) => {
    sentinel!.on('topology-change', (event: RedisSentinelEvent) => {
      tracer.push(`got topology-change event: ${JSON.stringify(event)}`);
      if (event.type === "MASTER_CHANGE" && event.node.port != originalMaster!.port) {
        masterChanges++;
        tracer.push(`got topology-change event we expected`);
        settle(event.node);
      }
    });
  });

  tracer.push(`stopping masterNode`);
  await frame.stopNode(originalMaster!.port.toString());
  tracer.push(`stopped masterNode`);
  // issued right after the stop: the write is expected to succeed anyway
  const setReply = await sentinel.set('x', 123);
  assert.equal(setReply, 'OK');
  tracer.push(`did the set operation`);
  const presumedMaster = sentinel.getMasterNode();
  tracer.push(`new master node seems to be ${presumedMaster?.port} and waiting on master change promise`);

  const newMaster = await masterChanged;
  tracer.push(`got new masternode event saying master is at ${newMaster.port}`);
  assert.notEqual(originalMaster!.port, newMaster.port);

  tracer.push(`doing the get`);
  const val = await sentinel.get('x');
  tracer.push(`did the get and got ${val}`);
  const latestMaster = sentinel.getMasterNode();
  tracer.push(`after get, we see master as ${latestMaster?.port}`);

  if (masterChanges === 1) {
    // a single master change: the write must be visible
    assert.equal(val, '123');
  } else if (masterChanges === 2) {
    // two quick master changes: the write probably was not replicated, so
    // either outcome is accepted (somewhat flaky; one change is the common case)
    assert(val == '123' || val == null);
  } else {
    assert(false, "unexpected case");
  }
});
|
|
|
|
it('shutdown sentinel node', async function () {
  this.timeout(30000);

  sentinel = frame.getSentinelClient();
  sentinel.setTracer(tracer);
  sentinel.on("error", () => { });
  await sentinel.connect();
  tracer.push("connected");

  const originalSentinel = sentinel.getSentinelNode();
  tracer.push(`sentinelNode = ${originalSentinel?.port}`)

  // settles with the node carried by the first SENTINEL_CHANGE event
  const sentinelChanged = new Promise<RedisNode>((settle) => {
    sentinel!.on('topology-change', (event: RedisSentinelEvent) => {
      tracer.push(`got topology-change event: ${JSON.stringify(event)}`);
      if (event.type === "SENTINEL_CHANGE") {
        tracer.push("got sentinel change event");
        settle(event.node);
      }
    });
  });

  tracer.push("Stopping sentinel node");
  await frame.stopSentinel(originalSentinel!.port.toString());
  tracer.push("Stopped sentinel node and waiting on sentinel change promise");
  const replacement = await sentinelChanged;
  tracer.push("got sentinel change promise");
  // the client must have moved to a sentinel on a different port
  assert.notEqual(originalSentinel!.port, replacement.port);
});
|
|
|
|
it('timer works, and updates sentinel list', async function () {
  this.timeout(30000);

  sentinel = frame.getSentinelClient({ scanInterval: 1000 });
  sentinel.setTracer(tracer);
  await sentinel.connect();
  tracer.push("connected");

  // settles with the list size once a list-change event reports 4 sentinels
  const sentinelListGrew = new Promise<number>((settle) => {
    sentinel!.on('topology-change', (event: RedisSentinelEvent) => {
      tracer.push(`got topology-change event: ${JSON.stringify(event)}`);
      if (event.type === "SENTINE_LIST_CHANGE" && event.size == 4) {
        tracer.push(`got sentinel list change event with right size`);
        settle(event.size);
      }
    });
  });

  tracer.push(`adding sentinel`);
  await frame.addSentinel();
  tracer.push(`added sentinel and waiting on sentinel change promise`);
  const reportedSize = await sentinelListGrew;

  assert.equal(reportedSize, 4);
});
|
|
|
|
/*
 * Stops a randomly chosen non-master node and waits for the matching
 * REPLICA_REMOVE topology event, then restarts the same node and waits for
 * the matching REPLICA_ADD event.
 *
 * Both waits only accept events for the port that was stopped/restarted.
 * Events about other nodes are traced and ignored, so an unrelated replica
 * being re-added first cannot resolve the wait and fail the final assertion.
 */
it('stop replica, bring back replica', async function () {
  this.timeout(30000);

  sentinel = frame.getSentinelClient({ replicaPoolSize: 1 });
  sentinel.setTracer(tracer);
  // error events are deliberately ignored for this test
  sentinel.on('error', () => { });
  await sentinel.connect();
  tracer.push("connected");

  const replicaPort = await frame.getRandonNonMasterNode();

  // compare as strings so the check does not depend on how the frame represents ports
  const isTargetReplica = (node: RedisNode): boolean =>
    String(node.port) === String(replicaPort);

  // --- phase 1: stop the replica and wait for its removal event ---
  let sentinelRemoveResolve: (node: RedisNode) => void;
  const sentinelRemovePromise = new Promise<RedisNode>((res) => {
    sentinelRemoveResolve = res;
  });

  sentinel.on('topology-change', (event: RedisSentinelEvent) => {
    tracer.push(`got topology-change event: ${JSON.stringify(event)}`);
    if (event.type === "REPLICA_REMOVE") {
      if (isTargetReplica(event.node)) {
        tracer.push("got expected replica removed event");
        sentinelRemoveResolve(event.node);
      } else {
        tracer.push(`got replica removed event for a different node: ${event.node.port}`);
      }
    }
  });

  tracer.push(`replicaPort = ${replicaPort} and stopping it`);
  await frame.stopNode(replicaPort);
  tracer.push("stopped replica and waiting on sentinel removed promise");
  const stoppedNode = await sentinelRemovePromise;
  tracer.push("got removed promise");
  assert.equal(stoppedNode.port, Number(replicaPort));

  // --- phase 2: restart the replica and wait for its add event ---
  let sentinelRestartedResolve: (node: RedisNode) => void;
  const sentinelRestartedPromise = new Promise<RedisNode>((res) => {
    sentinelRestartedResolve = res;
  });

  sentinel.on('topology-change', (event: RedisSentinelEvent) => {
    tracer.push(`got topology-change event: ${JSON.stringify(event)}`);
    if (event.type === "REPLICA_ADD") {
      // only the restarted node counts; an add event for another replica must
      // not resolve the promise, otherwise the port assertion below would fail
      if (isTargetReplica(event.node)) {
        tracer.push("got replica added event");
        sentinelRestartedResolve(event.node);
      } else {
        tracer.push(`got replica added event for a different node: ${event.node.port}`);
      }
    }
  });

  tracer.push("restarting replica");
  await frame.restartNode(replicaPort);
  tracer.push("restarted replica and waiting on restart promise");
  const restartedNode = await sentinelRestartedPromise;
  tracer.push("got restarted promise");
  assert.equal(restartedNode.port, Number(replicaPort));
})
|
|
|
|
// Adds a brand-new node through the test frame and waits until the client
// reports a REPLICA_ADD event for a port that was not known beforehand.
it('add a node / new replica', async function () {
  this.timeout(30000);

  sentinel = frame.getSentinelClient({ scanInterval: 2000, replicaPoolSize: 1 });
  sentinel.setTracer(tracer);
  // errors must be handled: spawning a new docker node can cause existing connections to time out
  sentinel.on('error', () => { });
  await sentinel.connect();
  tracer.push("connected");

  // the resolver is captured here and invoked from the topology-change listener
  let announceNewReplica: (value: RedisNode) => void;
  const newReplicaSeen = new Promise((resolve) => {
    announceNewReplica = resolve as (value: RedisNode) => void;
  });

  // ports that exist before the new node is added
  const knownPorts = new Set<number>(frame.getAllNodesPort());

  // "on" rather than "once": connection timeouts can produce several add events,
  // and only the one for a previously unknown port is the one we want
  sentinel.on('topology-change', (event: RedisSentinelEvent) => {
    tracer.push(`got topology-change event: ${JSON.stringify(event)}`);
    if (event.type !== "REPLICA_ADD" || knownPorts.has(event.node.port)) {
      return;
    }
    tracer.push("got expected replica added event");
    announceNewReplica(event.node);
  });

  tracer.push("adding node");
  await frame.addNode();
  tracer.push("added node and waiting on added promise");
  await newReplicaSeen;
})
|
|
})
|
|
|
|
describe.skip('Sentinel Factory', function () {
|
|
let master: RedisClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping> | undefined;
|
|
let replica: RedisClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping> | undefined;
|
|
|
|
// Runs before every factory test: asks the frame to get all nodes running,
// waits for steadyState(frame), and resets the per-test event-loop delta.
beforeEach(async function () {
  // a Mocha timeout of 0 disables the timeout for this hook
  this.timeout(0);

  await frame.getAllRunning();
  await steadyState(frame);

  longestTestDelta = 0;
});
|
|
|
|
// Runs after every factory test. When the test failed, prints diagnostics
// (event-loop deltas, the collected trace, sentinel/master/replica info and
// `docker ps -a` output). Always clears the trace and destroys any client
// the test left open.
afterEach(async function () {
  const testFailed = this.currentTest!.state === 'failed';

  if (testFailed) {
    console.log(`longest event loop blocked delta: ${longestDelta}`);
    console.log(`longest event loop blocked in failing test: ${longestTestDelta}`);
    console.log("trace:");
    for (const entry of tracer) {
      console.log(entry);
    }

    // query the three sentinel views concurrently
    const [sentinelsInfo, masterInfo, replicasInfo] = await Promise.all([
      frame.sentinelSentinels(),
      frame.sentinelMaster(),
      frame.sentinelReplicas()
    ]);
    console.log(`sentinel sentinels:\n${JSON.stringify(sentinelsInfo, undefined, '\t')}`);
    console.log(`sentinel master:\n${JSON.stringify(masterInfo, undefined, '\t')}`);
    console.log(`sentinel replicas:\n${JSON.stringify(replicasInfo, undefined, '\t')}`);

    const dockerListing = await execAsync("docker ps -a");
    console.log(`docker stdout:\n${dockerListing.stdout}`);
    console.log(`docker stderr:\n${dockerListing.stderr}`);
  }

  // empty the shared trace buffer in place
  tracer.length = 0;

  if (master?.isOpen) {
    master.destroy();
  }
  master = undefined;

  if (replica?.isOpen) {
    replica.destroy();
  }
  replica = undefined;
})
|
|
|
|
// Builds a factory from every sentinel port of the frame, obtains a master
// client from it and checks that a SET on that client answers 'OK'.
it('sentinel factory - master', async function () {
  const sentinels: Array<RedisNode> = Array.from(
    frame.getAllSentinelsPort(),
    (port: number) => ({ host: "localhost", port })
  );

  const factory = new RedisSentinelFactory({
    name: frame.config.sentinelName,
    sentinelRootNodes: sentinels,
    sentinelClientOptions: { password },
    nodeClientOptions: { password }
  });
  await factory.updateSentinelRootNodes();

  // stored in the suite-level variable so the afterEach hook can destroy it
  master = await factory.getMasterClient();
  await master.connect();

  const reply = await master.set("x", 1);
  assert.equal(reply, 'OK');
})
|
|
|
|
// Builds a factory from every sentinel port of the frame and checks that the
// replica client it hands out is configured for a port other than the master's.
it('sentinel factory - replica', async function () {
  const sentinels: Array<RedisNode> = Array.from(
    frame.getAllSentinelsPort(),
    (port: number) => ({ host: "localhost", port })
  );

  const factory = new RedisSentinelFactory({
    name: frame.config.sentinelName,
    sentinelRootNodes: sentinels,
    sentinelClientOptions: { password },
    nodeClientOptions: { password }
  });
  await factory.updateSentinelRootNodes();

  const masterNode = await factory.getMasterNode();
  // stored in the suite-level variable so the afterEach hook can destroy it
  replica = await factory.getReplicaClient();

  // the client is never connected here; only its configured socket options are inspected
  const replicaSocket = replica.options?.socket as unknown as RedisTcpSocketOptions | undefined;
  assert.notEqual(masterNode.port, replicaSocket?.port);
})
|
|
|
|
// A factory whose only root node is { host: "locahost", port: 1 } must reject
// updateSentinelRootNodes() with "Couldn't connect to any sentinel node".
it('sentinel factory - bad node', async function () {
  const unreachableNodes = [{ host: "locahost", port: 1 }];
  const factory = new RedisSentinelFactory({
    name: frame.config.sentinelName,
    sentinelRootNodes: unreachableNodes
  });

  const update = factory.updateSentinelRootNodes();
  await assert.rejects(update, new Error("Couldn't connect to any sentinel node"));
})
|
|
|
|
// A factory configured with the name "invalid-name" must reject
// updateSentinelRootNodes() with "ERR No such master with that name".
it('sentinel factory - invalid db name', async function () {
  this.timeout(15000);

  const sentinels: Array<RedisNode> = Array.from(
    frame.getAllSentinelsPort(),
    (port: number) => ({ host: "localhost", port })
  );

  const factory = new RedisSentinelFactory({
    name: "invalid-name",
    sentinelRootNodes: sentinels,
    sentinelClientOptions: { password },
    nodeClientOptions: { password }
  });

  const update = factory.updateSentinelRootNodes();
  await assert.rejects(update, new Error("ERR No such master with that name"));
})
|
|
|
|
// Stops every node known to the frame, waits one second, and expects
// getMasterNode() to reject with "Master Node Not Enumerated".
it('sentinel factory - no available nodes', async function () {
  this.timeout(15000);

  const sentinels: Array<RedisNode> = Array.from(
    frame.getAllSentinelsPort(),
    (port: number) => ({ host: "localhost", port })
  );

  const factory = new RedisSentinelFactory({
    name: frame.config.sentinelName,
    sentinelRootNodes: sentinels,
    sentinelClientOptions: { password },
    nodeClientOptions: { password }
  });

  // nodes are stopped one after another, in the order the frame lists them
  for (const nodePort of frame.getAllNodesPort()) {
    await frame.stopNode(nodePort.toString());
  }

  // promise-based setTimeout from node:timers/promises: pause for 1000 ms
  await setTimeout(1000);

  const lookup = factory.getMasterNode();
  await assert.rejects(lookup, new Error("Master Node Not Enumerated"));
})
|
|
})
|
|
})
|
|
});
|