From 32048199eba0c844d4d195991d5a041a24208b0e Mon Sep 17 00:00:00 2001 From: "H. Temelski" Date: Fri, 7 Mar 2025 17:16:39 +0200 Subject: [PATCH] [CAE-342] Some tests --- packages/client/lib/sentinel/index.spec.ts | 1688 ++++++++++---------- packages/client/lib/sentinel/index.ts | 7 +- 2 files changed, 853 insertions(+), 842 deletions(-) diff --git a/packages/client/lib/sentinel/index.spec.ts b/packages/client/lib/sentinel/index.spec.ts index b9442fc9c7..42bd261afe 100644 --- a/packages/client/lib/sentinel/index.spec.ts +++ b/packages/client/lib/sentinel/index.spec.ts @@ -177,189 +177,195 @@ async function steadyState(frame: SentinelFramework) { } }) - it('basic bootstrap', async function () { - sentinel = frame.getSentinelClient(); - await sentinel.connect(); + // it('basic bootstrap', async function () { + // sentinel = frame.getSentinelClient(); + // await sentinel.connect(); - await assert.doesNotReject(sentinel.set('x', 1)); + // await assert.doesNotReject(sentinel.set('x', 1)); - }); + // }); - it('basic teardown worked', async function () { - const nodePorts = frame.getAllNodesPort(); - const sentinelPorts = frame.getAllSentinelsPort(); + // it('basic teardown worked', async function () { + // const nodePorts = frame.getAllNodesPort(); + // const sentinelPorts = frame.getAllSentinelsPort(); - assert.notEqual(nodePorts.length, 0); - assert.notEqual(sentinelPorts.length, 0); + // assert.notEqual(nodePorts.length, 0); + // assert.notEqual(sentinelPorts.length, 0); - sentinel = frame.getSentinelClient(); - await sentinel.connect(); + // sentinel = frame.getSentinelClient(); + // await sentinel.connect(); - await assert.doesNotReject(sentinel.get('x')); - }); + // await assert.doesNotReject(sentinel.get('x')); + // }); - it('failed to connect', async function() { - sentinel = frame.getSentinelClient({sentinelRootNodes: [{host: "127.0.0.1", port: 1010}], maxCommandRediscovers: 0}) - await assert.rejects(sentinel.connect()); - }); + // it('failed to connect', async function() { + // sentinel = frame.getSentinelClient({sentinelRootNodes: [{host: "127.0.0.1", port: 1010}], maxCommandRediscovers: 0}) + // await assert.rejects(sentinel.connect()); + // }); - it('try to connect multiple times', async function () { - sentinel = frame.getSentinelClient(); - const connectPromise = sentinel.connect(); - await assert.rejects(sentinel.connect()); - await connectPromise; - }); + // it('try to connect multiple times', async function () { + // sentinel = frame.getSentinelClient(); + // const connectPromise = sentinel.connect(); + // await assert.rejects(sentinel.connect()); + // await connectPromise; + // }); - it('with type mapping', async function () { - const commandOptions = { - typeMapping: { - [RESP_TYPES.SIMPLE_STRING]: Buffer - } - } - sentinel = frame.getSentinelClient({ commandOptions: commandOptions }); - await sentinel.connect(); + // it('with type mapping', async function () { + // const commandOptions = { + // typeMapping: { + // [RESP_TYPES.SIMPLE_STRING]: Buffer + // } + // } + // sentinel = frame.getSentinelClient({ commandOptions: commandOptions }); + // await sentinel.connect(); - const resp = await sentinel.ping(); - assert.deepEqual(resp, Buffer.from('PONG')) - }) + // const resp = await sentinel.ping(); + // assert.deepEqual(resp, Buffer.from('PONG')) + // }) - it('with a script', async function () { - const options = { - scripts: { - square: SQUARE_SCRIPT - } - } + // it('with a script', async function () { + // const options = { + // scripts: { + // square: SQUARE_SCRIPT + // } + // } - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); + // sentinel = frame.getSentinelClient(options); + // await sentinel.connect(); - const [, reply] = await Promise.all([ - sentinel.set('key', '2'), - sentinel.square('key') - ]); + // const [, reply] = await Promise.all([ + // sentinel.set('key', '2'), + // sentinel.square('key') + // ]); - assert.equal(reply, 4); - }) + // assert.equal(reply, 4); + // }) - it('multi with a script', async function () { - const options = { - scripts: { - square: SQUARE_SCRIPT - } - } + // it('multi with a script', async function () { + // const options = { + // scripts: { + // square: SQUARE_SCRIPT + // } + // } - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); + // sentinel = frame.getSentinelClient(options); + // await sentinel.connect(); - const reply = await sentinel.multi().set('key', 2).square('key').exec(); + // const reply = await sentinel.multi().set('key', 2).square('key').exec(); - assert.deepEqual(reply, ['OK', 4]); - }) + // assert.deepEqual(reply, ['OK', 4]); + // }) - it('with a function', async function () { - const options = { - functions: { - math: MATH_FUNCTION.library - } - } - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); + // it('with a function', async function () { + // const options = { + // functions: { + // math: MATH_FUNCTION.library + // } + // } + // sentinel = frame.getSentinelClient(options); + // await sentinel.connect(); - await sentinel.functionLoad( - MATH_FUNCTION.code, - { REPLACE: true } - ); + // await sentinel.functionLoad( + // MATH_FUNCTION.code, + // { REPLACE: true } + // ); - await sentinel.set('key', '2'); - const resp = await sentinel.math.square('key'); + // await sentinel.set('key', '2'); + // const resp = await sentinel.math.square('key'); - assert.equal(resp, 4); - }) + // assert.equal(resp, 4); + // }) - it('multi with a function', async function () { - const options = { - functions: { - math: MATH_FUNCTION.library - } - } - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); + // it('multi with a function', async function () { + // const options = { + // functions: { + // math: MATH_FUNCTION.library + // } + // } + // sentinel = frame.getSentinelClient(options); + // await sentinel.connect(); - await sentinel.functionLoad( - MATH_FUNCTION.code, - { REPLACE: true } - ); + // await sentinel.functionLoad( + // MATH_FUNCTION.code, + // { REPLACE: true } + // ); - const reply = await sentinel.multi().set('key', 2).math.square('key').exec(); - assert.deepEqual(reply, ['OK', 4]); - }) + // const reply = await sentinel.multi().set('key', 2).math.square('key').exec(); + // assert.deepEqual(reply, ['OK', 4]); + // }) - it('with a module', async function () { - const options = { - modules: RedisBloomModules - } - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); + // it('with a module', async function () { + // const options = { + // modules: RedisBloomModules + // } + // sentinel = frame.getSentinelClient(options); + // await sentinel.connect(); - const resp = await sentinel.bf.add('key', 'item') - assert.equal(resp, true); - }) + // const resp = await sentinel.bf.add('key', 'item') + // assert.equal(resp, true); + // }) - it('multi with a module', async function () { - const options = { - modules: RedisBloomModules - } - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); + // it('multi with a module', async function () { + // const options = { + // modules: RedisBloomModules + // } + // sentinel = frame.getSentinelClient(options); + // await sentinel.connect(); - const resp = await sentinel.multi().bf.add('key', 'item').exec(); - assert.deepEqual(resp, [true]); - }) + // const resp = await sentinel.multi().bf.add('key', 'item').exec(); + // assert.deepEqual(resp, [true]); + // }) - it('many readers', async function () { - this.timeout(10000); + // it('many readers', async function () { + // this.timeout(10000); - sentinel = frame.getSentinelClient({ replicaPoolSize: 8 }); - await sentinel.connect(); + // sentinel = frame.getSentinelClient({ replicaPoolSize: 8 }); + // await sentinel.connect(); - await sentinel.set("x", 1); - for (let i = 0; i < 10; i++) { - if (await sentinel.get("x") == "1") { - break; - } - await setTimeout(1000); - } + // await sentinel.set("x", 1); + // for (let i = 0; i < 10; i++) { + // if (await sentinel.get("x") == "1") { + // break; + // } + // await setTimeout(1000); + // } - const promises: Array> = []; - for (let i = 0; i < 500; i++) { - promises.push(sentinel.get("x")); - } + // const promises: Array> = []; + // for (let i = 0; i < 500; i++) { + // promises.push(sentinel.get("x")); + // } - const resp = await Promise.all(promises); - assert.equal(resp.length, 500); - for (let i = 0; i < 500; i++) { - assert.equal(resp[i], "1", `failed on match at ${i}`); - } - }); + // const resp = await Promise.all(promises); + // assert.equal(resp.length, 500); + // for (let i = 0; i < 500; i++) { + // assert.equal(resp[i], "1", `failed on match at ${i}`); + // } + // }); - it('use', async function () { + it.skip('use', async function () { this.timeout(30000); sentinel = frame.getSentinelClient({ replicaPoolSize: 1 }); - sentinel.on("error", () => { }); + sentinel.on("error", (e) => { + console.log("some err %s", e) + }); await sentinel.connect(); await sentinel.use( + "asd", async (client: RedisSentinelClientType, ) => { const masterNode = sentinel!.getMasterNode(); + console.log("stopping") await frame.stopNode(masterNode!.port.toString()); + console.log("stopped") await assert.doesNotReject(client.get('x')); + console.log("final await") } ); }); // TODO: figure out why it fails - it.skip('use with script', async function () { + it('use with script', async function () { this.timeout(10000); const options = { @@ -372,9 +378,13 @@ async function steadyState(frame: SentinelFramework) { await sentinel.connect(); const reply = await sentinel.use( + "dsf", async (client: RedisSentinelClientType) => { + console.log("set") assert.equal(await client.set('key', '2'), 'OK'); + console.log("get") assert.equal(await client.get('key'), '2'); + console.log("square") return client.square('key') } ); @@ -382,772 +392,772 @@ async function steadyState(frame: SentinelFramework) { assert.equal(reply, 4); }) - it('use with a function', async function () { - this.timeout(10000); + // it('use with a function', async function () { + // this.timeout(10000); - const options = { - functions: { - math: MATH_FUNCTION.library - } - } - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); + // const options = { + // functions: { + // math: MATH_FUNCTION.library + // } + // } + // sentinel = frame.getSentinelClient(options); + // await sentinel.connect(); - await sentinel.functionLoad( - MATH_FUNCTION.code, - { REPLACE: true } - ); + // await sentinel.functionLoad( + // MATH_FUNCTION.code, + // { REPLACE: true } + // ); - const reply = await sentinel.use( - async (client: RedisSentinelClientType) => { - await client.set('key', '2'); - return client.math.square('key'); - } - ); + // const reply = await sentinel.use( + // async (client: RedisSentinelClientType) => { + // await client.set('key', '2'); + // return client.math.square('key'); + // } + // ); - assert.equal(reply, 4); - }) + // assert.equal(reply, 4); + // }) - it('use with a module', async function () { - const options = { - modules: RedisBloomModules - } - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); + // it('use with a module', async function () { + // const options = { + // modules: RedisBloomModules + // } + // sentinel = frame.getSentinelClient(options); + // await sentinel.connect(); - const reply = await sentinel.use( - async (client: RedisSentinelClientType) => { - return client.bf.add('key', 'item'); - } - ); + // const reply = await sentinel.use( + // async (client: RedisSentinelClientType) => { + // return client.bf.add('key', 'item'); + // } + // ); - assert.equal(reply, true); - }) + // assert.equal(reply, true); + // }) - it('block on pool', async function () { - this.timeout(30000); + // it('block on pool', async function () { + // this.timeout(30000); - sentinel = frame.getSentinelClient({ replicaPoolSize: 1 }); - sentinel.on("error", () => { }); - await sentinel.connect(); + // sentinel = frame.getSentinelClient({ replicaPoolSize: 1 }); + // sentinel.on("error", () => { }); + // await sentinel.connect(); - const promise = sentinel.use( - async client => { - await setTimeout(1000); - return await client.get("x"); - } - ) + // const promise = sentinel.use( + // async client => { + // await setTimeout(1000); + // return await client.get("x"); + // } + // ) - await sentinel.set("x", 1); - assert.equal(await promise, null); - }); + // await sentinel.set("x", 1); + // assert.equal(await promise, null); + // }); - // TODO: figure out why it fails - it.skip('reserve client, takes a client out of pool', async function () { - this.timeout(30000); + // // TODO: figure out why it fails + // it.skip('reserve client, takes a client out of pool', async function () { + // this.timeout(30000); - sentinel = frame.getSentinelClient({ masterPoolSize: 2, reserveClient: true }); - await sentinel.connect(); + // sentinel = frame.getSentinelClient({ masterPoolSize: 2, reserveClient: true }); + // await sentinel.connect(); - const promise1 = sentinel.use( - async client => { - const val = await client.get("x"); - await client.set("x", 2); - return val; - } - ) + // const promise1 = sentinel.use( + // async client => { + // const val = await client.get("x"); + // await client.set("x", 2); + // return val; + // } + // ) - const promise2 = sentinel.use( - async client => { - return client.get("x"); - } - ) + // const promise2 = sentinel.use( + // async client => { + // return client.get("x"); + // } + // ) - await sentinel.set("x", 1); - assert.equal(await promise1, "1"); - assert.equal(await promise2, "2"); - }) + // await sentinel.set("x", 1); + // assert.equal(await promise1, "1"); + // assert.equal(await promise2, "2"); + // }) - it('multiple clients', async function () { - this.timeout(30000); + // it('multiple clients', async function () { + // this.timeout(30000); - sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); - sentinel.on("error", () => { }); - await sentinel.connect(); + // sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); + // sentinel.on("error", () => { }); + // await sentinel.connect(); - let set = false; + // let set = false; - const promise = sentinel.use( - async client => { - await sentinel!.set("x", 1); - await client.get("x"); - } - ) + // const promise = sentinel.use( + // async client => { + // await sentinel!.set("x", 1); + // await client.get("x"); + // } + // ) - await assert.doesNotReject(promise); - }); + // await assert.doesNotReject(promise); + // }); - // TODO: figure out why it fails - // by taking a lease, we know we will block on master as no clients are available, but as read occuring, means replica read occurs - it.skip('replica reads', async function () { - this.timeout(30000); + // // TODO: figure out why it fails + // // by taking a lease, we know we will block on master as no clients are available, but as read occuring, means replica read occurs + // it.skip('replica reads', async function () { + // this.timeout(30000); - sentinel = frame.getSentinelClient({ replicaPoolSize: 1 }); - sentinel.on("error", () => { }); - await sentinel.connect(); + // sentinel = frame.getSentinelClient({ replicaPoolSize: 1 }); + // sentinel.on("error", () => { }); + // await sentinel.connect(); - const clientLease = await sentinel.aquire(); - clientLease.set('x', 456); + // const clientLease = await sentinel.aquire(); + // clientLease.set('x', 456); - let matched = false; - /* waits for replication */ - for (let i = 0; i < 15; i++) { - try { - assert.equal(await sentinel.get("x"), '456'); - matched = true; - break; - } catch (err) { - await setTimeout(1000); - } - } + // let matched = false; + // /* waits for replication */ + // for (let i = 0; i < 15; i++) { + // try { + // assert.equal(await sentinel.get("x"), '456'); + // matched = true; + // break; + // } catch (err) { + // await setTimeout(1000); + // } + // } - clientLease.release(); + // clientLease.release(); - assert.equal(matched, true); - }); + // assert.equal(matched, true); + // }); - it('pipeline', async function () { - this.timeout(30000); + // it('pipeline', async function () { + // this.timeout(30000); - sentinel = frame.getSentinelClient({ replicaPoolSize: 1 }); - await sentinel.connect(); + // sentinel = frame.getSentinelClient({ replicaPoolSize: 1 }); + // await sentinel.connect(); - const resp = await sentinel.multi().set('x', 1).get('x').execAsPipeline(); + // const resp = await sentinel.multi().set('x', 1).get('x').execAsPipeline(); - assert.deepEqual(resp, ['OK', '1']); - }) + // assert.deepEqual(resp, ['OK', '1']); + // }) - it('use - watch - clean', async function () { - this.timeout(30000); + // it('use - watch - clean', async function () { + // this.timeout(30000); - sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); - await sentinel.connect(); + // sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); + // await sentinel.connect(); - let promise = sentinel.use(async (client) => { - await client.set("x", 1); - await client.watch("x"); - return client.multi().get("x").exec(); - }); + // let promise = sentinel.use(async (client) => { + // await client.set("x", 1); + // await client.watch("x"); + // return client.multi().get("x").exec(); + // }); - assert.deepEqual(await promise, ['1']); - }); + // assert.deepEqual(await promise, ['1']); + // }); - it('use - watch - dirty', async function () { - this.timeout(30000); + // it('use - watch - dirty', async function () { + // this.timeout(30000); - sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); - await sentinel.connect(); + // sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); + // await sentinel.connect(); - let promise = sentinel.use(async (client) => { - await client.set('x', 1); - await client.watch('x'); - await sentinel!.set('x', 2); - return client.multi().get('x').exec(); - }); + // let promise = sentinel.use(async (client) => { + // await client.set('x', 1); + // await client.watch('x'); + // await sentinel!.set('x', 2); + // return client.multi().get('x').exec(); + // }); - await assert.rejects(promise, new WatchError()); - }); + // await assert.rejects(promise, new WatchError()); + // }); - it('lease - watch - clean', async function () { - sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); - await sentinel.connect(); + // it('lease - watch - clean', async function () { + // sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); + // await sentinel.connect(); - const leasedClient = await sentinel.aquire(); - await leasedClient.set('x', 1); - await leasedClient.watch('x'); - assert.deepEqual(await leasedClient.multi().get('x').exec(), ['1']) - }); + // const leasedClient = await sentinel.aquire(); + // await leasedClient.set('x', 1); + // await leasedClient.watch('x'); + // assert.deepEqual(await leasedClient.multi().get('x').exec(), ['1']) + // }); - it('lease - watch - dirty', async function () { - sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); - await sentinel.connect(); + // it('lease - watch - dirty', async function () { + // sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); + // await sentinel.connect(); - const leasedClient = await sentinel.aquire(); - await leasedClient.set('x', 1); - await leasedClient.watch('x'); - await leasedClient.set('x', 2); + // const leasedClient = await sentinel.aquire(); + // await leasedClient.set('x', 1); + // await leasedClient.watch('x'); + // await leasedClient.set('x', 2); - await assert.rejects(leasedClient.multi().get('x').exec(), new WatchError()); - }); + // await assert.rejects(leasedClient.multi().get('x').exec(), new WatchError()); + // }); - it('watch does not carry through leases', async function () { - this.timeout(10000); - sentinel = frame.getSentinelClient(); - await sentinel.connect(); + // it('watch does not carry through leases', async function () { + // this.timeout(10000); + // sentinel = frame.getSentinelClient(); + // await sentinel.connect(); - // each of these commands is an independent lease - assert.equal(await sentinel.use(client => client.watch("x")), 'OK') - assert.equal(await sentinel.use(client => client.set('x', 1)), 'OK'); - assert.deepEqual(await sentinel.use(client => client.multi().get('x').exec()), ['1']); - }); - - // stops master to force sentinel to update - it('stop master', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient(); - sentinel.setTracer(tracer); - sentinel.on("error", () => { }); - await sentinel.connect(); - - tracer.push(`connected`); - - let masterChangeResolve; - const masterChangePromise = new Promise((res) => { - masterChangeResolve = res; - }) - - const masterNode = await sentinel.getMasterNode(); - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { - tracer.push(`got expected master change event`); - masterChangeResolve(event.node); - } - }); - - tracer.push(`stopping master node`); - await frame.stopNode(masterNode!.port.toString()); - tracer.push(`stopped master node`); - - tracer.push(`waiting on master change promise`); - const newMaster = await masterChangePromise as RedisNode; - tracer.push(`got new master node of ${newMaster.port}`); - assert.notEqual(masterNode!.port, newMaster.port); - }); - - // if master changes, client should make sure user knows watches are invalid - it('watch across master change', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); - sentinel.setTracer(tracer); - sentinel.on("error", () => { }); - await sentinel.connect(); - - tracer.push("connected"); - - const client = await sentinel.aquire(); - tracer.push("aquired lease"); - - await client.set("x", 1); - await client.watch("x"); - - tracer.push("did a watch on lease"); - - let resolve; - const promise = new Promise((res) => { - resolve = res; - }) - - const masterNode = sentinel.getMasterNode(); - tracer.push(`got masterPort as ${masterNode!.port}`); - - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { - tracer.push("resolving promise"); - resolve(event.node); - } - }); - - tracer.push("stopping master node"); - await frame.stopNode(masterNode!.port.toString()); - tracer.push("stopped master node and waiting on promise"); - - const newMaster = await promise as RedisNode; - tracer.push(`promise returned, newMaster = ${JSON.stringify(newMaster)}`); - assert.notEqual(masterNode!.port, newMaster.port); - tracer.push(`newMaster does not equal old master`); - - tracer.push(`waiting to assert that a multi/exec now fails`); - await assert.rejects(async () => { await client.multi().get("x").exec() }, new Error("sentinel config changed in middle of a WATCH Transaction")); - tracer.push(`asserted that a multi/exec now fails`); - }); - - // same as above, but set a watch before and after master change, shouldn't change the fact that watches are invalid - it('watch before and after master change', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); - sentinel.setTracer(tracer); - sentinel.on("error", () => { }); - await sentinel.connect(); - tracer.push("connected"); - - const client = await sentinel.aquire(); - tracer.push("got leased client"); - await client.set("x", 1); - await client.watch("x"); - - tracer.push("set and watched x"); - - let resolve; - const promise = new Promise((res) => { - resolve = res; - }) - - const masterNode = sentinel.getMasterNode(); - tracer.push(`initial masterPort = ${masterNode!.port} `); - - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { - tracer.push("got a master change event that is not the same as before"); - resolve(event.node); - } - }); - - tracer.push("stopping master"); - await frame.stopNode(masterNode!.port.toString()); - tracer.push("stopped master"); - - tracer.push("waiting on master change promise"); - const newMaster = await promise as RedisNode; - tracer.push(`got master change port as ${newMaster.port}`); - assert.notEqual(masterNode!.port, newMaster.port); - - tracer.push("watching again, shouldn't matter"); - await client.watch("y"); - - tracer.push("expecting multi to be rejected"); - await assert.rejects(async () => { await client.multi().get("x").exec() }, new Error("sentinel config changed in middle of a WATCH Transaction")); - tracer.push("multi was rejected"); - }); - - // TODO: figure out why it fails - it.skip('plain pubsub - channel', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient(); - sentinel.setTracer(tracer); - await sentinel.connect(); - tracer.push(`connected`); - - let pubSubResolve; - const pubSubPromise = new Promise((res) => { - pubSubResolve = res; - }); - - let tester = false; - await sentinel.subscribe('test', () => { - tracer.push(`got pubsub message`); - tester = true; - pubSubResolve(1); - }) - - tracer.push(`publishing pubsub message`); - await sentinel.publish('test', 'hello world'); - tracer.push(`waiting on pubsub promise`); - await pubSubPromise; - tracer.push(`got pubsub promise`); - assert.equal(tester, true); - - // now unsubscribe - tester = false - tracer.push(`unsubscribing pubsub listener`); - await sentinel.unsubscribe('test') - tracer.push(`pubishing pubsub message`); - await sentinel.publish('test', 'hello world'); - await setTimeout(1000); - - tracer.push(`ensuring pubsub was unsubscribed via an assert`); - assert.equal(tester, false); - }); - - it('plain pubsub - pattern', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient(); - sentinel.setTracer(tracer); - await sentinel.connect(); - tracer.push(`connected`); - - let pubSubResolve; - const pubSubPromise = new Promise((res) => { - pubSubResolve = res; - }); - - let tester = false; - await sentinel.pSubscribe('test*', () => { - tracer.push(`got pubsub message`); - tester = true; - pubSubResolve(1); - }) - - tracer.push(`publishing pubsub message`); - await sentinel.publish('testy', 'hello world'); - tracer.push(`waiting on pubsub promise`); - await pubSubPromise; - tracer.push(`got pubsub promise`); - assert.equal(tester, true); - - // now unsubscribe - tester = false - tracer.push(`unsubscribing pubsub listener`); - await sentinel.pUnsubscribe('test*'); - tracer.push(`pubishing pubsub message`); - await sentinel.publish('testy', 'hello world'); - await setTimeout(1000); - - tracer.push(`ensuring pubsub was unsubscribed via an assert`); - assert.equal(tester, false); - }); - - // pubsub continues to work, even with a master change - it('pubsub - channel - with master change', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient(); - sentinel.setTracer(tracer); - sentinel.on("error", () => { }); - await sentinel.connect(); - tracer.push(`connected`); - - let pubSubResolve; - const pubSubPromise = new Promise((res) => { - pubSubResolve = res; - }) - - let tester = false; - await sentinel.subscribe('test', () => { - tracer.push(`got pubsub message`); - tester = true; - pubSubResolve(1); - }) - - let masterChangeResolve; - const masterChangePromise = new Promise((res) => { - masterChangeResolve = res; - }) - - const masterNode = sentinel.getMasterNode(); - tracer.push(`got masterPort as ${masterNode!.port}`); - - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { - tracer.push("got a master change event that is not the same as before"); - masterChangeResolve(event.node); - } - }); - - tracer.push("stopping master"); - await frame.stopNode(masterNode!.port.toString()); - tracer.push("stopped master and waiting on change promise"); - - const newMaster = await masterChangePromise as RedisNode; - tracer.push(`got master change port as ${newMaster.port}`); - assert.notEqual(masterNode!.port, newMaster.port); - - tracer.push(`publishing pubsub message`); - await sentinel.publish('test', 'hello world'); - tracer.push(`published pubsub message and waiting pn pubsub promise`); - await pubSubPromise; - tracer.push(`got pubsub promise`); - - assert.equal(tester, true); - - // now unsubscribe - tester = false - await sentinel.unsubscribe('test') - await sentinel.publish('test', 'hello world'); - await setTimeout(1000); - - assert.equal(tester, false); - }); - - it('pubsub - pattern - with master change', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient(); - sentinel.setTracer(tracer); - sentinel.on("error", () => { }); - await sentinel.connect(); - tracer.push(`connected`); - - let pubSubResolve; - const pubSubPromise = new Promise((res) => { - pubSubResolve = res; - }) - - let tester = false; - await sentinel.pSubscribe('test*', () => { - tracer.push(`got pubsub message`); - tester = true; - pubSubResolve(1); - }) - - let masterChangeResolve; - const masterChangePromise = new Promise((res) => { - masterChangeResolve = res; - }) - - const masterNode = sentinel.getMasterNode(); - tracer.push(`got masterPort as ${masterNode!.port}`); - - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { - tracer.push("got a master change event that is not the same as before"); - masterChangeResolve(event.node); - } - }); - - tracer.push("stopping master"); - await frame.stopNode(masterNode!.port.toString()); - tracer.push("stopped master and waiting on master change promise"); - - const newMaster = await masterChangePromise as RedisNode; - tracer.push(`got master change port as ${newMaster.port}`); - assert.notEqual(masterNode!.port, newMaster.port); - - tracer.push(`publishing pubsub message`); - await sentinel.publish('testy', 'hello world'); - tracer.push(`published pubsub message and waiting on pubsub promise`); - await pubSubPromise; - tracer.push(`got pubsub promise`); - assert.equal(tester, true); - - // now unsubscribe - tester = false - await sentinel.pUnsubscribe('test*'); - await sentinel.publish('testy', 'hello world'); - await setTimeout(1000); - - assert.equal(tester, false); - }); - - // if we stop a node, the comand should "retry" until we reconfigure topology and execute on new topology - it('command immeaditely after stopping master', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient(); - sentinel.setTracer(tracer); - sentinel.on("error", () => { }); - await sentinel.connect(); - - tracer.push("connected"); - - let masterChangeResolve; - const masterChangePromise = new Promise((res) => { - masterChangeResolve = res; - }) - - const masterNode = sentinel.getMasterNode(); - tracer.push(`original master port = ${masterNode!.port}`); - - let changeCount = 0; - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { - changeCount++; - tracer.push(`got topology-change event we expected`); - masterChangeResolve(event.node); - } - }); - - tracer.push(`stopping masterNode`); - await frame.stopNode(masterNode!.port.toString()); - tracer.push(`stopped masterNode`); - assert.equal(await sentinel.set('x', 123), 'OK'); - tracer.push(`did the set operation`); - const presumamblyNewMaster = sentinel.getMasterNode(); - tracer.push(`new master node seems to be ${presumamblyNewMaster?.port} and waiting on master change promise`); - - const newMaster = await masterChangePromise as RedisNode; - tracer.push(`got new masternode event saying master is at ${newMaster.port}`); - assert.notEqual(masterNode!.port, newMaster.port); - - tracer.push(`doing the get`); - const val = await sentinel.get('x'); - tracer.push(`did the get and got ${val}`); - const newestMaster = sentinel.getMasterNode() - tracer.push(`after get, we see master as ${newestMaster?.port}`); - - switch (changeCount) { - case 1: - // if we only changed masters once, we should have the proper value - assert.equal(val, '123'); - break; - case 2: - // we changed masters twice quickly, so probably didn't replicate - // therefore, this is soewhat flakey, but the above is the common case - assert(val == '123' || val == null); - break; - default: - assert(false, "unexpected case"); - } - }); - - it('shutdown sentinel node', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient(); - sentinel.setTracer(tracer); - sentinel.on("error", () => { }); - await sentinel.connect(); - tracer.push("connected"); - - let sentinelChangeResolve; - const sentinelChangePromise = new Promise((res) => { - sentinelChangeResolve = res; - }) - - const sentinelNode = sentinel.getSentinelNode(); - tracer.push(`sentinelNode = ${sentinelNode?.port}`) - - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "SENTINEL_CHANGE") { - tracer.push("got sentinel change event"); - sentinelChangeResolve(event.node); - } - }); - - tracer.push("Stopping sentinel node"); - await frame.stopSentinel(sentinelNode!.port.toString()); - tracer.push("Stopped sentinel node and waiting on sentinel change promise"); - const newSentinel = await sentinelChangePromise as RedisNode; - tracer.push("got sentinel change promise"); - assert.notEqual(sentinelNode!.port, newSentinel.port); - }); - - it('timer works, and updates sentinel list', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ scanInterval: 1000 }); - sentinel.setTracer(tracer); - await sentinel.connect(); - tracer.push("connected"); - - let sentinelChangeResolve; - const sentinelChangePromise = new Promise((res) => { - sentinelChangeResolve = res; - }) - - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "SENTINE_LIST_CHANGE" && event.size == 4) { - tracer.push(`got sentinel list change event with right size`); - sentinelChangeResolve(event.size); - } - }); - - tracer.push(`adding sentinel`); - await frame.addSentinel(); - tracer.push(`added sentinel and waiting on sentinel change promise`); - const newSentinelSize = await sentinelChangePromise as number; - - assert.equal(newSentinelSize, 4); - }); - - it('stop replica, bring back replica', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ replicaPoolSize: 1 }); - sentinel.setTracer(tracer); - sentinel.on('error', err => { }); - await sentinel.connect(); - tracer.push("connected"); - - let sentinelRemoveResolve; - const sentinelRemovePromise = new Promise((res) => { - sentinelRemoveResolve = res; - }) - - const replicaPort = await frame.getRandonNonMasterNode(); + // // each of these commands is an independent lease + // assert.equal(await sentinel.use(client => client.watch("x")), 'OK') + // assert.equal(await sentinel.use(client => client.set('x', 1)), 'OK'); + // assert.deepEqual(await sentinel.use(client => client.multi().get('x').exec()), ['1']); + // }); + + // // stops master to force sentinel to update + // it('stop master', async function () { + // this.timeout(30000); + + // sentinel = frame.getSentinelClient(); + // sentinel.setTracer(tracer); + // sentinel.on("error", () => { }); + // await sentinel.connect(); + + // tracer.push(`connected`); + + // let masterChangeResolve; + // const masterChangePromise = new Promise((res) => { + // masterChangeResolve = res; + // }) + + // const masterNode = await sentinel.getMasterNode(); + // sentinel.on('topology-change', (event: RedisSentinelEvent) => { + // tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + // if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { + // tracer.push(`got expected master change event`); + // masterChangeResolve(event.node); + // } + // }); + + // tracer.push(`stopping master node`); + // await frame.stopNode(masterNode!.port.toString()); + // tracer.push(`stopped master node`); + + // tracer.push(`waiting on master change promise`); + // const newMaster = await masterChangePromise as RedisNode; + // tracer.push(`got new master node of ${newMaster.port}`); + // assert.notEqual(masterNode!.port, newMaster.port); + // }); + + // // if master changes, client should make sure user knows watches are invalid + // it('watch across master change', async function () { + // this.timeout(30000); + + // sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); + // sentinel.setTracer(tracer); + // sentinel.on("error", () => { }); + // await sentinel.connect(); + + // tracer.push("connected"); + + // const client = await sentinel.aquire(); + // tracer.push("aquired lease"); + + // await client.set("x", 1); + // await client.watch("x"); + + // tracer.push("did a watch on lease"); + + // let resolve; + // const promise = new Promise((res) => { + // resolve = res; + // }) + + // const masterNode = sentinel.getMasterNode(); + // tracer.push(`got masterPort as ${masterNode!.port}`); + + // sentinel.on('topology-change', (event: RedisSentinelEvent) => { + // tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + // if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { + // tracer.push("resolving promise"); + // resolve(event.node); + // } + // }); + + // tracer.push("stopping master node"); + // await frame.stopNode(masterNode!.port.toString()); + // tracer.push("stopped master node and waiting on promise"); + + // const newMaster = await promise as RedisNode; + // tracer.push(`promise returned, newMaster = ${JSON.stringify(newMaster)}`); + // assert.notEqual(masterNode!.port, newMaster.port); + // tracer.push(`newMaster does not equal old master`); + + // tracer.push(`waiting to assert that a multi/exec now fails`); + // await assert.rejects(async () => { await client.multi().get("x").exec() }, new Error("sentinel config changed in middle of a WATCH Transaction")); + // tracer.push(`asserted that a multi/exec now fails`); + // }); + + // // same as above, but set a watch before and after master change, shouldn't change the fact that watches are invalid + // it('watch before and after master change', async function () { + // this.timeout(30000); + + // sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); + // sentinel.setTracer(tracer); + // sentinel.on("error", () => { }); + // await sentinel.connect(); + // tracer.push("connected"); + + // const client = await sentinel.aquire(); + // tracer.push("got leased client"); + // await client.set("x", 1); + // await client.watch("x"); + + // tracer.push("set and watched x"); + + // let resolve; + // const promise = new Promise((res) => { + // resolve = res; + // }) + + // const masterNode = sentinel.getMasterNode(); + // tracer.push(`initial masterPort = ${masterNode!.port} `); + + // sentinel.on('topology-change', (event: RedisSentinelEvent) => { + // tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + // if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { + // tracer.push("got a master change event that is not the same as before"); + // resolve(event.node); + // } + // }); + + // tracer.push("stopping master"); + // await frame.stopNode(masterNode!.port.toString()); + // tracer.push("stopped master"); + + // tracer.push("waiting on master change promise"); + // const newMaster = await promise as RedisNode; + // tracer.push(`got master change port as ${newMaster.port}`); + // assert.notEqual(masterNode!.port, newMaster.port); + + // tracer.push("watching again, shouldn't matter"); + // await client.watch("y"); + + // tracer.push("expecting multi to be rejected"); + // await assert.rejects(async () => { await client.multi().get("x").exec() }, new Error("sentinel config changed in middle of a WATCH Transaction")); + // tracer.push("multi was rejected"); + // }); + + // // TODO: figure out why it fails + // it.skip('plain pubsub - channel', async function () { + // this.timeout(30000); + + // sentinel = frame.getSentinelClient(); + // sentinel.setTracer(tracer); + // await sentinel.connect(); + // tracer.push(`connected`); + + // let pubSubResolve; + // const pubSubPromise = new Promise((res) => { + // pubSubResolve = res; + // }); + + // let tester = false; + // await sentinel.subscribe('test', () => { + // tracer.push(`got pubsub message`); + // tester = true; + // pubSubResolve(1); + // }) + + // tracer.push(`publishing pubsub message`); + // await sentinel.publish('test', 'hello world'); + // tracer.push(`waiting on pubsub promise`); + // await pubSubPromise; + // tracer.push(`got pubsub promise`); + // assert.equal(tester, true); + + // // now unsubscribe + // tester = false + // tracer.push(`unsubscribing pubsub listener`); + // await sentinel.unsubscribe('test') + // tracer.push(`pubishing pubsub message`); + // await sentinel.publish('test', 'hello world'); + // await setTimeout(1000); + + // tracer.push(`ensuring pubsub was unsubscribed via an assert`); + // assert.equal(tester, false); + // }); + + // it('plain pubsub - pattern', async function () { + // this.timeout(30000); + + // sentinel = frame.getSentinelClient(); + // sentinel.setTracer(tracer); + // await sentinel.connect(); + // tracer.push(`connected`); + + // let pubSubResolve; + // const pubSubPromise = new Promise((res) => { + // pubSubResolve = res; + // }); + + // let tester = false; + // await sentinel.pSubscribe('test*', () => { + // tracer.push(`got pubsub message`); + // tester = true; + // pubSubResolve(1); + // }) + + // tracer.push(`publishing pubsub message`); + // await sentinel.publish('testy', 'hello world'); + // tracer.push(`waiting on pubsub promise`); + // await pubSubPromise; + // tracer.push(`got pubsub promise`); + // assert.equal(tester, true); + + // // now unsubscribe + // tester = false + // tracer.push(`unsubscribing pubsub listener`); + // await sentinel.pUnsubscribe('test*'); + // tracer.push(`pubishing pubsub message`); + // await sentinel.publish('testy', 'hello world'); + // await setTimeout(1000); + + // tracer.push(`ensuring pubsub was unsubscribed via an assert`); + // assert.equal(tester, false); + // }); + + // // pubsub continues to work, even with a master change + // it('pubsub - channel - with master change', async function () { + // this.timeout(30000); + + // sentinel = frame.getSentinelClient(); + // sentinel.setTracer(tracer); + // sentinel.on("error", () => { }); + // await sentinel.connect(); + // tracer.push(`connected`); + + // let pubSubResolve; + // const pubSubPromise = new Promise((res) => { + // pubSubResolve = res; + // }) + + // let tester = false; + // await sentinel.subscribe('test', () => { + // tracer.push(`got pubsub message`); + // tester = true; + // pubSubResolve(1); + // }) + + // let masterChangeResolve; + // const masterChangePromise = new Promise((res) => { + // masterChangeResolve = res; + // }) + + // const masterNode = sentinel.getMasterNode(); + // tracer.push(`got masterPort as ${masterNode!.port}`); + + // sentinel.on('topology-change', (event: RedisSentinelEvent) => { + // tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + // if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { + // tracer.push("got a master change event that is not the same as before"); + // masterChangeResolve(event.node); + // } + // }); + + // tracer.push("stopping master"); + // await frame.stopNode(masterNode!.port.toString()); + // tracer.push("stopped master and waiting on change promise"); + + // const newMaster = await masterChangePromise as RedisNode; + // tracer.push(`got master change port as ${newMaster.port}`); + // assert.notEqual(masterNode!.port, newMaster.port); + + // tracer.push(`publishing pubsub message`); + // await sentinel.publish('test', 'hello world'); + // tracer.push(`published pubsub message and waiting pn pubsub promise`); + // await pubSubPromise; + // tracer.push(`got pubsub promise`); + + // assert.equal(tester, true); + + // // now unsubscribe + // tester = false + // await sentinel.unsubscribe('test') + // await sentinel.publish('test', 'hello world'); + // await setTimeout(1000); + + // assert.equal(tester, false); + // }); + + // it('pubsub - pattern - with master change', async function () { + // this.timeout(30000); + + // sentinel = frame.getSentinelClient(); + // sentinel.setTracer(tracer); + // sentinel.on("error", () => { }); + // await sentinel.connect(); + // tracer.push(`connected`); + + // let pubSubResolve; + // const pubSubPromise = new Promise((res) => { + // pubSubResolve = res; + // }) + + // let tester = false; + // await sentinel.pSubscribe('test*', () => { + // tracer.push(`got pubsub message`); + // tester = true; + // pubSubResolve(1); + // }) + + // let masterChangeResolve; + // const masterChangePromise = new Promise((res) => { + // masterChangeResolve = res; + // }) + + // const masterNode = sentinel.getMasterNode(); + // tracer.push(`got masterPort as ${masterNode!.port}`); + + // sentinel.on('topology-change', (event: RedisSentinelEvent) => { + // tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + // if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { + // tracer.push("got a master change event that is not the same as before"); + // masterChangeResolve(event.node); + // } + // }); + + // tracer.push("stopping master"); + // await frame.stopNode(masterNode!.port.toString()); + // tracer.push("stopped master and waiting on master change promise"); + + // const newMaster = await masterChangePromise as RedisNode; + // tracer.push(`got master change port as ${newMaster.port}`); + // assert.notEqual(masterNode!.port, newMaster.port); + + // tracer.push(`publishing pubsub message`); + // await sentinel.publish('testy', 'hello world'); + // tracer.push(`published pubsub message and waiting on pubsub promise`); + // await pubSubPromise; + // tracer.push(`got pubsub promise`); + // assert.equal(tester, true); + + // // now unsubscribe + // tester = false + // await sentinel.pUnsubscribe('test*'); + // await sentinel.publish('testy', 'hello world'); + // await setTimeout(1000); + + // assert.equal(tester, false); + // }); + + // // if we stop a node, the comand should "retry" until we reconfigure topology and execute on new topology + // it('command immeaditely after stopping master', async function () { + // this.timeout(30000); + + // sentinel = frame.getSentinelClient(); + // sentinel.setTracer(tracer); + // sentinel.on("error", () => { }); + // await sentinel.connect(); + + // tracer.push("connected"); + + // let masterChangeResolve; + // const masterChangePromise = new Promise((res) => { + // masterChangeResolve = res; + // }) + + // const masterNode = sentinel.getMasterNode(); + // tracer.push(`original master port = ${masterNode!.port}`); + + // let changeCount = 0; + // sentinel.on('topology-change', (event: RedisSentinelEvent) => { + // tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + // if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { + // changeCount++; + // tracer.push(`got topology-change event we expected`); + // masterChangeResolve(event.node); + // } + // }); + + // tracer.push(`stopping masterNode`); + // await frame.stopNode(masterNode!.port.toString()); + // tracer.push(`stopped masterNode`); + // assert.equal(await sentinel.set('x', 123), 'OK'); + // tracer.push(`did the set operation`); + // const presumamblyNewMaster = sentinel.getMasterNode(); + // tracer.push(`new master node seems to be ${presumamblyNewMaster?.port} and waiting on master change promise`); + + // const newMaster = await masterChangePromise as RedisNode; + // tracer.push(`got new masternode event saying master is at ${newMaster.port}`); + // assert.notEqual(masterNode!.port, newMaster.port); + + // tracer.push(`doing the get`); + // const val = await sentinel.get('x'); + // tracer.push(`did the get and got ${val}`); + // const newestMaster = sentinel.getMasterNode() + // tracer.push(`after get, we see master as ${newestMaster?.port}`); + + // switch (changeCount) { + // case 1: + // // if we only changed masters once, we should have the proper value + // assert.equal(val, '123'); + // break; + // case 2: + // // we changed masters twice quickly, so probably didn't replicate + // // therefore, this is soewhat flakey, but the above is the common case + // assert(val == '123' || val == null); + // break; + // default: + // assert(false, "unexpected case"); + // } + // }); + + // it('shutdown sentinel node', async function () { + // this.timeout(30000); + + // sentinel = frame.getSentinelClient(); + // sentinel.setTracer(tracer); + // sentinel.on("error", () => { }); + // await sentinel.connect(); + // tracer.push("connected"); + + // let sentinelChangeResolve; + // const sentinelChangePromise = new Promise((res) => { + // sentinelChangeResolve = res; + // }) + + // const sentinelNode = sentinel.getSentinelNode(); + // tracer.push(`sentinelNode = ${sentinelNode?.port}`) + + // sentinel.on('topology-change', (event: RedisSentinelEvent) => { + // tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + // if (event.type === "SENTINEL_CHANGE") { + // tracer.push("got sentinel change event"); + // sentinelChangeResolve(event.node); + // } + // }); + + // tracer.push("Stopping sentinel node"); + // await frame.stopSentinel(sentinelNode!.port.toString()); + // tracer.push("Stopped sentinel node and waiting on sentinel change promise"); + // const newSentinel = await sentinelChangePromise as RedisNode; + // tracer.push("got sentinel change promise"); + // assert.notEqual(sentinelNode!.port, newSentinel.port); + // }); + + // it('timer works, and updates sentinel list', async function () { + // this.timeout(30000); + + // sentinel = frame.getSentinelClient({ scanInterval: 1000 }); + // sentinel.setTracer(tracer); + // await sentinel.connect(); + // tracer.push("connected"); + + // let sentinelChangeResolve; + // const sentinelChangePromise = new Promise((res) => { + // sentinelChangeResolve = res; + // }) + + // sentinel.on('topology-change', (event: RedisSentinelEvent) => { + // tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + // if (event.type === "SENTINE_LIST_CHANGE" && event.size == 4) { + // tracer.push(`got sentinel list change event with right size`); + // sentinelChangeResolve(event.size); + // } + // }); + + // tracer.push(`adding sentinel`); + // await frame.addSentinel(); + // tracer.push(`added sentinel and waiting on sentinel change promise`); + // const newSentinelSize = await sentinelChangePromise as number; + + // assert.equal(newSentinelSize, 4); + // }); + + // it('stop replica, bring back replica', async function () { + // this.timeout(30000); + + // sentinel = frame.getSentinelClient({ replicaPoolSize: 1 }); + // sentinel.setTracer(tracer); + // sentinel.on('error', err => { }); + // await sentinel.connect(); + // tracer.push("connected"); + + // let sentinelRemoveResolve; + // const sentinelRemovePromise = new Promise((res) => { + // sentinelRemoveResolve = res; + // }) + + // const replicaPort = await frame.getRandonNonMasterNode(); - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "REPLICA_REMOVE") { - if (event.node.port.toString() == replicaPort) { - tracer.push("got expected replica removed event"); - sentinelRemoveResolve(event.node); - } else { - tracer.push(`got replica removed event for a different node: ${event.node.port}`); - } - } - }); + // sentinel.on('topology-change', (event: RedisSentinelEvent) => { + // tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + // if (event.type === "REPLICA_REMOVE") { + // if (event.node.port.toString() == replicaPort) { + // tracer.push("got expected replica removed event"); + // sentinelRemoveResolve(event.node); + // } else { + // tracer.push(`got replica removed event for a different node: ${event.node.port}`); + // } + // } + // }); - tracer.push(`replicaPort = ${replicaPort} and stopping it`); - await frame.stopNode(replicaPort); - tracer.push("stopped replica and waiting on sentinel removed promise"); - const stoppedNode = await sentinelRemovePromise as RedisNode; - tracer.push("got removed promise"); - assert.equal(stoppedNode.port, Number(replicaPort)); + // tracer.push(`replicaPort = ${replicaPort} and stopping it`); + // await frame.stopNode(replicaPort); + // tracer.push("stopped replica and waiting on sentinel removed promise"); + // const stoppedNode = await sentinelRemovePromise as RedisNode; + // tracer.push("got removed promise"); + // assert.equal(stoppedNode.port, Number(replicaPort)); - let sentinelRestartedResolve; - const sentinelRestartedPromise = new Promise((res) => { - sentinelRestartedResolve = res; - }) + // let sentinelRestartedResolve; + // const sentinelRestartedPromise = new Promise((res) => { + // sentinelRestartedResolve = res; + // }) - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "REPLICA_ADD") { - tracer.push("got replica added event"); - sentinelRestartedResolve(event.node); - } - }); + // sentinel.on('topology-change', (event: RedisSentinelEvent) => { + // tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + // if (event.type === "REPLICA_ADD") { + // tracer.push("got replica added event"); + // sentinelRestartedResolve(event.node); + // } + // }); - tracer.push("restarting replica"); - await frame.restartNode(replicaPort); - tracer.push("restarted replica and waiting on restart promise"); - const restartedNode = await sentinelRestartedPromise as RedisNode; - tracer.push("got restarted promise"); - assert.equal(restartedNode.port, Number(replicaPort)); - }) + // tracer.push("restarting replica"); + // await frame.restartNode(replicaPort); + // tracer.push("restarted replica and waiting on restart promise"); + // const restartedNode = await sentinelRestartedPromise as RedisNode; + // tracer.push("got restarted promise"); + // assert.equal(restartedNode.port, Number(replicaPort)); + // }) - it('add a node / new replica', async function () { - this.timeout(30000); + // it('add a node / new replica', async function () { + // this.timeout(30000); - sentinel = frame.getSentinelClient({ scanInterval: 2000, replicaPoolSize: 1 }); - sentinel.setTracer(tracer); - // need to handle errors, as the spawning a new docker node can cause existing connections to time out - sentinel.on('error', err => { }); - await sentinel.connect(); - tracer.push("connected"); + // sentinel = frame.getSentinelClient({ scanInterval: 2000, replicaPoolSize: 1 }); + // sentinel.setTracer(tracer); + // // need to handle errors, as the spawning a new docker node can cause existing connections to time out + // sentinel.on('error', err => { }); + // await sentinel.connect(); + // tracer.push("connected"); - let nodeAddedResolve: (value: RedisNode) => void; - const nodeAddedPromise = new Promise((res) => { - nodeAddedResolve = res as (value: RedisNode) => void; - }); + // let nodeAddedResolve: (value: RedisNode) => void; + // const nodeAddedPromise = new Promise((res) => { + // nodeAddedResolve = res as (value: RedisNode) => void; + // }); - const portSet = new Set(); - for (const port of frame.getAllNodesPort()) { - portSet.add(port); - } + // const portSet = new Set(); + // for (const port of frame.getAllNodesPort()) { + // portSet.add(port); + // } - // "on" and not "once" as due to connection timeouts, can happen multiple times, and want right one - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "REPLICA_ADD") { - if (!portSet.has(event.node.port)) { - tracer.push("got expected replica added event"); - nodeAddedResolve(event.node); - } - } - }); + // // "on" and not "once" as due to connection timeouts, can happen multiple times, and want right one + // sentinel.on('topology-change', (event: RedisSentinelEvent) => { + // tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + // if (event.type === "REPLICA_ADD") { + // if (!portSet.has(event.node.port)) { + // tracer.push("got expected replica added event"); + // nodeAddedResolve(event.node); + // } + // } + // }); - tracer.push("adding node"); - await frame.addNode(); - tracer.push("added node and waiting on added promise"); - await nodeAddedPromise; - }) + // tracer.push("adding node"); + // await frame.addNode(); + // tracer.push("added node and waiting on added promise"); + // await nodeAddedPromise; + // }) }) - describe('Sentinel Factory', function () { + describe.skip('Sentinel Factory', function () { let master: RedisClientType | undefined; let replica: RedisClientType | undefined; diff --git a/packages/client/lib/sentinel/index.ts b/packages/client/lib/sentinel/index.ts index 00809e1dd3..cf61be2612 100644 --- a/packages/client/lib/sentinel/index.ts +++ b/packages/client/lib/sentinel/index.ts @@ -404,14 +404,15 @@ export default class RedisSentinel< } } - async use(fn: (sentinelClient: RedisSentinelClientType) => Promise) { + async use(id: string, fn: (sentinelClient: RedisSentinelClientType) => Promise) { const clientInfo = await this._self.#internal.getClientLease(); - + console.log("leased client: %d from %s", clientInfo.id, id) try { return await fn( RedisSentinelClient.create(this._self.#options, this._self.#internal, clientInfo, this._self.#commandOptions) ); } finally { + console.log("released client: %d from %s", clientInfo.id, id) const promise = this._self.#internal.releaseClientLease(clientInfo); if (promise) await promise; } @@ -735,7 +736,7 @@ class RedisSentinelInternal< } const sockOpts = client.options?.socket as TcpNetConnectOpts | undefined; this.#trace("attemping to send command to " + sockOpts?.host + ":" + sockOpts?.port) - + console.log("attempting to send command to %s:%s from %s", sockOpts?.host, sockOpts?.port, clientInfo?.id) try { /* // force testing of READONLY errors