You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-04 15:02:09 +03:00
* refactor pubsub, add support for sharded pub sub * run tests in redis 7 only, fix PUBSUB SHARDCHANNELS test * add some comments and fix some bugs * PubSubType, not PubSubTypes 🤦♂️ * remove test.txt * fix some bugs, add tests * add some tests * fix #2345 - allow PING in PubSub mode (remove client side validation) * remove .only * revert changes in cluster/index.ts * fix tests minimum version * handle server sunsubscribe * add 'sharded-channel-moved' event to docs, improve the events section in the main README (fix #2302) * exit "resubscribe" if pubsub not active * Update commands-queue.ts * Release client@1.5.0-rc.0 * WIP * use `node:util` instead of `node:util/types` (to support node 14) * run PubSub resharding test with Redis 7+ * fix inconsistency in live resharding test * add some tests * fix iterateAllNodes when starting from a replica * fix iterateAllNodes random * fix slotNodesIterator * fix slotNodesIterator * clear pubSubNode when node in use * wait for all nodes cluster state to be ok before testing * `cluster.minimizeConections` tests * `client.reconnectStrategry = false | 0` tests * sharded pubsub + cluster 🎉 * add minimum version to sharded pubsub tests * add cluster sharded pubsub live reshard test, use stable dockers for tests, make sure to close pubsub clients when a node disconnects from the cluster * fix "ssubscribe & sunsubscribe" test * lock search docker to 2.4.9 * change numberOfMasters default to 2 * use edge for bloom * add tests * add back getMasters and getSlotMaster as deprecated functions * add some tests * fix reconnect strategy + docs * sharded pubsub docs * Update pub-sub.md * some jsdoc, docs, cluster topology test * clean pub-sub docs Co-authored-by: Simon Prickett <simon@redislabs.com> * reconnect startegy docs and bug fix Co-authored-by: Simon Prickett <simon@redislabs.com> * refine jsdoc and some docs Co-authored-by: Simon Prickett <simon@redislabs.com> * I'm stupid * fix cluster topology test * fix cluster topology test * 
Update README.md * Update clustering.md * Update pub-sub.md Co-authored-by: Simon Prickett <simon@redislabs.com>
409 lines
12 KiB
TypeScript
409 lines
12 KiB
TypeScript
import { RedisCommandArgument } from "../commands";
|
|
|
|
/**
 * The kinds of Pub/Sub subscriptions tracked by this module.
 * Every member's value is identical to its name.
 */
export enum PubSubType {
  /** Regular channels (`subscribe` / `unsubscribe` / `message`). */
  CHANNELS = 'CHANNELS',
  /** Patterns (`psubscribe` / `punsubscribe` / `pmessage`). */
  PATTERNS = 'PATTERNS',
  /** Sharded channels (`ssubscribe` / `sunsubscribe` / `smessage`). */
  SHARDED = 'SHARDED'
}
|
|
|
|
/**
 * Pre-encoded names used on the wire for each Pub/Sub type:
 * the command that subscribes, the command that unsubscribes,
 * and the reply kind that carries a published message.
 * They are Buffers so they can be compared against the first element of a reply with `.equals()`.
 */
const COMMANDS = {
  [PubSubType.CHANNELS]: {
    subscribe: Buffer.from('subscribe'),
    unsubscribe: Buffer.from('unsubscribe'),
    message: Buffer.from('message')
  },

  [PubSubType.PATTERNS]: {
    subscribe: Buffer.from('psubscribe'),
    unsubscribe: Buffer.from('punsubscribe'),
    message: Buffer.from('pmessage')
  },

  [PubSubType.SHARDED]: {
    subscribe: Buffer.from('ssubscribe'),
    unsubscribe: Buffer.from('sunsubscribe'),
    message: Buffer.from('smessage')
  }
};
|
|
|
|
/**
 * Callback invoked for every message delivered to a subscription.
 *
 * When `RETURN_BUFFERS` is `true` both arguments are `Buffer`s, otherwise both are `string`s.
 * The return value is typed as `unknown`, so the listener may return anything.
 */
