1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-07 13:22:56 +03:00

cluster multi

This commit is contained in:
Leibale
2023-05-30 19:23:14 +03:00
parent 2fa33dbbac
commit fcb3a011b5
6 changed files with 352 additions and 230 deletions

View File

@@ -1,11 +1,11 @@
import { ClientCommandOptions, RedisClientOptions, RedisClientType } from '../client';
import { Command, CommandArguments, CommanderConfig, CommandPolicies, CommandSignature, CommandWithPoliciesSignature, Flags, RedisArgument, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions, TransformReply } from '../RESP/types';
import { ClientCommandOptions, RedisClientOptions } from '../client';
import { Command, CommandArguments, CommanderConfig, CommandPolicies, CommandWithPoliciesSignature, Flags, RedisArgument, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions } from '../RESP/types';
import COMMANDS from '../commands';
import { EventEmitter } from 'events';
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
import RedisClusterSlots, { NodeAddressMap, ShardNode } from './cluster-slots';
// import RedisClusterMultiCommand, { InstantiableRedisClusterMultiCommandType, RedisClusterMultiCommandType } from './multi-command';
// import { RedisMultiQueuedCommand } from '../multi-command';
import RedisClusterMultiCommand, { RedisClusterMultiCommandType } from './multi-command';
import { RedisMultiQueuedCommand } from '../multi-command';
import { PubSubListener } from '../client/pub-sub';
import { ErrorReply } from '../errors';
@@ -85,7 +85,7 @@ export default class RedisCluster<
FLAGS extends Flags,
POLICIES extends CommandPolicies
> extends EventEmitter {
private static _extractFirstKey<C extends Command>(
static extractFirstKey<C extends Command>(
command: C,
args: Parameters<C['transformArguments']>,
redisArgs: Array<RedisArgument>
@@ -101,46 +101,46 @@ export default class RedisCluster<
private static _createCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);
return async function (this: ProxyCluster) {
const args = command.transformArguments.apply(undefined, arguments as any),
firstKey = RedisCluster._extractFirstKey(
return async function (this: ProxyCluster, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args),
firstKey = RedisCluster.extractFirstKey(
command,
arguments as any,
args
args,
redisArgs
),
reply = await this.sendCommand(
firstKey,
command.IS_READ_ONLY,
args,
redisArgs,
this.commandOptions,
command.POLICIES
);
return transformReply ?
transformReply(reply, args.preserve) :
transformReply(reply, redisArgs.preserve) :
reply;
};
}
private static _createModuleCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);
return async function (this: NamespaceProxyCluster) {
const args = command.transformArguments.apply(undefined, arguments as any),
firstKey = RedisCluster._extractFirstKey(
return async function (this: NamespaceProxyCluster, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args),
firstKey = RedisCluster.extractFirstKey(
command,
arguments as any,
args
args,
redisArgs
),
reply = await this.self.sendCommand(
firstKey,
command.IS_READ_ONLY,
args,
redisArgs,
this.self.commandOptions,
command.POLICIES
);
return transformReply ?
transformReply(reply, args.preserve) :
transformReply(reply, redisArgs.preserve) :
reply;
};
}
@@ -148,18 +148,18 @@ export default class RedisCluster<
private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) {
const prefix = functionArgumentsPrefix(name, fn),
transformReply = getTransformReply(fn, resp);
return async function (this: NamespaceProxyCluster) {
const fnArgs = fn.transformArguments.apply(undefined, arguments as any),
args = prefix.concat(fnArgs),
firstKey = RedisCluster._extractFirstKey(
return async function (this: NamespaceProxyCluster, ...args: Array<unknown>) {
const fnArgs = fn.transformArguments(...args),
redisArgs = prefix.concat(fnArgs),
firstKey = RedisCluster.extractFirstKey(
fn,
arguments as any,
args
fnArgs,
redisArgs
),
reply = await this.self.sendCommand(
firstKey,
fn.IS_READ_ONLY,
args,
redisArgs,
this.self.commandOptions,
fn.POLICIES
);
@@ -173,18 +173,18 @@ export default class RedisCluster<
private static _createScriptCommand(script: RedisScript, resp: RespVersions) {
const prefix = scriptArgumentsPrefix(script),
transformReply = getTransformReply(script, resp);
return async function (this: ProxyCluster) {
const scriptArgs = script.transformArguments.apply(undefined, arguments as any),
args = prefix.concat(scriptArgs),
firstKey = RedisCluster._extractFirstKey(
return async function (this: ProxyCluster, ...args: Array<unknown>) {
const scriptArgs = script.transformArguments(...args),
redisArgs = prefix.concat(scriptArgs),
firstKey = RedisCluster.extractFirstKey(
script,
arguments as any,
args
scriptArgs,
redisArgs
),
reply = await this.sendCommand(
firstKey,
script.IS_READ_ONLY,
args,
redisArgs,
this.commandOptions,
script.POLICIES
);
@@ -211,7 +211,7 @@ export default class RedisCluster<
config
});
// Client.prototype.Multi = RedisClientMultiCommand.extend(config);
Cluster.prototype.Multi = RedisClusterMultiCommand.extend(config);
return (options?: Omit<RedisClusterOptions, keyof Exclude<typeof config, undefined>>) => {
// returning a proxy of the client to prevent the namespaces.self to leak between proxies
@@ -280,8 +280,6 @@ export default class RedisCluster<
return this._slots.pubSubNode;
}
// readonly #Multi: InstantiableRedisClusterMultiCommandType<M, F, S>;
get isOpen() {
return this._slots.isOpen;
}
@@ -291,7 +289,6 @@ export default class RedisCluster<
this._options = options;
this._slots = new RedisClusterSlots(options, this.emit.bind(this));
// this.#Multi = RedisClusterMultiCommand.extend(options);
}
duplicate(overrides?: Partial<RedisClusterOptions<M, F, S>>): RedisClusterType<M, F, S> {
@@ -400,20 +397,38 @@ export default class RedisCluster<
}
}
// MULTI(routing?: RedisCommandArgument): RedisClusterMultiCommandType<M, F, S> {
// return new this.#Multi(
// (commands: Array<RedisMultiQueuedCommand>, firstKey?: RedisCommandArgument, chainId?: symbol) => {
// return this.#execute(
// firstKey,
// false,
// client => client.multiExecutor(commands, undefined, chainId)
// );
// },
// routing
// );
// }
/**
* @internal
*/
async executePipeline(
firstKey: RedisArgument | undefined,
isReadonly: boolean | undefined,
commands: Array<RedisMultiQueuedCommand>
) {
const client = await this._slots.getClient(firstKey, isReadonly);
return client.executePipeline(commands);
}
// multi = this.MULTI;
/**
* @internal
*/
async executeMulti(
firstKey: RedisArgument | undefined,
isReadonly: boolean | undefined,
commands: Array<RedisMultiQueuedCommand>
) {
const client = await this._slots.getClient(firstKey, isReadonly);
return client.executeMulti(commands);
}
MULTI(routing?: RedisArgument): RedisClusterMultiCommandType<[], M, F, S, RESP, FLAGS> {
return new (this as any).Multi(
this,
routing
);
}
multi = this.MULTI;
async SUBSCRIBE<T extends boolean = false>(
channels: string | Array<string>,

View File

@@ -1,141 +1,245 @@
// import COMMANDS from './commands';
// import { RedisCommand, RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply, RedisFunctions, RedisModules, RedisExtensions, RedisScript, RedisScripts, ExcludeMappedString, RedisFunction } from '../commands';
// import RedisMultiCommand, { RedisMultiQueuedCommand } from '../multi-command';
// import { attachCommands, attachExtensions } from '../commander';
// import RedisCluster from '.';
import COMMANDS from '../commands';
import RedisMultiCommand, { MULTI_REPLY, MultiReply, MultiReplyType } from '../multi-command';
import { ReplyWithFlags, CommandReply, Command, CommandArguments, CommanderConfig, RedisFunctions, RedisModules, RedisScripts, RespVersions, TransformReply, RedisScript, RedisFunction, Flags, ReplyUnion, RedisArgument } from '../RESP/types';
import { attachConfig, functionArgumentsPrefix, getTransformReply } from '../commander';
import RedisCluster, { RedisClusterType } from '.';
// type RedisClusterMultiCommandSignature<
// C extends RedisCommand,
// M extends RedisModules,
// F extends RedisFunctions,
// S extends RedisScripts
// > = (...args: Parameters<C['transformArguments']>) => RedisClusterMultiCommandType<M, F, S>;
type CommandSignature<
REPLIES extends Array<unknown>,
C extends Command,
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
FLAGS extends Flags
> = (...args: Parameters<C['transformArguments']>) => RedisClusterMultiCommandType<
[...REPLIES, ReplyWithFlags<CommandReply<C, RESP>, FLAGS>],
M,
F,
S,
RESP,
FLAGS
>;
// type WithCommands<
// M extends RedisModules,
// F extends RedisFunctions,
// S extends RedisScripts
// > = {
// [P in keyof typeof COMMANDS]: RedisClusterMultiCommandSignature<(typeof COMMANDS)[P], M, F, S>;
// };
type WithCommands<
REPLIES extends Array<unknown>,
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
FLAGS extends Flags
> = {
[P in keyof typeof COMMANDS]: CommandSignature<REPLIES, (typeof COMMANDS)[P], M, F, S, RESP, FLAGS>;
};
// type WithModules<
// M extends RedisModules,
// F extends RedisFunctions,
// S extends RedisScripts
// > = {
// [P in keyof M as ExcludeMappedString<P>]: {
// [C in keyof M[P] as ExcludeMappedString<C>]: RedisClusterMultiCommandSignature<M[P][C], M, F, S>;
// };
// };
type WithModules<
REPLIES extends Array<unknown>,
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
FLAGS extends Flags
> = {
[P in keyof M]: {
[C in keyof M[P]]: CommandSignature<REPLIES, M[P][C], M, F, S, RESP, FLAGS>;
};
};
// type WithFunctions<
// M extends RedisModules,
// F extends RedisFunctions,
// S extends RedisScripts
// > = {
// [P in keyof F as ExcludeMappedString<P>]: {
// [FF in keyof F[P] as ExcludeMappedString<FF>]: RedisClusterMultiCommandSignature<F[P][FF], M, F, S>;
// };
// };
type WithFunctions<
REPLIES extends Array<unknown>,
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
FLAGS extends Flags
> = {
[L in keyof F]: {
[C in keyof F[L]]: CommandSignature<REPLIES, F[L][C], M, F, S, RESP, FLAGS>;
};
};
// type WithScripts<
// M extends RedisModules,
// F extends RedisFunctions,
// S extends RedisScripts
// > = {
// [P in keyof S as ExcludeMappedString<P>]: RedisClusterMultiCommandSignature<S[P], M, F, S>;
// };
type WithScripts<
REPLIES extends Array<unknown>,
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
FLAGS extends Flags
> = {
[P in keyof S]: CommandSignature<REPLIES, S[P], M, F, S, RESP, FLAGS>;
};
// export type RedisClusterMultiCommandType<
// M extends RedisModules,
// F extends RedisFunctions,
// S extends RedisScripts
// > = RedisClusterMultiCommand & WithCommands<M, F, S> & WithModules<M, F, S> & WithFunctions<M, F, S> & WithScripts<M, F, S>;
export type RedisClusterMultiCommandType<
REPLIES extends Array<any>,
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
FLAGS extends Flags
> = (
RedisClusterMultiCommand<REPLIES> &
WithCommands<REPLIES, M, F, S, RESP, FLAGS> &
WithModules<REPLIES, M, F, S, RESP, FLAGS> &
WithFunctions<REPLIES, M, F, S, RESP, FLAGS> &
WithScripts<REPLIES, M, F, S, RESP, FLAGS>
);
// export type InstantiableRedisClusterMultiCommandType<
// M extends RedisModules,
// F extends RedisFunctions,
// S extends RedisScripts
// > = new (...args: ConstructorParameters<typeof RedisClusterMultiCommand>) => RedisClusterMultiCommandType<M, F, S>;
export default class RedisClusterMultiCommand<REPLIES = []> {
private static _createCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);
return function (this: RedisClusterMultiCommand, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args),
firstKey = RedisCluster.extractFirstKey(
command,
args,
redisArgs
);
return this.addCommand(
firstKey,
command.IS_READ_ONLY,
redisArgs,
transformReply
);
};
}
// export type RedisClusterMultiExecutor = (queue: Array<RedisMultiQueuedCommand>, firstKey?: RedisCommandArgument, chainId?: symbol) => Promise<Array<RedisCommandRawReply>>;
private static _createModuleCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);
return function (this: { self: RedisClusterMultiCommand }, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args),
firstKey = RedisCluster.extractFirstKey(
command,
args,
redisArgs
);
return this.self.addCommand(
firstKey,
command.IS_READ_ONLY,
redisArgs,
transformReply
);
};
}
// export default class RedisClusterMultiCommand {
// readonly #multi = new RedisMultiCommand();
// readonly #executor: RedisClusterMultiExecutor;
// #firstKey: RedisCommandArgument | undefined;
private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) {
const prefix = functionArgumentsPrefix(name, fn),
transformReply = getTransformReply(fn, resp);
return function (this: { self: RedisClusterMultiCommand }, ...args: Array<unknown>) {
const fnArgs = fn.transformArguments(...args),
redisArgs: CommandArguments = prefix.concat(fnArgs),
firstKey = RedisCluster.extractFirstKey(
fn,
args,
fnArgs
);
redisArgs.preserve = fnArgs.preserve;
return this.self.addCommand(
firstKey,
fn.IS_READ_ONLY,
redisArgs,
transformReply
);
};
}
// static extend<
// M extends RedisModules,
// F extends RedisFunctions,
// S extends RedisScripts
// >(extensions?: RedisExtensions<M, F, S>): InstantiableRedisClusterMultiCommandType<M, F, S> {
// return attachExtensions({
// BaseClass: RedisClusterMultiCommand,
// modulesExecutor: RedisClusterMultiCommand.prototype.commandsExecutor,
// modules: extensions?.modules,
// functionsExecutor: RedisClusterMultiCommand.prototype.functionsExecutor,
// functions: extensions?.functions,
// scriptsExecutor: RedisClusterMultiCommand.prototype.scriptsExecutor,
// scripts: extensions?.scripts
// });
// }
private static _createScriptCommand(script: RedisScript, resp: RespVersions) {
const transformReply = getTransformReply(script, resp);
return function (this: RedisClusterMultiCommand, ...args: Array<unknown>) {
const scriptArgs = script.transformArguments(...args);
this._setState(
RedisCluster.extractFirstKey(
script,
args,
scriptArgs
),
script.IS_READ_ONLY
);
this._multi.addScript(
script,
scriptArgs,
transformReply
);
return this;
};
}
// constructor(executor: RedisClusterMultiExecutor, firstKey?: RedisCommandArgument) {
// this.#executor = executor;
// this.#firstKey = firstKey;
// }
static extend<
M extends RedisModules = Record<string, never>,
F extends RedisFunctions = Record<string, never>,
S extends RedisScripts = Record<string, never>,
RESP extends RespVersions = 2
>(config?: CommanderConfig<M, F, S, RESP>) {
return attachConfig({
BaseClass: RedisClusterMultiCommand,
commands: COMMANDS,
createCommand: RedisClusterMultiCommand._createCommand,
createModuleCommand: RedisClusterMultiCommand._createModuleCommand,
createFunctionCommand: RedisClusterMultiCommand._createFunctionCommand,
createScriptCommand: RedisClusterMultiCommand._createScriptCommand,
config
});
}
// commandsExecutor(command: RedisCommand, args: Array<unknown>): this {
// const transformedArguments = command.transformArguments(...args);
// this.#firstKey ??= RedisCluster.extractFirstKey(command, args, transformedArguments);
// return this.addCommand(undefined, transformedArguments, command.transformReply);
// }
private readonly _multi = new RedisMultiCommand();
private readonly _cluster: RedisClusterType;
private _firstKey: RedisArgument | undefined;
private _isReadonly: boolean | undefined = true;
// addCommand(
// firstKey: RedisCommandArgument | undefined,
// args: RedisCommandArguments,
// transformReply?: RedisCommand['transformReply']
// ): this {
// this.#firstKey ??= firstKey;
// this.#multi.addCommand(args, transformReply);
// return this;
// }
constructor(cluster: RedisClusterType, routing: RedisArgument | undefined) {
this._cluster = cluster;
this._firstKey = routing;
}
// functionsExecutor(fn: RedisFunction, args: Array<unknown>, name: string): this {
// const transformedArguments = this.#multi.addFunction(name, fn, args);
// this.#firstKey ??= RedisCluster.extractFirstKey(fn, args, transformedArguments);
// return this;
// }
private _setState(
firstKey: RedisArgument | undefined,
isReadonly: boolean | undefined,
) {
this._firstKey ??= firstKey;
this._isReadonly &&= isReadonly;
}
// scriptsExecutor(script: RedisScript, args: Array<unknown>): this {
// const transformedArguments = this.#multi.addScript(script, args);
// this.#firstKey ??= RedisCluster.extractFirstKey(script, args, transformedArguments);
// return this;
// }
addCommand(
firstKey: RedisArgument | undefined,
isReadonly: boolean | undefined,
args: CommandArguments,
transformReply?: TransformReply
) {
this._setState(firstKey, isReadonly);
this._multi.addCommand(args, transformReply);
return this;
}
// async exec(execAsPipeline = false): Promise<Array<RedisCommandRawReply>> {
// if (execAsPipeline) {
// return this.execAsPipeline();
// }
async exec<T extends MultiReply = MULTI_REPLY['GENERIC']>(execAsPipeline = false) {
if (execAsPipeline) return this.execAsPipeline<T>();
// return this.#multi.handleExecReplies(
// await this.#executor(this.#multi.queue, this.#firstKey, RedisMultiCommand.generateChainId())
// );
// }
return this._multi.transformReplies(
await this._cluster.executeMulti(
this._firstKey,
this._isReadonly,
this._multi.queue
)
) as MultiReplyType<T, REPLIES>;
}
// EXEC = this.exec;
EXEC = this.exec;
// async execAsPipeline(): Promise<Array<RedisCommandRawReply>> {
// return this.#multi.transformReplies(
// await this.#executor(this.#multi.queue, this.#firstKey)
// );
// }
// }
execTyped(execAsPipeline = false) {
return this.exec<MULTI_REPLY['TYPED']>(execAsPipeline);
}
// attachCommands({
// BaseClass: RedisClusterMultiCommand,
// commands: COMMANDS,
// executor: RedisClusterMultiCommand.prototype.commandsExecutor
// });
async execAsPipeline<T extends MultiReply = MULTI_REPLY['GENERIC']>() {
if (this._multi.queue.length === 0) return [] as MultiReplyType<T, REPLIES>;
return this._multi.transformReplies(
await this._cluster.executePipeline(
this._firstKey,
this._isReadonly,
this._multi.queue
)
) as MultiReplyType<T, REPLIES>;
}
execAsPipelineTyped() {
return this.execAsPipeline<MULTI_REPLY['TYPED']>();
}
}