diff --git a/packages/client/lib/cluster/index.ts b/packages/client/lib/cluster/index.ts index 157b607b08..10288158e4 100644 --- a/packages/client/lib/cluster/index.ts +++ b/packages/client/lib/cluster/index.ts @@ -1,5 +1,5 @@ import COMMANDS from './commands'; -import { RedisCommand, RedisCommandArgument, RedisCommandArguments, RedisCommandReply, RedisModules, RedisPlugins, RedisScript, RedisScripts } from '../commands'; +import { RedisCommand, RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply, RedisCommandReply, RedisModules, RedisPlugins, RedisScript, RedisScripts } from '../commands'; import { ClientCommandOptions, RedisClientCommandSignature, RedisClientOptions, RedisClientType, WithModules, WithScripts } from '../client'; import RedisClusterSlots, { ClusterNode } from './cluster-slots'; import { extendWithModulesAndScripts, transformCommandArguments, transformCommandReply, extendWithCommands } from '../commander'; @@ -82,27 +82,17 @@ export default class RedisCluster( + async sendCommand( firstKey: RedisCommandArgument | undefined, isReadonly: boolean | undefined, args: RedisCommandArguments, - options?: ClientCommandOptions, - redirections = 0 - ): Promise> { - const client = this.#slots.getClient(firstKey, isReadonly); - - try { - return await client.sendCommand(args, options); - } catch (err: any) { - const shouldRetry = await this.#handleCommandError(err, client, redirections); - if (shouldRetry === true) { - return this.sendCommand(firstKey, isReadonly, args, options, redirections + 1); - } else if (shouldRetry) { - return shouldRetry.sendCommand(args, options); - } - - throw err; - } + options?: ClientCommandOptions + ): Promise { + return this.#execute( + firstKey, + isReadonly, + client => client.sendCommand(args, options) + ); } async scriptsExecutor(script: RedisScript, args: Array): Promise> { @@ -124,61 +114,65 @@ export default class RedisCluster, redisArgs: RedisCommandArguments, - options?: ClientCommandOptions, - redirections = 0 + options?: ClientCommandOptions ): Promise> { - const client = this.#slots.getClient( + return this.#execute( RedisCluster.extractFirstKey(script, originalArgs, redisArgs), - script.IS_READ_ONLY + script.IS_READ_ONLY, + client => client.executeScript(script, redisArgs, options) ); - - try { - return await client.executeScript(script, redisArgs, options); - } catch (err: any) { - const shouldRetry = await this.#handleCommandError(err, client, redirections); - if (shouldRetry === true) { - return this.executeScript(script, originalArgs, redisArgs, options, redirections + 1); - } else if (shouldRetry) { - return shouldRetry.executeScript(script, redisArgs, options); - } - - throw err; - } } - async #handleCommandError(err: Error, client: RedisClientType, redirections: number): Promise> { - if (redirections > (this.#options.maxCommandRedirections ?? 16)) { - throw err; - } - - if (err.message.startsWith('ASK')) { - const url = err.message.substring(err.message.lastIndexOf(' ') + 1); - let node = this.#slots.getNodeByUrl(url); - if (!node) { - await this.#slots.rediscover(client); - node = this.#slots.getNodeByUrl(url); - - if (!node) { - throw new Error(`Cannot find node ${url}`); + async #execute( + firstKey: RedisCommandArgument | undefined, + isReadonly: boolean | undefined, + executor: (client: RedisClientType) => Promise + ): Promise { + const maxCommandRedirections = this.#options.maxCommandRedirections ?? 16; + let client = this.#slots.getClient(firstKey, isReadonly); + for (let i = 0;; i++) { + try { + return await executor(client); + } catch (err) { + if (++i > maxCommandRedirections || !(err instanceof Error)) { + throw err; } + + if (err.message.startsWith('ASK')) { + const url = err.message.substring(err.message.lastIndexOf(' ') + 1); + if (this.#slots.getNodeByUrl(url)?.client === client) { + await client.asking(); + continue; + } + + await this.#slots.rediscover(client); + const redirectTo = this.#slots.getNodeByUrl(url); + if (!redirectTo) { + throw new Error(`Cannot find node ${url}`); + } + + await redirectTo.client.asking(); + client = redirectTo.client; + continue; + } else if (err.message.startsWith('MOVED')) { + await this.#slots.rediscover(client); + client = this.#slots.getClient(firstKey, isReadonly); + continue; + } + + throw err; } - - await node.client.asking(); - return node.client; - } else if (err.message.startsWith('MOVED')) { - await this.#slots.rediscover(client); - return true; } - - throw err; } multi(routing?: RedisCommandArgument): RedisClusterMultiCommandType { return new this.#Multi( - async (commands: Array, firstKey?: RedisCommandArgument, chainId?: symbol) => { - return this.#slots - .getClient(firstKey) - .multiExecutor(commands, chainId); + (commands: Array, firstKey?: RedisCommandArgument, chainId?: symbol) => { + return this.#execute( + firstKey, + false, + client => client.multiExecutor(commands, chainId) + ); }, routing );