1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-07 13:22:56 +03:00

fix #1764 - fix PubSub resubscribe

This commit is contained in:
leibale
2021-12-06 20:29:59 -05:00
parent 7202f3582f
commit 82920aef0b
2 changed files with 119 additions and 67 deletions

View File

@@ -294,9 +294,9 @@ export default class RedisCommandsQueue {
} }
resolve(); resolve();
}, },
reject: () => { reject: err => {
pubSubState[inProgressKey] -= channelsCounter * (isSubscribe ? 1 : -1); pubSubState[inProgressKey] -= channelsCounter * (isSubscribe ? 1 : -1);
reject(); reject(err);
} }
}); });
}); });
@@ -307,11 +307,32 @@ export default class RedisCommandsQueue {
return; return;
} }
// TODO: acl error on one channel/pattern will reject the whole command this.#pubSubState.subscribed = 0;
return Promise.all([
this.#pushPubSubCommand(PubSubSubscribeCommands.SUBSCRIBE, [...this.#pubSubState.listeners.channels.keys()]), const promises = [],
this.#pushPubSubCommand(PubSubSubscribeCommands.PSUBSCRIBE, [...this.#pubSubState.listeners.patterns.keys()]) { channels, patterns } = this.#pubSubState.listeners;
]);
if (channels.size) {
promises.push(
this.#pushPubSubCommand(
PubSubSubscribeCommands.SUBSCRIBE,
[...channels.keys()]
)
);
}
if (patterns.size) {
promises.push(
this.#pushPubSubCommand(
PubSubSubscribeCommands.PSUBSCRIBE,
[...patterns.keys()]
)
);
}
if (promises.length) {
return Promise.all(promises);
}
} }
getCommandToSend(): RedisCommandArguments | undefined { getCommandToSend(): RedisCommandArguments | undefined {

View File

@@ -560,7 +560,8 @@ describe('Client', () => {
); );
}, GLOBAL.SERVERS.OPEN); }, GLOBAL.SERVERS.OPEN);
testUtils.testWithClient('PubSub', async publisher => { describe('PubSub', () => {
testUtils.testWithClient('should be able to publish and subscribe to messages', async publisher => {
function assertStringListener(message: string, channel: string) { function assertStringListener(message: string, channel: string) {
assert.ok(typeof message === 'string'); assert.ok(typeof message === 'string');
assert.ok(typeof channel === 'string'); assert.ok(typeof channel === 'string');
@@ -628,6 +629,36 @@ describe('Client', () => {
} }
}, GLOBAL.SERVERS.OPEN); }, GLOBAL.SERVERS.OPEN);
testUtils.testWithClient('should resubscribe', async publisher => {
const subscriber = publisher.duplicate();
await subscriber.connect();
try {
const listener = spy();
await subscriber.subscribe('channel', listener);
subscriber.on('error', err => {
console.error('subscriber err', err.message);
});
await Promise.all([
once(subscriber, 'error'),
publisher.sendCommand(['CLIENT', 'KILL', 'SKIPME', 'yes'])
]);
await once(subscriber, 'ready');
await Promise.all([
waitTillBeenCalled(listener),
publisher.publish('channel', 'message')
]);
} finally {
await subscriber.disconnect();
}
}, GLOBAL.SERVERS.OPEN);
});
testUtils.testWithClient('ConnectionTimeoutError', async client => { testUtils.testWithClient('ConnectionTimeoutError', async client => {
const promise = assert.rejects(client.connect(), ConnectionTimeoutError), const promise = assert.rejects(client.connect(), ConnectionTimeoutError),
start = process.hrtime.bigint(); start = process.hrtime.bigint();