1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-07 13:22:56 +03:00

cluster close & destroy

This commit is contained in:
Leibale
2023-05-15 16:56:48 +03:00
parent e3326134ba
commit e27693f13f
2 changed files with 70 additions and 30 deletions

View File

@@ -259,7 +259,7 @@ export default class RedisClusterSlots<
// switch to `CLUSTER SHARDS` when Redis 7.0 will be the minimum supported version // switch to `CLUSTER SHARDS` when Redis 7.0 will be the minimum supported version
return await client.clusterSlots(); return await client.clusterSlots();
} finally { } finally {
await client.disconnect(); client.destroy();
} }
} }
@@ -377,40 +377,66 @@ export default class RedisClusterSlots<
return this._discoverWithRootNodes(); return this._discoverWithRootNodes();
} }
/**
* @deprecated Use `close` instead.
*/
quit(): Promise<void> { quit(): Promise<void> {
return this._destroy(client => client.quit()); return this._destroy(client => client.quit());
} }
/**
* @deprecated Use `destroy` instead.
*/
disconnect(): Promise<void> { disconnect(): Promise<void> {
return this._destroy(client => client.disconnect()); return this._destroy(client => client.disconnect());
} }
close() {
return this._destroy(client => client.close());
}
destroy() {
this._isOpen = false;
for (const client of this._clients()) {
this._execOnNodeClient(client, client => client.destroy());
}
if (this.pubSubNode) {
this._execOnNodeClient(this.pubSubNode.client, client => client.destroy());
this.pubSubNode = undefined;
}
this._resetSlots();
this.nodeByAddress.clear();
}
private *_clients() {
for (const { master, replicas } of this.shards) {
if (master.client) {
yield master.client;
}
if (master.pubSubClient) {
yield master.pubSubClient;
}
if (replicas) {
for (const { client } of replicas) {
if (client) {
yield client;
}
}
}
}
}
private async _destroy(fn: (client: RedisClientType<M, F, S, RESP>) => Promise<unknown>): Promise<void> { private async _destroy(fn: (client: RedisClientType<M, F, S, RESP>) => Promise<unknown>): Promise<void> {
this._isOpen = false; this._isOpen = false;
const promises = []; const promises = [];
for (const { master, replicas } of this.shards) { for (const client of this._clients()) {
if (master.client) { promises.push(this._execOnNodeClient(client, fn));
promises.push(
this._execOnNodeClient(master.client, fn)
);
}
if (master.pubSubClient) {
promises.push(
this._execOnNodeClient(master.pubSubClient, fn)
);
}
if (replicas) {
for (const { client } of replicas) {
if (client) {
promises.push(
this._execOnNodeClient(client, fn)
);
}
}
}
} }
if (this.pubSubNode) { if (this.pubSubNode) {
@@ -424,10 +450,10 @@ export default class RedisClusterSlots<
await Promise.allSettled(promises); await Promise.allSettled(promises);
} }
private _execOnNodeClient( private _execOnNodeClient<T>(
client: ClientOrPromise<M, F, S, RESP>, client: ClientOrPromise<M, F, S, RESP>,
fn: (client: RedisClientType<M, F, S, RESP>) => Promise<unknown> fn: (client: RedisClientType<M, F, S, RESP>) => T
) { ): T | Promise<T> {
return types.isPromise(client) ? return types.isPromise(client) ?
client.then(fn) : client.then(fn) :
fn(client); fn(client);

View File

@@ -503,14 +503,28 @@ export default class RedisCluster<
sUnsubscribe = this.SUNSUBSCRIBE; sUnsubscribe = this.SUNSUBSCRIBE;
// quit(): Promise<void> { /**
// return this.#slots.quit(); * @deprecated Use `close` instead.
// } */
quit() {
return this._slots.quit();
}
disconnect(): Promise<void> { /**
* @deprecated Use `destroy` instead.
*/
disconnect() {
return this._slots.disconnect(); return this._slots.disconnect();
} }
close() {
return this._slots.close();
}
destroy() {
return this._slots.destroy();
}
nodeClient(node: ShardNode<M, F, S, RESP>) { nodeClient(node: ShardNode<M, F, S, RESP>) {
return this._slots.nodeClient(node); return this._slots.nodeClient(node);
} }