1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-04 15:02:09 +03:00
Files
node-redis/packages/client/lib/client/pub-sub.ts
Leibale Eidelman 3b1bad2296 Add support for sharded PubSub (#2373)
* refactor pubsub, add support for sharded pub sub

* run tests in redis 7 only, fix PUBSUB SHARDCHANNELS test

* add some comments and fix some bugs

* PubSubType, not PubSubTypes 🤦‍♂️

* remove test.txt

* fix some bugs, add tests

* add some tests

* fix #2345 - allow PING in PubSub mode (remove client side validation)

* remove .only

* revert changes in cluster/index.ts

* fix tests minimum version

* handle server sunsubscribe

* add 'sharded-channel-moved' event to docs, improve the events section in the main README (fix #2302)

* exit "resubscribe" if pubsub not active

* Update commands-queue.ts

* Release client@1.5.0-rc.0

* WIP

* use `node:util` instead of `node:util/types` (to support node 14)

* run PubSub resharding test with Redis 7+

* fix inconsistency in live resharding test

* add some tests

* fix iterateAllNodes when starting from a replica

* fix iterateAllNodes random

* fix slotNodesIterator

* fix slotNodesIterator

* clear pubSubNode when node in use

* wait for all nodes cluster state to be ok before testing

* `cluster.minimizeConections` tests

* `client.reconnectStrategry = false | 0` tests

* sharded pubsub + cluster 🎉

* add minimum version to sharded pubsub tests

* add cluster sharded pubsub live reshard test, use stable dockers for tests, make sure to close pubsub clients when a node disconnects from the cluster

* fix "ssubscribe & sunsubscribe" test

* lock search docker to 2.4.9

* change numberOfMasters default to 2

* use edge for bloom

* add tests

* add back getMasters and getSlotMaster as deprecated functions

* add some tests

* fix reconnect strategy + docs

* sharded pubsub docs

* Update pub-sub.md

* some jsdoc, docs, cluster topology test

* clean pub-sub docs

Co-authored-by: Simon Prickett <simon@redislabs.com>

* reconnect startegy docs and bug fix

Co-authored-by: Simon Prickett <simon@redislabs.com>

* refine jsdoc and some docs

Co-authored-by: Simon Prickett <simon@redislabs.com>

* I'm stupid

* fix cluster topology test

* fix cluster topology test

* Update README.md

* Update clustering.md

* Update pub-sub.md

Co-authored-by: Simon Prickett <simon@redislabs.com>
2023-01-25 11:00:39 -05:00

409 lines
12 KiB
TypeScript

import { RedisCommandArgument } from "../commands";
export enum PubSubType {
CHANNELS = 'CHANNELS',
PATTERNS = 'PATTERNS',
SHARDED = 'SHARDED'
}
const COMMANDS = {
[PubSubType.CHANNELS]: {
subscribe: Buffer.from('subscribe'),
unsubscribe: Buffer.from('unsubscribe'),
message: Buffer.from('message')
},
[PubSubType.PATTERNS]: {
subscribe: Buffer.from('psubscribe'),
unsubscribe: Buffer.from('punsubscribe'),
message: Buffer.from('pmessage')
},
[PubSubType.SHARDED]: {
subscribe: Buffer.from('ssubscribe'),
unsubscribe: Buffer.from('sunsubscribe'),
message: Buffer.from('smessage')
}
};
export type PubSubListener<
RETURN_BUFFERS extends boolean = false
> = <T extends RETURN_BUFFERS extends true ? Buffer : string>(message: T, channel: T) => unknown;
export interface ChannelListeners {
unsubscribing: boolean;
buffers: Set<PubSubListener<true>>;
strings: Set<PubSubListener<false>>;
}
export type PubSubTypeListeners = Map<string, ChannelListeners>;
type Listeners = Record<PubSubType, PubSubTypeListeners>;
export type PubSubCommand = ReturnType<
typeof PubSub.prototype.subscribe |
typeof PubSub.prototype.unsubscribe |
typeof PubSub.prototype.extendTypeListeners
>;
export class PubSub {
static isStatusReply(reply: Array<Buffer>): boolean {
return (
COMMANDS[PubSubType.CHANNELS].subscribe.equals(reply[0]) ||
COMMANDS[PubSubType.CHANNELS].unsubscribe.equals(reply[0]) ||
COMMANDS[PubSubType.PATTERNS].subscribe.equals(reply[0]) ||
COMMANDS[PubSubType.PATTERNS].unsubscribe.equals(reply[0]) ||
COMMANDS[PubSubType.SHARDED].subscribe.equals(reply[0])
);
}
static isShardedUnsubscribe(reply: Array<Buffer>): boolean {
return COMMANDS[PubSubType.SHARDED].unsubscribe.equals(reply[0]);
}
static #channelsArray(channels: string | Array<string>) {
return (Array.isArray(channels) ? channels : [channels]);
}
static #listenersSet<T extends boolean>(
listeners: ChannelListeners,
returnBuffers?: T
) {
return (returnBuffers ? listeners.buffers : listeners.strings);
}
#subscribing = 0;
#isActive = false;
get isActive() {
return this.#isActive;
}
#listeners: Listeners = {
[PubSubType.CHANNELS]: new Map(),
[PubSubType.PATTERNS]: new Map(),
[PubSubType.SHARDED]: new Map()
};
subscribe<T extends boolean>(
type: PubSubType,
channels: string | Array<string>,
listener: PubSubListener<T>,
returnBuffers?: T
) {
const args: Array<RedisCommandArgument> = [COMMANDS[type].subscribe],
channelsArray = PubSub.#channelsArray(channels);
for (const channel of channelsArray) {
let channelListeners = this.#listeners[type].get(channel);
if (!channelListeners || channelListeners.unsubscribing) {
args.push(channel);
}
}
if (args.length === 1) {
// all channels are already subscribed, add listeners without issuing a command
for (const channel of channelsArray) {
PubSub.#listenersSet(
this.#listeners[type].get(channel)!,
returnBuffers
).add(listener);
}
return;
}
this.#isActive = true;
this.#subscribing++;
return {
args,
channelsCounter: args.length - 1,
resolve: () => {
this.#subscribing--;
for (const channel of channelsArray) {
let listeners = this.#listeners[type].get(channel);
if (!listeners) {
listeners = {
unsubscribing: false,
buffers: new Set(),
strings: new Set()
};
this.#listeners[type].set(channel, listeners);
}
PubSub.#listenersSet(listeners, returnBuffers).add(listener);
}
},
reject: () => {
this.#subscribing--;
this.#updateIsActive();
}
};
}
extendChannelListeners(
type: PubSubType,
channel: string,
listeners: ChannelListeners
) {
if (!this.#extendChannelListeners(type, channel, listeners)) return;
this.#isActive = true;
this.#subscribing++;
return {
args: [
COMMANDS[type].subscribe,
channel
],
channelsCounter: 1,
resolve: () => this.#subscribing--,
reject: () => {
this.#subscribing--;
this.#updateIsActive();
}
};
}
#extendChannelListeners(
type: PubSubType,
channel: string,
listeners: ChannelListeners
) {
const existingListeners = this.#listeners[type].get(channel);
if (!existingListeners) {
this.#listeners[type].set(channel, listeners);
return true;
}
for (const listener of listeners.buffers) {
existingListeners.buffers.add(listener);
}
for (const listener of listeners.strings) {
existingListeners.strings.add(listener);
}
return false;
}
extendTypeListeners(type: PubSubType, listeners: PubSubTypeListeners) {
const args: Array<RedisCommandArgument> = [COMMANDS[type].subscribe];
for (const [channel, channelListeners] of listeners) {
if (this.#extendChannelListeners(type, channel, channelListeners)) {
args.push(channel);
}
}
if (args.length === 1) return;
this.#isActive = true;
this.#subscribing++;
return {
args,
channelsCounter: args.length - 1,
resolve: () => this.#subscribing--,
reject: () => {
this.#subscribing--;
this.#updateIsActive();
}
};
}
unsubscribe<T extends boolean>(
type: PubSubType,
channels?: string | Array<string>,
listener?: PubSubListener<T>,
returnBuffers?: T
) {
const listeners = this.#listeners[type];
if (!channels) {
return this.#unsubscribeCommand(
[COMMANDS[type].unsubscribe],
// cannot use `this.#subscribed` because there might be some `SUBSCRIBE` commands in the queue
// cannot use `this.#subscribed + this.#subscribing` because some `SUBSCRIBE` commands might fail
NaN,
() => listeners.clear()
);
}
const channelsArray = PubSub.#channelsArray(channels);
if (!listener) {
return this.#unsubscribeCommand(
[COMMANDS[type].unsubscribe, ...channelsArray],
channelsArray.length,
() => {
for (const channel of channelsArray) {
listeners.delete(channel);
}
}
);
}
const args: Array<RedisCommandArgument> = [COMMANDS[type].unsubscribe];
for (const channel of channelsArray) {
const sets = listeners.get(channel);
if (sets) {
let current,
other;
if (returnBuffers) {
current = sets.buffers;
other = sets.strings;
} else {
current = sets.strings;
other = sets.buffers;
}
const currentSize = current.has(listener) ? current.size - 1 : current.size;
if (currentSize !== 0 || other.size !== 0) continue;
sets.unsubscribing = true;
}
args.push(channel);
}
if (args.length === 1) {
// all channels has other listeners,
// delete the listeners without issuing a command
for (const channel of channelsArray) {
PubSub.#listenersSet(
listeners.get(channel)!,
returnBuffers
).delete(listener);
}
return;
}
return this.#unsubscribeCommand(
args,
args.length - 1,
() => {
for (const channel of channelsArray) {
const sets = listeners.get(channel);
if (!sets) continue;
(returnBuffers ? sets.buffers : sets.strings).delete(listener);
if (sets.buffers.size === 0 && sets.strings.size === 0) {
listeners.delete(channel);
}
}
}
);
}
#unsubscribeCommand(
args: Array<RedisCommandArgument>,
channelsCounter: number,
removeListeners: () => void
) {
return {
args,
channelsCounter,
resolve: () => {
removeListeners();
this.#updateIsActive();
},
reject: undefined // use the same structure as `subscribe`
};
}
#updateIsActive() {
this.#isActive = (
this.#listeners[PubSubType.CHANNELS].size !== 0 ||
this.#listeners[PubSubType.PATTERNS].size !== 0 ||
this.#listeners[PubSubType.CHANNELS].size !== 0 ||
this.#subscribing !== 0
);
}
reset() {
this.#isActive = false;
this.#subscribing = 0;
}
resubscribe(): Array<PubSubCommand> {
const commands = [];
for (const [type, listeners] of Object.entries(this.#listeners)) {
if (!listeners.size) continue;
this.#isActive = true;
this.#subscribing++;
const callback = () => this.#subscribing--;
commands.push({
args: [
COMMANDS[type as PubSubType].subscribe,
...listeners.keys()
],
channelsCounter: listeners.size,
resolve: callback,
reject: callback
});
}
return commands;
}
handleMessageReply(reply: Array<Buffer>): boolean {
if (COMMANDS[PubSubType.CHANNELS].message.equals(reply[0])) {
this.#emitPubSubMessage(
PubSubType.CHANNELS,
reply[2],
reply[1]
);
return true;
} else if (COMMANDS[PubSubType.PATTERNS].message.equals(reply[0])) {
this.#emitPubSubMessage(
PubSubType.PATTERNS,
reply[3],
reply[2],
reply[1]
);
return true;
} else if (COMMANDS[PubSubType.SHARDED].message.equals(reply[0])) {
this.#emitPubSubMessage(
PubSubType.SHARDED,
reply[2],
reply[1]
);
return true;
}
return false;
}
removeShardedListeners(channel: string): ChannelListeners {
const listeners = this.#listeners[PubSubType.SHARDED].get(channel)!;
this.#listeners[PubSubType.SHARDED].delete(channel);
this.#updateIsActive();
return listeners;
}
#emitPubSubMessage(
type: PubSubType,
message: Buffer,
channel: Buffer,
pattern?: Buffer
): void {
const keyString = (pattern ?? channel).toString(),
listeners = this.#listeners[type].get(keyString);
if (!listeners) return;
for (const listener of listeners.buffers) {
listener(message, channel);
}
if (!listeners.strings.size) return;
const channelString = pattern ? channel.toString() : keyString,
messageString = channelString === '__redis__:invalidate' ?
// https://github.com/redis/redis/pull/7469
// https://github.com/redis/redis/issues/7463
(message === null ? null : (message as any as Array<Buffer>).map(x => x.toString())) as any :
message.toString();
for (const listener of listeners.strings) {
listener(messageString, channelString);
}
}
getTypeListeners(type: PubSubType): PubSubTypeListeners {
return this.#listeners[type];
}
}