export type PubSubListener<RETURN_BUFFERS extends boolean = false> = <
  T extends RETURN_BUFFERS extends true ? Buffer : string
>(
  message: T,
  channel: T
) => unknown;
|
|
|
|
/**
 * Everything tracked for a single channel (or pattern / sharded channel).
 */
export interface ChannelListeners {
  /** Set to `true` once an unsubscribe command covering this channel has been created. */
  unsubscribing: boolean;

  /** Listeners that receive the message and channel as `Buffer`s. */
  buffers: Set<PubSubListener<true>>;

  /** Listeners that receive the message and channel as `string`s. */
  strings: Set<PubSubListener<false>>;
}
|
|
|
|
/** Listeners of one Pub/Sub type, keyed by channel (or pattern) name. */
export type PubSubTypeListeners = Map<
  string,
  ChannelListeners
>;
|
|
|
|
/** One `PubSubTypeListeners` map per `PubSubType`. */
type Listeners = Record<
  PubSubType,
  PubSubTypeListeners
>;
|
|
|
|
/**
 * The shape of a command produced by `PubSub`:
 * whatever `subscribe`, `unsubscribe` or `extendTypeListeners` may return.
 */
export type PubSubCommand = ReturnType<
  | typeof PubSub.prototype.subscribe
  | typeof PubSub.prototype.unsubscribe
  | typeof PubSub.prototype.extendTypeListeners
>;
|
|
|
|
/**
 * Client-side Pub/Sub state.
 *
 * Keeps the listeners registered for every channel, pattern and sharded channel, and builds
 * the commands required to bring the server in line with them. Methods that "return a command"
 * do not send anything themselves: they return `{ args, channelsCounter, resolve, reject }`
 * (or `undefined` when no command is needed) and expect the caller to invoke `resolve` /
 * `reject` once the command has succeeded / failed.
 */
export class PubSub {
  /**
   * Checks whether `reply` is a (un)subscribe status reply for channels or patterns,
   * or a subscribe status reply for sharded channels.
   * `sunsubscribe` replies are not matched here; use `isShardedUnsubscribe` for those.
   *
   * @param reply The raw reply; only its first element (the reply kind) is inspected.
   */
  static isStatusReply(reply: Array<Buffer>): boolean {
    return (
      COMMANDS[PubSubType.CHANNELS].subscribe.equals(reply[0]) ||
      COMMANDS[PubSubType.CHANNELS].unsubscribe.equals(reply[0]) ||
      COMMANDS[PubSubType.PATTERNS].subscribe.equals(reply[0]) ||
      COMMANDS[PubSubType.PATTERNS].unsubscribe.equals(reply[0]) ||
      COMMANDS[PubSubType.SHARDED].subscribe.equals(reply[0])
    );
  }

  /**
   * Checks whether `reply` is a `sunsubscribe` reply.
   *
   * @param reply The raw reply; only its first element (the reply kind) is inspected.
   */
  static isShardedUnsubscribe(reply: Array<Buffer>): boolean {
    return COMMANDS[PubSubType.SHARDED].unsubscribe.equals(reply[0]);
  }

