You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-09 00:22:08 +03:00
fix client.quit, add error events on cluster, fix some "deepsource.io" warnings
This commit is contained in:
@@ -17,54 +17,40 @@ interface SlotNodes<M extends RedisModules, S extends RedisLuaScripts> {
|
||||
clientIterator: IterableIterator<RedisClientType<M, S>> | undefined;
|
||||
}
|
||||
|
||||
type OnError = (err: unknown) => void;
|
||||
|
||||
export default class RedisClusterSlots<M extends RedisModules, S extends RedisLuaScripts> {
|
||||
readonly #options: RedisClusterOptions;
|
||||
readonly #onError: OnError;
|
||||
readonly #nodeByUrl = new Map<string, ClusterNode<M, S>>();
|
||||
readonly #slots: Array<SlotNodes<M, S>> = [];
|
||||
|
||||
constructor(options: RedisClusterOptions) {
|
||||
constructor(options: RedisClusterOptions, onError: OnError) {
|
||||
this.#options = options;
|
||||
this.#onError = onError;
|
||||
}
|
||||
|
||||
async connect(): Promise<void> {
|
||||
for (const rootNode of this.#options.rootNodes) {
|
||||
try {
|
||||
await this.#discoverNodes(rootNode);
|
||||
return;
|
||||
} catch (err) {
|
||||
console.error(err);
|
||||
// this.emit('error', err);
|
||||
}
|
||||
if (await this.#discoverNodes(rootNode)) return;
|
||||
}
|
||||
|
||||
throw new Error('None of the root nodes is available');
|
||||
}
|
||||
|
||||
async discover(startWith: RedisClientType<M, S>): Promise<void> {
|
||||
try {
|
||||
await this.#discoverNodes(startWith.options?.socket);
|
||||
return;
|
||||
} catch (err) {
|
||||
console.error(err);
|
||||
// this.emit('error', err);
|
||||
}
|
||||
if (await this.#discoverNodes(startWith.options?.socket)) return;
|
||||
|
||||
for (const { client } of this.#nodeByUrl.values()) {
|
||||
if (client === startWith) continue;
|
||||
|
||||
try {
|
||||
await this.#discoverNodes(client.options?.socket);
|
||||
return;
|
||||
} catch (err) {
|
||||
console.error(err);
|
||||
// this.emit('error', err);
|
||||
}
|
||||
|
||||
if (await this.#discoverNodes(client.options?.socket)) return;
|
||||
}
|
||||
|
||||
throw new Error('None of the cluster nodes is available');
|
||||
}
|
||||
|
||||
async #discoverNodes(socketOptions?: RedisSocketOptions): Promise<void> {
|
||||
async #discoverNodes(socketOptions?: RedisSocketOptions): Promise<boolean> {
|
||||
const client = RedisClient.create({
|
||||
socket: socketOptions
|
||||
});
|
||||
@@ -73,8 +59,14 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisLu
|
||||
|
||||
try {
|
||||
await this.#reset(await client.clusterNodes());
|
||||
return true;
|
||||
} catch (err) {
|
||||
this.#onError(err);
|
||||
return false;
|
||||
} finally {
|
||||
await client.disconnect(); // TODO: catch error from disconnect?
|
||||
if (client.isOpen) {
|
||||
await client.disconnect();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -102,7 +94,6 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisLu
|
||||
for (const [url, { client }] of this.#nodeByUrl.entries()) {
|
||||
if (clientsInUse.has(url)) continue;
|
||||
|
||||
// TODO: ignore error from `.disconnect`?
|
||||
promises.push(client.disconnect());
|
||||
this.#nodeByUrl.delete(url);
|
||||
}
|
||||
|
Reference in New Issue
Block a user