diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 8806a50a9d..22ea1aba96 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -211,9 +211,7 @@ export default class RedisCommandsQueue { signal.addEventListener('abort', value.abort.listener, { once: true }); } - node = options?.asap ? - this._toWrite.unshift(value) : - this._toWrite.push(value); + node = this._toWrite.add(value, options?.asap); }); } @@ -272,7 +270,7 @@ export default class RedisCommandsQueue { if (command === undefined) return; return new Promise((resolve, reject) => { - (asap ? this._toWrite.unshift : this._toWrite.push)({ + this._toWrite.add({ args: command.args, chainId: undefined, abort: undefined, @@ -287,7 +285,7 @@ export default class RedisCommandsQueue { }, channelsCounter: command.channelsCounter, typeMapping: PUSH_TYPE_MAPPING - }); + }, asap); }); } diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index cfdfba0455..fa3fbe55c4 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -525,222 +525,217 @@ describe('Client', () => { assert.deepEqual(map, results); }, GLOBAL.SERVERS.OPEN); -// describe('PubSub', () => { -// testUtils.testWithClient('should be able to publish and subscribe to messages', async publisher => { -// function assertStringListener(message: string, channel: string) { -// assert.equal(typeof message, 'string'); -// assert.equal(typeof channel, 'string'); -// } + describe('PubSub', () => { + testUtils.testWithClient('should be able to publish and subscribe to messages', async publisher => { + function assertStringListener(message: string, channel: string) { + assert.equal(typeof message, 'string'); + assert.equal(typeof channel, 'string'); + } -// function assertBufferListener(message: Buffer, channel: Buffer) { -// assert.ok(message instanceof Buffer); -// assert.ok(channel instanceof Buffer); -// } + function assertBufferListener(message: Buffer, channel: Buffer) { + assert.ok(message instanceof Buffer); + assert.ok(channel instanceof Buffer); + } -// const subscriber = publisher.duplicate(); + const subscriber = await publisher.duplicate().connect(); -// await subscriber.connect(); + try { + const channelListener1 = spy(assertBufferListener), + channelListener2 = spy(assertStringListener), + patternListener = spy(assertStringListener); -// try { -// const channelListener1 = spy(assertBufferListener), -// channelListener2 = spy(assertStringListener), -// patternListener = spy(assertStringListener); + await Promise.all([ + subscriber.subscribe('channel', channelListener1, true), + subscriber.subscribe('channel', channelListener2), + subscriber.pSubscribe('channel*', patternListener) + ]); + await Promise.all([ + waitTillBeenCalled(channelListener1), + waitTillBeenCalled(channelListener2), + waitTillBeenCalled(patternListener), + publisher.publish(Buffer.from('channel'), Buffer.from('message')) + ]); + assert.ok(channelListener1.calledOnceWithExactly(Buffer.from('message'), Buffer.from('channel'))); + assert.ok(channelListener2.calledOnceWithExactly('message', 'channel')); + assert.ok(patternListener.calledOnceWithExactly('message', 'channel')); -// await Promise.all([ -// subscriber.subscribe('channel', channelListener1, true), -// subscriber.subscribe('channel', channelListener2), -// subscriber.pSubscribe('channel*', patternListener) -// ]); -// await Promise.all([ -// waitTillBeenCalled(channelListener1), -// waitTillBeenCalled(channelListener2), -// waitTillBeenCalled(patternListener), -// publisher.publish(Buffer.from('channel'), Buffer.from('message')) -// ]); + await subscriber.unsubscribe('channel', channelListener1, true); + await Promise.all([ + waitTillBeenCalled(channelListener2), + waitTillBeenCalled(patternListener), + publisher.publish('channel', 'message') + ]); + assert.ok(channelListener1.calledOnce); + assert.ok(channelListener2.calledTwice); + assert.ok(channelListener2.secondCall.calledWithExactly('message', 'channel')); + assert.ok(patternListener.calledTwice); + assert.ok(patternListener.secondCall.calledWithExactly('message', 'channel')); + await subscriber.unsubscribe('channel'); + await Promise.all([ + waitTillBeenCalled(patternListener), + publisher.publish('channel', 'message') + ]); + assert.ok(channelListener1.calledOnce); + assert.ok(channelListener2.calledTwice); + assert.ok(patternListener.calledThrice); + assert.ok(patternListener.thirdCall.calledWithExactly('message', 'channel')); -// assert.ok(channelListener1.calledOnceWithExactly(Buffer.from('message'), Buffer.from('channel'))); -// assert.ok(channelListener2.calledOnceWithExactly('message', 'channel')); -// assert.ok(patternListener.calledOnceWithExactly('message', 'channel')); + await subscriber.pUnsubscribe(); + await publisher.publish('channel', 'message'); + assert.ok(channelListener1.calledOnce); + assert.ok(channelListener2.calledTwice); + assert.ok(patternListener.calledThrice); -// await subscriber.unsubscribe('channel', channelListener1, true); -// await Promise.all([ -// waitTillBeenCalled(channelListener2), -// waitTillBeenCalled(patternListener), -// publisher.publish('channel', 'message') -// ]); -// assert.ok(channelListener1.calledOnce); -// assert.ok(channelListener2.calledTwice); -// assert.ok(channelListener2.secondCall.calledWithExactly('message', 'channel')); -// assert.ok(patternListener.calledTwice); -// assert.ok(patternListener.secondCall.calledWithExactly('message', 'channel')); -// await subscriber.unsubscribe('channel'); -// await Promise.all([ -// waitTillBeenCalled(patternListener), -// publisher.publish('channel', 'message') -// ]); -// assert.ok(channelListener1.calledOnce); -// assert.ok(channelListener2.calledTwice); -// assert.ok(patternListener.calledThrice); -// assert.ok(patternListener.thirdCall.calledWithExactly('message', 'channel')); -// await subscriber.pUnsubscribe(); -// await publisher.publish('channel', 'message'); -// assert.ok(channelListener1.calledOnce); -// assert.ok(channelListener2.calledTwice); -// assert.ok(patternListener.calledThrice); -// // should be able to send commands when unsubsribed from all channels (see #1652) -// await assert.doesNotReject(subscriber.ping()); -// } finally { -// await subscriber.disconnect(); -// } -// }, GLOBAL.SERVERS.OPEN); + // should be able to send commands when unsubsribed from all channels (see #1652) + await assert.doesNotReject(subscriber.ping()); + } finally { + subscriber.destroy(); + } + }, GLOBAL.SERVERS.OPEN); -// testUtils.testWithClient('should resubscribe', async publisher => { -// const subscriber = publisher.duplicate(); + testUtils.testWithClient('should resubscribe', async publisher => { + const subscriber = await publisher.duplicate().connect(); -// await subscriber.connect(); + try { + const channelListener = spy(); + await subscriber.subscribe('channel', channelListener); -// try { -// const channelListener = spy(); -// await subscriber.subscribe('channel', channelListener); + const patternListener = spy(); + await subscriber.pSubscribe('channe*', patternListener); -// const patternListener = spy(); -// await subscriber.pSubscribe('channe*', patternListener); + await Promise.all([ + once(subscriber, 'error'), + publisher.clientKill({ + filter: 'SKIPME', + skipMe: true + }) + ]); -// await Promise.all([ -// once(subscriber, 'error'), -// publisher.clientKill({ -// filter: ClientKillFilters.SKIP_ME, -// skipMe: true -// }) -// ]); + await once(subscriber, 'ready'); -// await once(subscriber, 'ready'); + await Promise.all([ + waitTillBeenCalled(channelListener), + waitTillBeenCalled(patternListener), + publisher.publish('channel', 'message') + ]); + } finally { + subscriber.destroy(); + } + }, GLOBAL.SERVERS.OPEN); -// await Promise.all([ -// waitTillBeenCalled(channelListener), -// waitTillBeenCalled(patternListener), -// publisher.publish('channel', 'message') -// ]); -// } finally { -// await subscriber.disconnect(); -// } -// }, GLOBAL.SERVERS.OPEN); + testUtils.testWithClient('should not fail when message arrives right after subscribe', async publisher => { + const subscriber = await publisher.duplicate().connect(); -// testUtils.testWithClient('should not fail when message arrives right after subscribe', async publisher => { -// const subscriber = publisher.duplicate(); + try { + await assert.doesNotReject(Promise.all([ + subscriber.subscribe('channel', () => { + // noop + }), + publisher.publish('channel', 'message') + ])); + } finally { + subscriber.destroy(); + } + }, GLOBAL.SERVERS.OPEN); -// await subscriber.connect(); + testUtils.testWithClient('should be able to quit in PubSub mode', async client => { + await client.subscribe('channel', () => { + // noop + }); -// try { -// await assert.doesNotReject(Promise.all([ -// subscriber.subscribe('channel', () => { -// // noop -// }), -// publisher.publish('channel', 'message') -// ])); -// } finally { -// await subscriber.disconnect(); -// } -// }, GLOBAL.SERVERS.OPEN); + await assert.doesNotReject(client.quit()); -// testUtils.testWithClient('should be able to quit in PubSub mode', async client => { -// await client.subscribe('channel', () => { -// // noop -// }); + assert.equal(client.isOpen, false); + }, GLOBAL.SERVERS.OPEN); + }); -// await assert.doesNotReject(client.quit()); + testUtils.testWithClient('ConnectionTimeoutError', async client => { + const promise = assert.rejects(client.connect(), ConnectionTimeoutError), + start = process.hrtime.bigint(); -// assert.equal(client.isOpen, false); -// }, GLOBAL.SERVERS.OPEN); -// }); + while (process.hrtime.bigint() - start < 1_000_000) { + // block the event loop for 1ms, to make sure the connection will timeout + } -// testUtils.testWithClient('ConnectionTimeoutError', async client => { -// const promise = assert.rejects(client.connect(), ConnectionTimeoutError), -// start = process.hrtime.bigint(); + await promise; + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + socket: { + connectTimeout: 1 + } + }, + disableClientSetup: true + }); -// while (process.hrtime.bigint() - start < 1_000_000) { -// // block the event loop for 1ms, to make sure the connection will timeout -// } + testUtils.testWithClient('client.quit', async client => { + await client.connect(); -// await promise; -// }, { -// ...GLOBAL.SERVERS.OPEN, -// clientOptions: { -// socket: { -// connectTimeout: 1 -// } -// }, -// disableClientSetup: true -// }); + const pingPromise = client.ping(), + quitPromise = client.quit(); + assert.equal(client.isOpen, false); -// testUtils.testWithClient('client.quit', async client => { -// await client.connect(); + const [ping, quit] = await Promise.all([ + pingPromise, + quitPromise, + assert.rejects(client.ping(), ClientClosedError) + ]); -// const pingPromise = client.ping(), -// quitPromise = client.quit(); -// assert.equal(client.isOpen, false); + assert.equal(ping, 'PONG'); + assert.equal(quit, 'OK'); + }, { + ...GLOBAL.SERVERS.OPEN, + disableClientSetup: true + }); -// const [ping, quit] = await Promise.all([ -// pingPromise, -// quitPromise, -// assert.rejects(client.ping(), ClientClosedError) -// ]); + testUtils.testWithClient('client.disconnect', async client => { + const pingPromise = client.ping(), + disconnectPromise = client.disconnect(); + assert.equal(client.isOpen, false); + await Promise.all([ + assert.rejects(pingPromise, DisconnectsClientError), + assert.doesNotReject(disconnectPromise), + assert.rejects(client.ping(), ClientClosedError) + ]); + }, GLOBAL.SERVERS.OPEN); -// assert.equal(ping, 'PONG'); -// assert.equal(quit, 'OK'); -// }, { -// ...GLOBAL.SERVERS.OPEN, -// disableClientSetup: true -// }); + testUtils.testWithClient('should be able to connect after disconnect (see #1801)', async client => { + await client.disconnect(); + await client.connect(); + }, GLOBAL.SERVERS.OPEN); -// testUtils.testWithClient('client.disconnect', async client => { -// const pingPromise = client.ping(), -// disconnectPromise = client.disconnect(); -// assert.equal(client.isOpen, false); -// await Promise.all([ -// assert.rejects(pingPromise, DisconnectsClientError), -// assert.doesNotReject(disconnectPromise), -// assert.rejects(client.ping(), ClientClosedError) -// ]); -// }, GLOBAL.SERVERS.OPEN); + testUtils.testWithClient('should be able to use ref and unref', client => { + client.unref(); + client.ref(); + }, GLOBAL.SERVERS.OPEN); -// testUtils.testWithClient('should be able to connect after disconnect (see #1801)', async client => { -// await client.disconnect(); -// await client.connect(); -// }, GLOBAL.SERVERS.OPEN); + testUtils.testWithClient('pingInterval', async client => { + assert.deepEqual( + await once(client, 'ping-interval'), + ['PONG'] + ); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + pingInterval: 1 + } + }); -// testUtils.testWithClient('should be able to use ref and unref', client => { -// client.unref(); -// client.ref(); -// }, GLOBAL.SERVERS.OPEN); - -// testUtils.testWithClient('pingInterval', async client => { -// assert.deepEqual( -// await once(client, 'ping-interval'), -// ['PONG'] -// ); -// }, { -// ...GLOBAL.SERVERS.OPEN, -// clientOptions: { -// pingInterval: 1 -// } -// }); - -// testUtils.testWithClient('should reject commands in connect phase when `disableOfflineQueue`', async client => { -// const connectPromise = client.connect(); -// await assert.rejects( -// client.ping(), -// ClientOfflineError -// ); -// await connectPromise; -// await client.disconnect(); -// }, { -// ...GLOBAL.SERVERS.OPEN, -// clientOptions: { -// disableOfflineQueue: true -// }, -// disableClientSetup: true -// }); + testUtils.testWithClient('should reject commands in connect phase when `disableOfflineQueue`', async client => { + const connectPromise = client.connect(); + await assert.rejects( + client.ping(), + ClientOfflineError + ); + await connectPromise; + await client.disconnect(); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + disableOfflineQueue: true + }, + disableClientSetup: true + }); describe('MONITOR', () => { testUtils.testWithClient('should be able to monitor commands', async client => { diff --git a/packages/client/lib/client/linked-list.ts b/packages/client/lib/client/linked-list.ts index c2f4952738..7aed43cfd1 100644 --- a/packages/client/lib/client/linked-list.ts +++ b/packages/client/lib/client/linked-list.ts @@ -59,6 +59,12 @@ export class DoublyLinkedList { }; } + add(value: T, prepend = false) { + return prepend ? + this.unshift(value) : + this.push(value); + } + shift() { if (this._head === undefined) return undefined;