1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-17 19:41:06 +03:00

Add support for lua scripts in client & muilti, fix client socket initiator, implement simple cluster nodes discovery strategy

This commit is contained in:
leibale
2021-05-31 10:39:40 -04:00
parent 15cf27423a
commit 4cbcc90bbb
11 changed files with 416 additions and 76 deletions

View File

@@ -1,14 +1,16 @@
import RedisSocket, { RedisSocketOptions } from './socket';
import RedisCommandsQueue, { QueueCommandOptions } from './commands-queue';
import COMMANDS from './commands/client';
import { RedisCommand, RedisModules, RedisModule, RedisReply } from './commands';
import { RedisCommand, RedisModules, RedisReply } from './commands';
import RedisMultiCommand, { MultiQueuedCommand, RedisMultiCommandType } from './multi-command';
import EventEmitter from 'events';
import { CommandOptions, commandOptions, isCommandOptions } from './command-options';
import { RedisLuaScript, RedisLuaScripts } from './lua-script';
export interface RedisClientOptions<M = RedisModules> {
export interface RedisClientOptions<M = RedisModules, S = RedisLuaScripts> {
socket?: RedisSocketOptions;
modules?: M;
scripts?: S;
commandsQueueMaxLength?: number;
readOnly?: boolean;
callbackify?: boolean;
@@ -21,17 +23,21 @@ type WithCommands = {
[P in keyof typeof COMMANDS]: RedisCommandSignature<(typeof COMMANDS)[P]>;
};
type WithModules<M extends Array<RedisModule>> = {
type WithModules<M extends RedisModules> = {
[P in keyof M[number]]: RedisCommandSignature<M[number][P]>;
};
export type RedisClientType<M extends RedisModules> = WithCommands & WithModules<M> & RedisClient<M>;
type WithScripts<S extends RedisLuaScripts> = {
[P in keyof S]: RedisCommandSignature<S[P]>;
};
export type RedisClientType<M extends RedisModules, S extends RedisLuaScripts> = WithCommands & WithModules<M> & WithScripts<S> & RedisClient<M, S>;
export interface ClientCommandOptions extends QueueCommandOptions {
duplicateConnection?: boolean;
}
export default class RedisClient<M extends RedisModules = RedisModules> extends EventEmitter {
export default class RedisClient<M extends RedisModules = RedisModules, S extends RedisLuaScripts = RedisLuaScripts> extends EventEmitter {
static defineCommand(on: any, name: string, command: RedisCommand): void {
on[name] = async function (...args: Array<unknown>): Promise<unknown> {
const options = isCommandOptions(args[0]) && args.shift();
@@ -62,31 +68,36 @@ export default class RedisClient<M extends RedisModules = RedisModules> extends
};
}
static create<M extends RedisModules>(options?: RedisClientOptions<M>): RedisClientType<M> {
return <any>new RedisClient<M>(options);
static create<M extends RedisModules, S extends RedisLuaScripts>(options?: RedisClientOptions<M, S>): RedisClientType<M, S> {
return <any>new RedisClient<M, S>(options);
}
static commandOptions(options: ClientCommandOptions): CommandOptions<ClientCommandOptions> {
return commandOptions(options);
};
readonly #options?: RedisClientOptions<M>;
readonly #options?: RedisClientOptions<M, S>;
readonly #socket: RedisSocket;
readonly #queue: RedisCommandsQueue;
readonly #Multi: typeof RedisMultiCommand & { new(): RedisMultiCommandType<M> };
readonly #Multi: typeof RedisMultiCommand & { new(): RedisMultiCommandType<M, S> };
#selectedDB = 0;
get options(): RedisClientOptions<M> | null | undefined {
return this.#options;
}
get isOpen(): boolean {
return this.#socket.isOpen;
}
constructor(options?: RedisClientOptions<M>) {
constructor(options?: RedisClientOptions<M, S>) {
super();
this.#options = options;
this.#socket = this.#initiateSocket();
this.#queue = this.#initiateQueue();
this.#Multi = this.#initiateMulti();
this.#initiateModules();
this.#initiateScripts();
this.#callbackify();
}
@@ -95,15 +106,15 @@ export default class RedisClient<M extends RedisModules = RedisModules> extends
const promises = [];
if (this.#options?.socket?.password) {
promises.push((this as any).auth(this.#options?.socket));
promises.push((this as any).auth(RedisClient.commandOptions({ asap: true }), this.#options?.socket));
}
if (this.#options?.readOnly) {
promises.push((this as any).readOnly());
promises.push((this as any).readOnly(RedisClient.commandOptions({ asap: true })));
}
if (this.#selectedDB !== 0) {
promises.push((this as any).select(this.#selectedDB));
promises.push((this as any).select(RedisClient.commandOptions({ asap: true }), this.#selectedDB));
}
await Promise.all(promises);
@@ -131,7 +142,7 @@ export default class RedisClient<M extends RedisModules = RedisModules> extends
);
}
#initiateMulti(): typeof RedisMultiCommand & { new(): RedisMultiCommandType<M> } {
#initiateMulti(): typeof RedisMultiCommand & { new(): RedisMultiCommandType<M, S> } {
const executor = async (commands: Array<MultiQueuedCommand>): Promise<Array<RedisReply>> => {
const promise = Promise.all(
commands.map(({encodedCommand}) => {
@@ -145,11 +156,10 @@ export default class RedisClient<M extends RedisModules = RedisModules> extends
return (replies[replies.length - 1] as Array<RedisReply>);
};
const modules = this.#options?.modules;
const options = this.#options;
return <any>class extends RedisMultiCommand {
constructor() {
super(executor, modules);
super(executor, options?.modules, options?.scripts);
}
};
}
@@ -165,6 +175,34 @@ export default class RedisClient<M extends RedisModules = RedisModules> extends
}
}
#initiateScripts(): void {
if (!this.#options?.scripts) return;
for (const [name, script] of Object.entries(this.#options.scripts)) {
(this as any)[name] = async function (...args: Parameters<typeof script.transformArguments>): Promise<ReturnType<typeof script.transformReply>> {
const options = isCommandOptions(args[0]) && args[0];
return script.transformReply(
await this.#executeScript(script, [
script.NUMBER_OF_KEYS.toString(),
...script.transformArguments(options ? args.slice(1) : args)
])
);
};
}
}
async #executeScript<S extends RedisLuaScript>(script: S, args: Array<string>): Promise<ReturnType<S['transformReply']>> {
try {
return await this.sendCommand(['EVALSHA', script.SHA, ...args]);
} catch (err: any) {
if (!err?.message?.startsWith?.('NOSCRIPT')) {
throw err;
}
return await this.sendCommand(['EVAL', script.SCRIPT, ...args]);
}
}
#callbackify(): void {
if (!this.#options?.callbackify) return;
@@ -183,7 +221,7 @@ export default class RedisClient<M extends RedisModules = RedisModules> extends
}
}
duplicate(): RedisClientType<M> {
duplicate(): RedisClientType<M, S> {
return RedisClient.create(this.#options);
}
@@ -191,8 +229,15 @@ export default class RedisClient<M extends RedisModules = RedisModules> extends
await this.#socket.connect();
}
async SELECT(db: number): Promise<void> {
await this.sendCommand(['SELECT', db.toString()]);
async SELECT(db: number): Promise<void>;
async SELECT(options: CommandOptions<ClientCommandOptions>, db: number): Promise<void>;
async SELECT(options?: any, db?: any): Promise<void> {
if (!isCommandOptions(options)) {
db = options;
options = null;
}
await this.sendCommand(['SELECT', db.toString()], options);
this.#selectedDB = db;
}
@@ -218,7 +263,7 @@ export default class RedisClient<M extends RedisModules = RedisModules> extends
return await promise;
}
multi(): RedisMultiCommandType<M> {
multi(): RedisMultiCommandType<M, S> {
return new this.#Multi();
}