1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-07 13:22:56 +03:00

fix ASK and MOVED errors in multi as well

This commit is contained in:
leibale
2021-12-30 17:12:13 -05:00
parent 29ff6c8a36
commit 8f88eb289b

View File

@@ -1,5 +1,5 @@
import COMMANDS from './commands';
import { RedisCommand, RedisCommandArgument, RedisCommandArguments, RedisCommandReply, RedisModules, RedisPlugins, RedisScript, RedisScripts } from '../commands';
import { RedisCommand, RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply, RedisCommandReply, RedisModules, RedisPlugins, RedisScript, RedisScripts } from '../commands';
import { ClientCommandOptions, RedisClientCommandSignature, RedisClientOptions, RedisClientType, WithModules, WithScripts } from '../client';
import RedisClusterSlots, { ClusterNode } from './cluster-slots';
import { extendWithModulesAndScripts, transformCommandArguments, transformCommandReply, extendWithCommands } from '../commander';
@@ -82,27 +82,17 @@ export default class RedisCluster<M extends RedisModules, S extends RedisScripts
);
}
async sendCommand<C extends RedisCommand>(
async sendCommand<T = RedisCommandRawReply>(
firstKey: RedisCommandArgument | undefined,
isReadonly: boolean | undefined,
args: RedisCommandArguments,
options?: ClientCommandOptions,
redirections = 0
): Promise<RedisCommandReply<C>> {
const client = this.#slots.getClient(firstKey, isReadonly);
try {
return await client.sendCommand(args, options);
} catch (err: any) {
const shouldRetry = await this.#handleCommandError(err, client, redirections);
if (shouldRetry === true) {
return this.sendCommand(firstKey, isReadonly, args, options, redirections + 1);
} else if (shouldRetry) {
return shouldRetry.sendCommand(args, options);
}
throw err;
}
options?: ClientCommandOptions
): Promise<T> {
return this.#execute(
firstKey,
isReadonly,
client => client.sendCommand<T>(args, options)
);
}
async scriptsExecutor(script: RedisScript, args: Array<unknown>): Promise<RedisCommandReply<typeof script>> {
@@ -124,61 +114,65 @@ export default class RedisCluster<M extends RedisModules, S extends RedisScripts
script: RedisScript,
originalArgs: Array<unknown>,
redisArgs: RedisCommandArguments,
options?: ClientCommandOptions,
redirections = 0
options?: ClientCommandOptions
): Promise<RedisCommandReply<typeof script>> {
const client = this.#slots.getClient(
return this.#execute(
RedisCluster.extractFirstKey(script, originalArgs, redisArgs),
script.IS_READ_ONLY
script.IS_READ_ONLY,
client => client.executeScript(script, redisArgs, options)
);
}
async #execute<Reply>(
firstKey: RedisCommandArgument | undefined,
isReadonly: boolean | undefined,
executor: (client: RedisClientType<M, S>) => Promise<Reply>
): Promise<Reply> {
const maxCommandRedirections = this.#options.maxCommandRedirections ?? 16;
let client = this.#slots.getClient(firstKey, isReadonly);
for (let i = 0;; i++) {
try {
return await client.executeScript(script, redisArgs, options);
} catch (err: any) {
const shouldRetry = await this.#handleCommandError(err, client, redirections);
if (shouldRetry === true) {
return this.executeScript(script, originalArgs, redisArgs, options, redirections + 1);
} else if (shouldRetry) {
return shouldRetry.executeScript(script, redisArgs, options);
}
throw err;
}
}
async #handleCommandError(err: Error, client: RedisClientType<M, S>, redirections: number): Promise<boolean | RedisClientType<M, S>> {
if (redirections > (this.#options.maxCommandRedirections ?? 16)) {
return await executor(client);
} catch (err) {
if (++i > maxCommandRedirections || !(err instanceof Error)) {
throw err;
}
if (err.message.startsWith('ASK')) {
const url = err.message.substring(err.message.lastIndexOf(' ') + 1);
let node = this.#slots.getNodeByUrl(url);
if (!node) {
await this.#slots.rediscover(client);
node = this.#slots.getNodeByUrl(url);
if (this.#slots.getNodeByUrl(url)?.client === client) {
await client.asking();
continue;
}
if (!node) {
await this.#slots.rediscover(client);
const redirectTo = this.#slots.getNodeByUrl(url);
if (!redirectTo) {
throw new Error(`Cannot find node ${url}`);
}
}
await node.client.asking();
return node.client;
await redirectTo.client.asking();
client = redirectTo.client;
continue;
} else if (err.message.startsWith('MOVED')) {
await this.#slots.rediscover(client);
return true;
client = this.#slots.getClient(firstKey, isReadonly);
continue;
}
throw err;
}
}
}
multi(routing?: RedisCommandArgument): RedisClusterMultiCommandType<M, S> {
return new this.#Multi(
async (commands: Array<RedisMultiQueuedCommand>, firstKey?: RedisCommandArgument, chainId?: symbol) => {
return this.#slots
.getClient(firstKey)
.multiExecutor(commands, chainId);
(commands: Array<RedisMultiQueuedCommand>, firstKey?: RedisCommandArgument, chainId?: symbol) => {
return this.#execute(
firstKey,
false,
client => client.multiExecutor(commands, chainId)
);
},
routing
);