  /** Normalizes a single channel or a list of channels to an array. */
  static #channelsArray(channels: string | Array<string>) {
    return (Array.isArray(channels) ? channels : [channels]);
  }

  /** Picks the `buffers` set when `returnBuffers` is truthy, the `strings` set otherwise. */
  static #listenersSet<T extends boolean>(
    listeners: ChannelListeners,
    returnBuffers?: T
  ) {
    return (returnBuffers ? listeners.buffers : listeners.strings);
  }

  /** Number of subscribe commands that were created but neither resolved nor rejected yet. */
  #subscribing = 0;

  #isActive = false;

  /** `true` while there is at least one tracked channel of any type or a pending subscribe command. */
  get isActive() {
    return this.#isActive;
  }

  #listeners: Listeners = {
    [PubSubType.CHANNELS]: new Map(),
    [PubSubType.PATTERNS]: new Map(),
    [PubSubType.SHARDED]: new Map()
  };

  /**
   * Registers `listener` on `channels`.
   *
   * @param type Which kind of subscription to use.
   * @param channels One channel (or pattern) or a list of them.
   * @param listener The callback to register.
   * @param returnBuffers When truthy the listener is stored as a `Buffer` listener, otherwise as a `string` listener.
   * @returns `undefined` when every channel is already tracked (and not being unsubscribed) -
   * the listener is then added immediately. Otherwise a subscribe command for the channels that
   * are not tracked yet; the listener is added to all `channels` only when `resolve` is called.
   */
  subscribe<T extends boolean>(
    type: PubSubType,
    channels: string | Array<string>,
    listener: PubSubListener<T>,
    returnBuffers?: T
  ) {
    const args: Array<RedisCommandArgument> = [COMMANDS[type].subscribe],
      channelsArray = PubSub.#channelsArray(channels);
    for (const channel of channelsArray) {
      let channelListeners = this.#listeners[type].get(channel);
      // channels that are being unsubscribed have to be subscribed again
      if (!channelListeners || channelListeners.unsubscribing) {
        args.push(channel);
      }
    }

    if (args.length === 1) {
      // all channels are already subscribed, add listeners without issuing a command
      for (const channel of channelsArray) {
        PubSub.#listenersSet(
          this.#listeners[type].get(channel)!,
          returnBuffers
        ).add(listener);
      }
      return;
    }

    this.#isActive = true;
    this.#subscribing++;
    return {
      args,
      channelsCounter: args.length - 1,
      resolve: () => {
        this.#subscribing--;
        for (const channel of channelsArray) {
          let listeners = this.#listeners[type].get(channel);
          if (!listeners) {
            listeners = {
              unsubscribing: false,
              buffers: new Set(),
              strings: new Set()
            };
            this.#listeners[type].set(channel, listeners);
          }

          PubSub.#listenersSet(listeners, returnBuffers).add(listener);
        }
      },
      reject: () => {
        this.#subscribing--;
        this.#updateIsActive();
      }
    };
  }

  /**
   * Merges an existing `ChannelListeners` object into the state of `channel`.
   *
   * @returns A subscribe command for `channel` when it was not tracked before,
   * `undefined` when the listeners were merged into an already tracked channel.
   */
  extendChannelListeners(
    type: PubSubType,
    channel: string,
    listeners: ChannelListeners
  ) {
    if (!this.#extendChannelListeners(type, channel, listeners)) return;

    this.#isActive = true;
    this.#subscribing++;
    return {
      args: [
        COMMANDS[type].subscribe,
        channel
      ],
      channelsCounter: 1,
      resolve: () => this.#subscribing--,
      reject: () => {
        this.#subscribing--;
        this.#updateIsActive();
      }
    };
  }

  /**
   * Stores `listeners` for `channel`, or merges them into the existing entry.
   *
   * @returns `true` when the channel was not tracked before (the given object is stored as is),
   * `false` when the listeners were copied into the existing sets.
   */
  #extendChannelListeners(
    type: PubSubType,
    channel: string,
    listeners: ChannelListeners
  ) {
    const existingListeners = this.#listeners[type].get(channel);
    if (!existingListeners) {
      this.#listeners[type].set(channel, listeners);
      return true;
    }

    for (const listener of listeners.buffers) {
      existingListeners.buffers.add(listener);
    }

    for (const listener of listeners.strings) {
      existingListeners.strings.add(listener);
    }

    return false;
  }

  /**
   * Merges a whole map of channel listeners into the state of `type`.
   *
   * @returns A single subscribe command covering the channels that were not tracked before,
   * or `undefined` when all of them were already tracked.
   */
  extendTypeListeners(type: PubSubType, listeners: PubSubTypeListeners) {
    const args: Array<RedisCommandArgument> = [COMMANDS[type].subscribe];
    for (const [channel, channelListeners] of listeners) {
      if (this.#extendChannelListeners(type, channel, channelListeners)) {
        args.push(channel);
      }
    }

    if (args.length === 1) return;

    this.#isActive = true;
    this.#subscribing++;
    return {
      args,
      channelsCounter: args.length - 1,
      resolve: () => this.#subscribing--,
      reject: () => {
        this.#subscribing--;
        this.#updateIsActive();
      }
    };
  }

  /**
   * Removes listeners and builds the matching unsubscribe command.
   *
   * - Without `channels`: unsubscribes from everything of the given `type`.
   * - With `channels` but without `listener`: unsubscribes from these channels.
   * - With both: removes only `listener`; a channel is unsubscribed from only when no other
   *   listener would remain on it (or when it is not tracked at all).
   *
   * @returns `undefined` when `listener` could be removed without issuing a command (it is then
   * removed immediately), otherwise an unsubscribe command whose `resolve` removes the listeners.
   */
  unsubscribe<T extends boolean>(
    type: PubSubType,
    channels?: string | Array<string>,
    listener?: PubSubListener<T>,
    returnBuffers?: T
  ) {
    const listeners = this.#listeners[type];
    if (!channels) {
      return this.#unsubscribeCommand(
        [COMMANDS[type].unsubscribe],
        // the number of channels cannot be known here: there might be some `SUBSCRIBE`
        // commands in the queue, and some of them might fail
        NaN,
        () => listeners.clear()
      );
    }

    const channelsArray = PubSub.#channelsArray(channels);
    if (!listener) {
      return this.#unsubscribeCommand(
        [COMMANDS[type].unsubscribe, ...channelsArray],
        channelsArray.length,
        () => {
          for (const channel of channelsArray) {
            listeners.delete(channel);
          }
        }
      );
    }

    const args: Array<RedisCommandArgument> = [COMMANDS[type].unsubscribe];
    for (const channel of channelsArray) {
      const sets = listeners.get(channel);
      if (sets) {
        let current,
          other;
        if (returnBuffers) {
          current = sets.buffers;
          other = sets.strings;
        } else {
          current = sets.strings;
          other = sets.buffers;
        }

        // size of the relevant set once `listener` is removed from it
        const currentSize = current.has(listener) ? current.size - 1 : current.size;
        // other listeners remain on this channel, keep the subscription
        if (currentSize !== 0 || other.size !== 0) continue;
        sets.unsubscribing = true;
      }

      args.push(channel);
    }

    if (args.length === 1) {
      // all channels has other listeners,
      // delete the listeners without issuing a command
      for (const channel of channelsArray) {
        PubSub.#listenersSet(
          listeners.get(channel)!,
          returnBuffers
        ).delete(listener);
      }
      return;
    }

    return this.#unsubscribeCommand(
      args,
      args.length - 1,
      () => {
        for (const channel of channelsArray) {
          const sets = listeners.get(channel);
          if (!sets) continue;

          (returnBuffers ? sets.buffers : sets.strings).delete(listener);
          if (sets.buffers.size === 0 && sets.strings.size === 0) {
            listeners.delete(channel);
          }
        }
      }
    );
  }

  /**
   * Wraps the pieces of an unsubscribe command.
   * `resolve` runs `removeListeners` and then re-computes `isActive`.
   */
  #unsubscribeCommand(
    args: Array<RedisCommandArgument>,
    channelsCounter: number,
    removeListeners: () => void
  ) {
    return {
      args,
      channelsCounter,
      resolve: () => {
        removeListeners();
        this.#updateIsActive();
      },
      reject: undefined // use the same structure as `subscribe`
    };
  }

  /**
   * Re-computes `isActive`: active while any of the three listener maps
   * (channels, patterns, sharded channels) is non-empty or a subscribe command is pending.
   */
  #updateIsActive() {
    this.#isActive = (
      this.#listeners[PubSubType.CHANNELS].size !== 0 ||
      this.#listeners[PubSubType.PATTERNS].size !== 0 ||
      this.#listeners[PubSubType.SHARDED].size !== 0 ||
      this.#subscribing !== 0
    );
  }

  /**
   * Marks Pub/Sub as inactive and forgets pending subscribe commands.
   * The registered listeners are kept.
   */
  reset() {
    this.#isActive = false;
    this.#subscribing = 0;
  }

  /**
   * Builds one subscribe command per Pub/Sub type that has tracked channels,
   * each covering all the channels of that type.
   */
  resubscribe(): Array<PubSubCommand> {
    const commands = [];
    for (const [type, listeners] of Object.entries(this.#listeners)) {
      if (!listeners.size) continue;

      this.#isActive = true;
      this.#subscribing++;
      const callback = () => this.#subscribing--;
      commands.push({
        args: [
          COMMANDS[type as PubSubType].subscribe,
          ...listeners.keys()
        ],
        channelsCounter: listeners.size,
        resolve: callback,
        reject: callback
      });
    }

    return commands;
  }

  /**
   * Dispatches `reply` to the registered listeners when it is a published message.
   *
   * - `message` / `smessage`: `reply[1]` is used as the channel and `reply[2]` as the message.
   * - `pmessage`: `reply[1]` is used as the pattern, `reply[2]` as the channel and `reply[3]` as the message.
   *
   * @returns `true` when the reply was a message of one of these kinds, `false` otherwise.
   */
  handleMessageReply(reply: Array<Buffer>): boolean {
    if (COMMANDS[PubSubType.CHANNELS].message.equals(reply[0])) {
      this.#emitPubSubMessage(
        PubSubType.CHANNELS,
        reply[2],
        reply[1]
      );
      return true;
    } else if (COMMANDS[PubSubType.PATTERNS].message.equals(reply[0])) {
      this.#emitPubSubMessage(
        PubSubType.PATTERNS,
        reply[3],
        reply[2],
        reply[1]
      );
      return true;
    } else if (COMMANDS[PubSubType.SHARDED].message.equals(reply[0])) {
      this.#emitPubSubMessage(
        PubSubType.SHARDED,
        reply[2],
        reply[1]
      );
      return true;
    }

    return false;
  }

  /**
   * Stops tracking a sharded channel and re-computes `isActive`.
   *
   * @returns The listeners that were registered for `channel`.
   * NOTE(review): the non-null assertion means this returns `undefined` at runtime
   * when `channel` is not tracked - callers are presumably expected to pass tracked channels only.
   */
  removeShardedListeners(channel: string): ChannelListeners {
    const listeners = this.#listeners[PubSubType.SHARDED].get(channel)!;
    this.#listeners[PubSubType.SHARDED].delete(channel);
    this.#updateIsActive();
    return listeners;
  }

  /**
   * Calls the listeners registered under `pattern` (when given) or `channel`.
   * `Buffer` listeners get the raw values; `string` listeners get them converted to strings.
   */
  #emitPubSubMessage(
    type: PubSubType,
    message: Buffer,
    channel: Buffer,
    pattern?: Buffer
  ): void {
    const keyString = (pattern ?? channel).toString(),
      listeners = this.#listeners[type].get(keyString);

    if (!listeners) return;

    for (const listener of listeners.buffers) {
      listener(message, channel);
    }

    // skip the string conversions when nobody needs them
    if (!listeners.strings.size) return;

    const channelString = pattern ? channel.toString() : keyString,
      messageString = channelString === '__redis__:invalidate' ?
        // on this channel the message is `null` or an array rather than a single Buffer
        // https://github.com/redis/redis/pull/7469
        // https://github.com/redis/redis/issues/7463
        (message === null ? null : (message as any as Array<Buffer>).map(x => x.toString())) as any :
        message.toString();
    for (const listener of listeners.strings) {
      listener(messageString, channelString);
    }
  }

  /** Returns the live (not copied) listeners map of `type`. */
  getTypeListeners(type: PubSubType): PubSubTypeListeners {
    return this.#listeners[type];
  }
}
|