1
0
mirror of https://github.com/redis/node-redis.git synced 2025-12-15 23:55:38 +03:00

feat: add cluster/node events (#1855) (#3083)

* add cluster/node events
* add test for cluster events positive branch
* add cluster events docs section

fixes: #1855

---------

Co-authored-by: Nikolay Karadzhov <nkaradzhov89@gmail.com>
This commit is contained in:
Trofymenko Vladyslav
2025-10-09 16:06:57 +03:00
committed by GitHub
parent d6d8d8e8ed
commit bd11e382d0
4 changed files with 86 additions and 12 deletions

View File

@@ -80,9 +80,9 @@ type PubSubNode<
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping
> = (
Omit<Node<M, F, S, RESP, TYPE_MAPPING>, 'client'> &
Required<Pick<Node<M, F, S, RESP, TYPE_MAPPING>, 'client'>>
);
Omit<Node<M, F, S, RESP, TYPE_MAPPING>, 'client'> &
Required<Pick<Node<M, F, S, RESP, TYPE_MAPPING>, 'client'>>
);
type PubSubToResubscribe = Record<
PUBSUB_TYPE['CHANNELS'] | PUBSUB_TYPE['PATTERNS'],
@@ -153,6 +153,7 @@ export default class RedisClusterSlots<
this.#isOpen = true;
try {
await this.#discoverWithRootNodes();
this.#emit('connect');
} catch (err) {
this.#isOpen = false;
throw err;
@@ -333,17 +334,26 @@ export default class RedisClusterSlots<
}
#createClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>, readonly = node.readonly) {
const socket =
this.#getNodeAddress(node.address) ??
{ host: node.host, port: node.port, };
const client = Object.freeze({
host: socket.host,
port: socket.port,
});
const emit = this.#emit;
return this.#clientFactory(
this.#clientOptionsDefaults({
clientSideCache: this.clientSideCache,
RESP: this.#options.RESP,
socket: this.#getNodeAddress(node.address) ?? {
host: node.host,
port: node.port
},
readonly
})
).on('error', err => console.error(err));
socket,
readonly,
}))
.on('error', error => emit('node-error', error, client))
.on('reconnecting', () => emit('node-reconnecting', client))
.once('ready', () => emit('node-ready', client))
.once('connect', () => emit('node-connect', client))
.once('end', () => emit('node-disconnect', client));
}
#createNodeClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>, readonly?: boolean) {
@@ -406,6 +416,7 @@ export default class RedisClusterSlots<
this.#resetSlots();
this.nodeByAddress.clear();
this.#emit('disconnect');
}
*#clients() {
@@ -443,6 +454,7 @@ export default class RedisClusterSlots<
this.nodeByAddress.clear();
await Promise.allSettled(promises);
this.#emit('disconnect');
}
getClient(
@@ -542,7 +554,7 @@ export default class RedisClusterSlots<
node = index < this.masters.length ?
this.masters[index] :
this.replicas[index - this.masters.length],
client = this.#createClient(node, false);
client = this.#createClient(node, false);
this.pubSubNode = {
address: node.address,