diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 480d7d5140..efa8082090 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -294,9 +294,9 @@ export default class RedisCommandsQueue { } resolve(); }, - reject: () => { + reject: err => { pubSubState[inProgressKey] -= channelsCounter * (isSubscribe ? 1 : -1); - reject(); + reject(err); } }); }); @@ -307,11 +307,32 @@ export default class RedisCommandsQueue { return; } - // TODO: acl error on one channel/pattern will reject the whole command - return Promise.all([ - this.#pushPubSubCommand(PubSubSubscribeCommands.SUBSCRIBE, [...this.#pubSubState.listeners.channels.keys()]), - this.#pushPubSubCommand(PubSubSubscribeCommands.PSUBSCRIBE, [...this.#pubSubState.listeners.patterns.keys()]) - ]); + this.#pubSubState.subscribed = 0; + + const promises = [], + { channels, patterns } = this.#pubSubState.listeners; + + if (channels.size) { + promises.push( + this.#pushPubSubCommand( + PubSubSubscribeCommands.SUBSCRIBE, + [...channels.keys()] + ) + ); + } + + if (patterns.size) { + promises.push( + this.#pushPubSubCommand( + PubSubSubscribeCommands.PSUBSCRIBE, + [...patterns.keys()] + ) + ); + } + + if (promises.length) { + return Promise.all(promises); + } } getCommandToSend(): RedisCommandArguments | undefined { diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index 679c7ae692..63bd9a1b46 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -560,73 +560,104 @@ describe('Client', () => { ); }, GLOBAL.SERVERS.OPEN); - testUtils.testWithClient('PubSub', async publisher => { - function assertStringListener(message: string, channel: string) { - assert.ok(typeof message === 'string'); - assert.ok(typeof channel === 'string'); - } + describe('PubSub', () => { + testUtils.testWithClient('should be able to publish and subscribe to messages', async publisher => { + function assertStringListener(message: string, channel: string) { + assert.ok(typeof message === 'string'); + assert.ok(typeof channel === 'string'); + } - function assertBufferListener(message: Buffer, channel: Buffer) { - assert.ok(Buffer.isBuffer(message)); - assert.ok(Buffer.isBuffer(channel)); - } + function assertBufferListener(message: Buffer, channel: Buffer) { + assert.ok(Buffer.isBuffer(message)); + assert.ok(Buffer.isBuffer(channel)); + } - const subscriber = publisher.duplicate(); + const subscriber = publisher.duplicate(); - await subscriber.connect(); + await subscriber.connect(); - try { - const channelListener1 = spy(assertBufferListener), - channelListener2 = spy(assertStringListener), - patternListener = spy(assertStringListener); + try { + const channelListener1 = spy(assertBufferListener), + channelListener2 = spy(assertStringListener), + patternListener = spy(assertStringListener); - await Promise.all([ - subscriber.subscribe('channel', channelListener1, true), - subscriber.subscribe('channel', channelListener2), - subscriber.pSubscribe('channel*', patternListener) - ]); - await Promise.all([ - waitTillBeenCalled(channelListener1), - waitTillBeenCalled(channelListener2), - waitTillBeenCalled(patternListener), - publisher.publish(Buffer.from('channel'), Buffer.from('message')) - ]); + await Promise.all([ + subscriber.subscribe('channel', channelListener1, true), + subscriber.subscribe('channel', channelListener2), + subscriber.pSubscribe('channel*', patternListener) + ]); + await Promise.all([ + waitTillBeenCalled(channelListener1), + waitTillBeenCalled(channelListener2), + waitTillBeenCalled(patternListener), + publisher.publish(Buffer.from('channel'), Buffer.from('message')) + ]); - assert.ok(channelListener1.calledOnceWithExactly(Buffer.from('message'), Buffer.from('channel'))); - assert.ok(channelListener2.calledOnceWithExactly('message', 'channel')); - assert.ok(patternListener.calledOnceWithExactly('message', 'channel')); + assert.ok(channelListener1.calledOnceWithExactly(Buffer.from('message'), Buffer.from('channel'))); + assert.ok(channelListener2.calledOnceWithExactly('message', 'channel')); + assert.ok(patternListener.calledOnceWithExactly('message', 'channel')); - await subscriber.unsubscribe('channel', channelListener1, true); - await Promise.all([ - waitTillBeenCalled(channelListener2), - waitTillBeenCalled(patternListener), - publisher.publish('channel', 'message') - ]); - assert.ok(channelListener1.calledOnce); - assert.ok(channelListener2.calledTwice); - assert.ok(channelListener2.secondCall.calledWithExactly('message', 'channel')); - assert.ok(patternListener.calledTwice); - assert.ok(patternListener.secondCall.calledWithExactly('message', 'channel')); - await subscriber.unsubscribe('channel'); - await Promise.all([ - waitTillBeenCalled(patternListener), - publisher.publish('channel', 'message') - ]); - assert.ok(channelListener1.calledOnce); - assert.ok(channelListener2.calledTwice); - assert.ok(patternListener.calledThrice); - assert.ok(patternListener.thirdCall.calledWithExactly('message', 'channel')); - await subscriber.pUnsubscribe(); - await publisher.publish('channel', 'message'); - assert.ok(channelListener1.calledOnce); - assert.ok(channelListener2.calledTwice); - assert.ok(patternListener.calledThrice); - // should be able to send commands when unsubsribed from all channels (see #1652) - await assert.doesNotReject(subscriber.ping()); - } finally { - await subscriber.disconnect(); - } - }, GLOBAL.SERVERS.OPEN); + await subscriber.unsubscribe('channel', channelListener1, true); + await Promise.all([ + waitTillBeenCalled(channelListener2), + waitTillBeenCalled(patternListener), + publisher.publish('channel', 'message') + ]); + assert.ok(channelListener1.calledOnce); + assert.ok(channelListener2.calledTwice); + assert.ok(channelListener2.secondCall.calledWithExactly('message', 'channel')); + assert.ok(patternListener.calledTwice); + assert.ok(patternListener.secondCall.calledWithExactly('message', 'channel')); + await subscriber.unsubscribe('channel'); + await Promise.all([ + waitTillBeenCalled(patternListener), + publisher.publish('channel', 'message') + ]); + assert.ok(channelListener1.calledOnce); + assert.ok(channelListener2.calledTwice); + assert.ok(patternListener.calledThrice); + assert.ok(patternListener.thirdCall.calledWithExactly('message', 'channel')); + await subscriber.pUnsubscribe(); + await publisher.publish('channel', 'message'); + assert.ok(channelListener1.calledOnce); + assert.ok(channelListener2.calledTwice); + assert.ok(patternListener.calledThrice); + // should be able to send commands when unsubsribed from all channels (see #1652) + await assert.doesNotReject(subscriber.ping()); + } finally { + await subscriber.disconnect(); + } + }, GLOBAL.SERVERS.OPEN); + + testUtils.testWithClient('should resubscribe', async publisher => { + const subscriber = publisher.duplicate(); + + await subscriber.connect(); + + try { + const listener = spy(); + await subscriber.subscribe('channel', listener); + + subscriber.on('error', err => { + console.error('subscriber err', err.message); + }); + + await Promise.all([ + once(subscriber, 'error'), + publisher.sendCommand(['CLIENT', 'KILL', 'SKIPME', 'yes']) + ]); + + await once(subscriber, 'ready'); + + await Promise.all([ + waitTillBeenCalled(listener), + publisher.publish('channel', 'message') + ]); + } finally { + await subscriber.disconnect(); + } + }, GLOBAL.SERVERS.OPEN); + }); testUtils.testWithClient('ConnectionTimeoutError', async client => { const promise = assert.rejects(client.connect(), ConnectionTimeoutError),