You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-17 19:41:06 +03:00
enhance the way commanders (client/multi/cluster) get extended with modules and scripts
This commit is contained in:
232
lib/client.ts
232
lib/client.ts
@@ -9,6 +9,7 @@ import { RedisLuaScript, RedisLuaScripts } from './lua-script';
|
||||
import { ScanOptions, ZMember } from './commands/generic-transformers';
|
||||
import { ScanCommandOptions } from './commands/SCAN';
|
||||
import { HScanTuple } from './commands/HSCAN';
|
||||
import { extendWithDefaultCommands, extendWithModulesAndScripts, transformCommandArguments } from './commander';
|
||||
|
||||
export interface RedisClientOptions<M = RedisModules, S = RedisLuaScripts> {
|
||||
socket?: RedisSocketOptions;
|
||||
@@ -47,18 +48,54 @@ export interface ClientCommandOptions extends QueueCommandOptions {
|
||||
}
|
||||
|
||||
export default class RedisClient<M extends RedisModules = RedisModules, S extends RedisLuaScripts = RedisLuaScripts> extends EventEmitter {
|
||||
static create<M extends RedisModules, S extends RedisLuaScripts>(options?: RedisClientOptions<M, S>): RedisClientType<M, S> {
|
||||
return <any>new RedisClient<M, S>(options);
|
||||
}
|
||||
|
||||
static commandOptions(options: ClientCommandOptions): CommandOptions<ClientCommandOptions> {
|
||||
return commandOptions(options);
|
||||
}
|
||||
|
||||
static async commandsExecutor(
|
||||
this: RedisClient,
|
||||
command: RedisCommand,
|
||||
args: Array<unknown>
|
||||
): Promise<ReturnType<typeof command['transformReply']>> {
|
||||
const { args: redisArgs, options } = transformCommandArguments<ClientCommandOptions>(command, args);
|
||||
|
||||
const reply = command.transformReply(
|
||||
await this.#sendCommand(redisArgs, options),
|
||||
redisArgs.preserve
|
||||
);
|
||||
|
||||
return reply;
|
||||
}
|
||||
|
||||
static async #scriptsExecutor(
|
||||
this: RedisClient,
|
||||
script: RedisLuaScript,
|
||||
args: Array<unknown>
|
||||
): Promise<typeof script['transformArguments']> {
|
||||
const { args: redisArgs, options } = transformCommandArguments<ClientCommandOptions>(script, args);
|
||||
|
||||
const reply = script.transformReply(
|
||||
await this.executeScript(script, redisArgs, options),
|
||||
redisArgs.preserve
|
||||
);
|
||||
|
||||
return reply;
|
||||
}
|
||||
|
||||
static create<M extends RedisModules, S extends RedisLuaScripts>(options?: RedisClientOptions<M, S>): RedisClientType<M, S> {
|
||||
return new (<any>extendWithModulesAndScripts({
|
||||
BaseClass: RedisClient,
|
||||
modules: options?.modules,
|
||||
modulesCommandsExecutor: RedisClient.commandsExecutor,
|
||||
scripts: options?.scripts,
|
||||
scriptsExecutor: RedisClient.#scriptsExecutor
|
||||
}))(options);
|
||||
}
|
||||
|
||||
readonly #options?: RedisClientOptions<M, S>;
|
||||
readonly #socket: RedisSocket;
|
||||
readonly #queue: RedisCommandsQueue;
|
||||
readonly #Multi: typeof RedisMultiCommand & { new(): RedisMultiCommandType<M, S> };
|
||||
readonly #Multi: new (...args: ConstructorParameters<typeof RedisMultiCommand>) => RedisMultiCommandType<M, S>;
|
||||
readonly #v4: Record<string, any> = {};
|
||||
#selectedDB = 0;
|
||||
|
||||
@@ -83,9 +120,7 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
|
||||
this.#options = options;
|
||||
this.#socket = this.#initiateSocket();
|
||||
this.#queue = this.#initiateQueue();
|
||||
this.#Multi = this.#initiateMulti();
|
||||
this.#initiateModules();
|
||||
this.#initiateScripts();
|
||||
this.#Multi = RedisMultiCommand.extend(options);
|
||||
this.#legacyMode();
|
||||
}
|
||||
|
||||
@@ -137,98 +172,14 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
|
||||
);
|
||||
}
|
||||
|
||||
#initiateMulti(): typeof RedisMultiCommand & { new(): RedisMultiCommandType<M, S> } {
|
||||
const executor = async (commands: Array<MultiQueuedCommand>): Promise<Array<RedisReply>> => {
|
||||
const promise = Promise.all(
|
||||
commands.map(({encodedCommand}) => {
|
||||
return this.#queue.addEncodedCommand(encodedCommand);
|
||||
})
|
||||
);
|
||||
|
||||
this.#tick();
|
||||
|
||||
return await promise;
|
||||
};
|
||||
|
||||
const options = this.#options;
|
||||
return <any>class extends RedisMultiCommand {
|
||||
constructor() {
|
||||
super(executor, options);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#initiateModules(): void {
|
||||
if (!this.#options?.modules) return;
|
||||
|
||||
for (const [moduleName, commands] of Object.entries(this.#options.modules)) {
|
||||
const module: {
|
||||
[P in keyof typeof commands]: RedisCommandSignature<(typeof commands)[P]>;
|
||||
} = {};
|
||||
|
||||
for (const [commandName, command] of Object.entries(commands)) {
|
||||
module[commandName] = (...args) => this.executeCommand(command, args);
|
||||
}
|
||||
|
||||
(this as any)[moduleName] = module;
|
||||
}
|
||||
}
|
||||
|
||||
#initiateScripts(): void {
|
||||
if (!this.#options?.scripts) return;
|
||||
|
||||
for (const [name, script] of Object.entries(this.#options.scripts)) {
|
||||
(this as any)[name] = async function (...args: Parameters<typeof script.transformArguments>): Promise<ReturnType<typeof script.transformReply>> {
|
||||
let options;
|
||||
if (isCommandOptions<ClientCommandOptions>(args[0])) {
|
||||
options = args[0];
|
||||
args = args.slice(1);
|
||||
}
|
||||
|
||||
const transformedArguments = script.transformArguments(...args);
|
||||
return script.transformReply(
|
||||
await this.executeScript(
|
||||
script,
|
||||
transformedArguments,
|
||||
options
|
||||
),
|
||||
transformedArguments.preserve
|
||||
);
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
async executeScript<S extends RedisLuaScript>(script: S, args: Array<string>, options?: ClientCommandOptions): Promise<ReturnType<S['transformReply']>> {
|
||||
try {
|
||||
return await this.#sendCommand([
|
||||
'EVALSHA',
|
||||
script.SHA,
|
||||
script.NUMBER_OF_KEYS.toString(),
|
||||
...args
|
||||
], options);
|
||||
} catch (err: any) {
|
||||
if (!err?.message?.startsWith?.('NOSCRIPT')) {
|
||||
throw err;
|
||||
}
|
||||
|
||||
return await this.#sendCommand([
|
||||
'EVAL',
|
||||
script.SCRIPT,
|
||||
script.NUMBER_OF_KEYS.toString(),
|
||||
...args
|
||||
], options);
|
||||
}
|
||||
}
|
||||
|
||||
#legacyMode(): void {
|
||||
if (!this.#options?.legacyMode) return;
|
||||
|
||||
(this as any).#v4.sendCommand = this.sendCommand.bind(this);
|
||||
(this as any).#v4.sendCommand = this.#sendCommand.bind(this);
|
||||
(this as any).sendCommand = (...args: Array<unknown>): void => {
|
||||
const options = isCommandOptions<ClientCommandOptions>(args[0]) ? args[0] : undefined,
|
||||
callback = typeof args[args.length - 1] === 'function' ? args[args.length - 1] as Function : undefined,
|
||||
actualArgs = !options && !callback ? args : args.slice(options ? 1 : 0, callback ? -1 : Infinity);
|
||||
this.#sendCommand(actualArgs.flat() as Array<string>, options)
|
||||
const callback = typeof args[args.length - 1] === 'function' ? args[args.length - 1] as Function : undefined,
|
||||
actualArgs = !callback ? args : args.slice(0, -1);
|
||||
this.#sendCommand(actualArgs.flat() as Array<string>)
|
||||
.then((reply: unknown) => {
|
||||
if (!callback) return;
|
||||
|
||||
@@ -244,7 +195,7 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
|
||||
|
||||
callback(err);
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
for (const name of Object.keys(COMMANDS)) {
|
||||
this.#defineLegacyCommand(name);
|
||||
@@ -261,39 +212,17 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
|
||||
this.#defineLegacyCommand('unsubscribe');
|
||||
this.#defineLegacyCommand('PUNSUBSCRIBE');
|
||||
this.#defineLegacyCommand('pUnsubscribe');
|
||||
|
||||
if (this.#options?.modules) {
|
||||
for (const [module, commands] of Object.entries(this.#options.modules)) {
|
||||
for (const name of Object.keys(commands)) {
|
||||
this.#v4[module] = {};
|
||||
this.#defineLegacyCommand(name, module);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (this.#options?.scripts) {
|
||||
for (const name of Object.keys(this.#options.scripts)) {
|
||||
this.#defineLegacyCommand(name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#defineLegacyCommand(name: string, moduleName?: string): void {
|
||||
const handler = (...args: Array<unknown>): void => {
|
||||
#defineLegacyCommand(name: string): void {
|
||||
(this as any).#v4[name] = (this as any)[name].bind(this);
|
||||
(this as any)[name] = (...args: Array<unknown>): void => {
|
||||
(this as any).sendCommand(name, ...args);
|
||||
};
|
||||
|
||||
if (moduleName) {
|
||||
(this as any).#v4[moduleName][name] = (this as any)[moduleName][name];
|
||||
(this as any)[moduleName][name] = handler;
|
||||
} else {
|
||||
(this as any).#v4[name] = (this as any)[name].bind(this);
|
||||
(this as any)[name] = handler;
|
||||
}
|
||||
}
|
||||
|
||||
duplicate(): RedisClientType<M, S> {
|
||||
return RedisClient.create(this.#options);
|
||||
return new (Object.getPrototypeOf(this).constructor)(this.#options);
|
||||
}
|
||||
|
||||
async connect(): Promise<void> {
|
||||
@@ -354,13 +283,14 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
|
||||
return this.#sendCommand(args, options);
|
||||
}
|
||||
|
||||
// using `#sendCommand` cause `sendCommand` is overwritten in legacy mode
|
||||
async #sendCommand<T = unknown>(args: Array<string>, options?: ClientCommandOptions): Promise<T> {
|
||||
if (options?.duplicateConnection) {
|
||||
const duplicate = this.duplicate();
|
||||
await duplicate.connect();
|
||||
|
||||
try {
|
||||
return await duplicate.#sendCommand(args, {
|
||||
return await duplicate.sendCommand(args, {
|
||||
...options,
|
||||
duplicateConnection: false
|
||||
});
|
||||
@@ -374,25 +304,45 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
|
||||
return await promise;
|
||||
}
|
||||
|
||||
async executeCommand(command: RedisCommand, args: Array<unknown>): Promise<unknown> {
|
||||
let options;
|
||||
if (isCommandOptions<ClientCommandOptions>(args[0])) {
|
||||
options = args[0];
|
||||
args = args.slice(1);
|
||||
}
|
||||
async executeScript(script: RedisLuaScript, args: Array<string>, options?: ClientCommandOptions): Promise<ReturnType<typeof script['transformReply']>> {
|
||||
try {
|
||||
return await this.#sendCommand([
|
||||
'EVALSHA',
|
||||
script.SHA,
|
||||
script.NUMBER_OF_KEYS.toString(),
|
||||
...args
|
||||
], options);
|
||||
} catch (err: any) {
|
||||
if (!err?.message?.startsWith?.('NOSCRIPT')) {
|
||||
throw err;
|
||||
}
|
||||
|
||||
const transformedArguments = command.transformArguments(...args);
|
||||
return command.transformReply(
|
||||
await this.#sendCommand(
|
||||
transformedArguments,
|
||||
options
|
||||
),
|
||||
transformedArguments.preserve
|
||||
return await this.#sendCommand([
|
||||
'EVAL',
|
||||
script.SCRIPT,
|
||||
script.NUMBER_OF_KEYS.toString(),
|
||||
...args
|
||||
], options);
|
||||
}
|
||||
}
|
||||
|
||||
async #multiExecutor(commands: Array<MultiQueuedCommand>): Promise<Array<RedisReply>> {
|
||||
const promise = Promise.all(
|
||||
commands.map(({encodedCommand}) => {
|
||||
return this.#queue.addEncodedCommand(encodedCommand);
|
||||
})
|
||||
);
|
||||
|
||||
this.#tick();
|
||||
|
||||
return await promise;
|
||||
}
|
||||
|
||||
multi(): RedisMultiCommandType<M, S> {
|
||||
return new this.#Multi();
|
||||
return new this.#Multi(
|
||||
this.#multiExecutor.bind(this),
|
||||
this.#options
|
||||
);
|
||||
}
|
||||
|
||||
async* scanIterator(options?: ScanCommandOptions): AsyncIterable<string> {
|
||||
@@ -470,8 +420,4 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
|
||||
}
|
||||
}
|
||||
|
||||
for (const [name, command] of Object.entries(COMMANDS)) {
|
||||
(RedisClient.prototype as any)[name] = async function (this: RedisClient, ...args: Array<unknown>): Promise<unknown> {
|
||||
return this.executeCommand(command, args);
|
||||
};
|
||||
}
|
||||
extendWithDefaultCommands(RedisClient, RedisClient.commandsExecutor);
|
||||
|
Reference in New Issue
Block a user