1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-19 07:02:06 +03:00

better modules support, fix some bugs in legacy mode, add some tests

This commit is contained in:
leibale
2021-07-07 18:02:22 -04:00
parent fccb12bd57
commit c5bde41692
6 changed files with 244 additions and 91 deletions

View File

@@ -37,6 +37,18 @@ describe('Client', () => {
describe('legacyMode', () => {
const client = RedisClient.create({
socket: TEST_REDIS_SERVERS[TestRedisServers.OPEN],
modules: {
testModule: {
echo: {
transformArguments(message: string): Array<string> {
return ['ECHO', message];
},
transformReply(reply: string): string {
return reply;
}
}
}
},
legacyMode: true
});
@@ -116,7 +128,7 @@ describe('Client', () => {
});
});
it('client.multi.exec should call the callback', done => {
it('client.multi.ping.exec should call the callback', done => {
(client as any).multi()
.ping()
.exec((err?: Error, reply?: string) => {
@@ -133,19 +145,70 @@ describe('Client', () => {
});
});
it('client.multi.exec should work without callback', async () => {
it('client.multi.ping.exec should work without callback', async () => {
(client as any).multi()
.ping()
.exec();
await client.v4.ping(); // make sure the first command was replied
});
it('client.v4.exec should return a promise', async () => {
it('client.multi.ping.v4.ping.v4.exec should return a promise', async () => {
assert.deepEqual(
await ((client as any).multi().v4
await ((client as any).multi()
.ping()
.exec()),
['PONG']
.v4.ping()
.v4.exec()),
['PONG', 'PONG']
);
});
it('client.testModule.echo should call the callback', done => {
(client as any).testModule.echo('message', (err?: Error, reply?: string) => {
if (err) {
return done(err);
}
try {
assert.deepEqual(reply, 'message');
done();
} catch (err) {
done(err);
}
});
});
it('client.v4.testModule.echo should return a promise', async () => {
assert.equal(
await (client as any).v4.testModule.echo('message'),
'message'
);
});
it('client.multi.testModule.echo.v4.testModule.echo.exec should call the callback', done => {
(client as any).multi()
.testModule.echo('message')
.v4.testModule.echo('message')
.exec((err?: Error, replies?: Array<string>) => {
if (err) {
return done(err);
}
try {
assert.deepEqual(replies, ['message', 'message']);
done();
} catch (err) {
done(err);
}
});
});
it('client.multi.testModule.echo.v4.testModule.echo.v4.exec should return a promise', async () => {
assert.deepEqual(
await ((client as any).multi()
.testModule.echo('message')
.v4.testModule.echo('message')
.v4.exec()),
['message', 'message']
);
});
});

View File

@@ -27,7 +27,9 @@ type WithCommands = {
};
type WithModules<M extends RedisModules> = {
[P in keyof M[number]]: RedisCommandSignature<M[number][P]>;
[P in keyof M]: {
[C in keyof M[P]]: RedisCommandSignature<M[P][C]>;
};
};
type WithScripts<S extends RedisLuaScripts> = {
@@ -45,18 +47,6 @@ export interface ClientCommandOptions extends QueueCommandOptions {
}
export default class RedisClient<M extends RedisModules = RedisModules, S extends RedisLuaScripts = RedisLuaScripts> extends EventEmitter {
static defineCommand(on: any, name: string, command: RedisCommand): void {
on[name] = async function (...args: Array<unknown>): Promise<unknown> {
const options = isCommandOptions(args[0]) && args[0];
return command.transformReply(
await this.sendCommand(
command.transformArguments(...(options ? args.slice(1) : args)),
options
)
);
};
}
static create<M extends RedisModules, S extends RedisLuaScripts>(options?: RedisClientOptions<M, S>): RedisClientType<M, S> {
return <any>new RedisClient<M, S>(options);
}
@@ -69,7 +59,7 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
readonly #socket: RedisSocket;
readonly #queue: RedisCommandsQueue;
readonly #Multi: typeof RedisMultiCommand & { new(): RedisMultiCommandType<M, S> };
readonly #v4: Record<string, Function> = {};
readonly #v4: Record<string, any> = {};
#selectedDB = 0;
get options(): RedisClientOptions<M> | null | undefined {
@@ -80,7 +70,7 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
return this.#socket.isOpen;
}
get v4(): Record<string, Function> {
get v4(): Record<string, any> {
if (!this.#options?.legacyMode) {
throw new Error('the client is not in "legacy mode"');
}
@@ -101,18 +91,19 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
#initiateSocket(): RedisSocket {
const socketInitiator = async (): Promise<void> => {
const promises = [];
const v4Commands = this.#options?.legacyMode ? this.#v4 : this,
promises = [];
if (this.#selectedDB !== 0) {
promises.push((this as any).select(RedisClient.commandOptions({ asap: true }), this.#selectedDB));
promises.push(v4Commands.select(RedisClient.commandOptions({ asap: true }), this.#selectedDB));
}
if (this.#options?.readonly) {
promises.push((this as any).readonly(RedisClient.commandOptions({ asap: true })));
promises.push(v4Commands.readonly(RedisClient.commandOptions({ asap: true })));
}
if (this.#options?.socket?.password) {
promises.push((this as any).auth(RedisClient.commandOptions({ asap: true }), this.#options?.socket));
promises.push(v4Commands.auth(RedisClient.commandOptions({ asap: true }), this.#options.socket));
}
const resubscribePromise = this.#queue.resubscribe();
@@ -170,11 +161,16 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
#initiateModules(): void {
if (!this.#options?.modules) return;
for (const m of this.#options.modules) {
for (const [name, command] of Object.entries(m)) {
RedisClient.defineCommand(this, name, command);
this.#Multi.defineCommand(this.#Multi, name, command);
for (const [moduleName, commands] of Object.entries(this.#options.modules)) {
const module: {
[P in keyof typeof commands]: RedisCommandSignature<(typeof commands)[P]>;
} = {};
for (const [commandName, command] of Object.entries(commands)) {
module[commandName] = (...args) => this.executeCommand(command, args);
}
(this as any)[moduleName] = module;
}
}
@@ -196,7 +192,7 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
async executeScript<S extends RedisLuaScript>(script: S, args: Array<string>, options?: ClientCommandOptions): Promise<ReturnType<S['transformReply']>> {
try {
return await this.sendCommand([
return await this.#sendCommand([
'EVALSHA',
script.SHA,
script.NUMBER_OF_KEYS.toString(),
@@ -207,7 +203,7 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
throw err;
}
return await this.sendCommand([
return await this.#sendCommand([
'EVAL',
script.SCRIPT,
script.NUMBER_OF_KEYS.toString(),
@@ -219,13 +215,12 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
#legacyMode(): void {
if (!this.#options?.legacyMode) return;
this.#v4.sendCommand = this.sendCommand.bind(this);
(this as any).#v4.sendCommand = this.sendCommand.bind(this);
(this as any).sendCommand = (...args: Array<unknown>): void => {
const options = isCommandOptions(args[0]) && args.shift(),
callback = typeof args[args.length - 1] === 'function' && (args.pop() as Function);
this.#v4.sendCommand(args.flat(), options)
const options = isCommandOptions<ClientCommandOptions>(args[0]) ? args[0] : undefined,
callback = typeof args[args.length - 1] === 'function' ? args[args.length - 1] as Function : undefined,
actualArgs = !options && !callback ? args : args.slice(options ? 1 : 0, callback ? -1 : Infinity);
this.#sendCommand(actualArgs.flat() as Array<string>, options)
.then((reply: unknown) => {
if (!callback) return;
@@ -260,9 +255,10 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
this.#defineLegacyCommand('pUnsubscribe');
if (this.#options?.modules) {
for (const m of this.#options.modules) {
for (const name of Object.keys(m)) {
this.#defineLegacyCommand(name);
for (const [module, commands] of Object.entries(this.#options.modules)) {
for (const name of Object.keys(commands)) {
this.#v4[module] = {};
this.#defineLegacyCommand(name, module);
}
}
}
@@ -274,11 +270,18 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
}
}
#defineLegacyCommand(name: string): void {
this.#v4[name] = (this as any)[name];
(this as any)[name] = function (...args: Array<unknown>): void {
this.sendCommand(name, ...args);
#defineLegacyCommand(name: string, moduleName?: string): void {
const handler = (...args: Array<unknown>): void => {
(this as any).sendCommand(name, ...args);
};
if (moduleName) {
(this as any).#v4[moduleName][name] = (this as any)[moduleName][name];
(this as any)[moduleName][name] = handler;
} else {
(this as any).#v4[name] = (this as any)[name].bind(this);
(this as any)[name] = handler;
}
}
duplicate(): RedisClientType<M, S> {
@@ -297,7 +300,7 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
options = null;
}
await this.sendCommand(['SELECT', db.toString()], options);
await this.#sendCommand(['SELECT', db.toString()], options);
this.#selectedDB = db;
}
@@ -339,13 +342,17 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
return promise;
}
async sendCommand<T = unknown>(args: Array<string>, options?: ClientCommandOptions): Promise<T> {
sendCommand<T = unknown>(args: Array<string>, options?: ClientCommandOptions): Promise<T> {
return this.#sendCommand(args, options);
}
async #sendCommand<T = unknown>(args: Array<string>, options?: ClientCommandOptions): Promise<T> {
if (options?.duplicateConnection) {
const duplicate = this.duplicate();
await duplicate.connect();
try {
return await duplicate.sendCommand(args, {
return await duplicate.#sendCommand(args, {
...options,
duplicateConnection: false
});
@@ -359,6 +366,21 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
return await promise;
}
async executeCommand(command: RedisCommand, args: Array<unknown>): Promise<unknown> {
let options;
if (isCommandOptions<ClientCommandOptions>(args[0])) {
options = args[0];
args = args.slice(1);
}
return command.transformReply(
await this.#sendCommand(
command.transformArguments(...args),
options
)
);
}
multi(): RedisMultiCommandType<M, S> {
return new this.#Multi();
}
@@ -430,5 +452,7 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
}
for (const [name, command] of Object.entries(COMMANDS)) {
RedisClient.defineCommand(RedisClient.prototype, name, command);
(RedisClient.prototype as any)[name] = async function (this: RedisClient, ...args: Array<unknown>): Promise<unknown> {
return this.executeCommand(command, args);
};
}

View File

@@ -1,6 +1,6 @@
import COMMANDS from './commands';
import { RedisCommand, RedisModules } from './commands';
import { ClientCommandOptions, RedisClientType, WithPlugins } from './client';
import { ClientCommandOptions, RedisClientType, RedisCommandSignature, WithPlugins } from './client';
import { RedisSocketOptions } from './socket';
import RedisClusterSlots, { ClusterNode } from './cluster-slots';
import { RedisLuaScript, RedisLuaScripts } from './lua-script';
@@ -18,12 +18,12 @@ export type RedisClusterType<M extends RedisModules, S extends RedisLuaScripts>
WithPlugins<M, S> & RedisCluster;
export default class RedisCluster<M extends RedisModules = RedisModules, S extends RedisLuaScripts = RedisLuaScripts> {
static defineCommand(on: any, name: string, command: RedisCommand): void {
static defineCommand(on: any, name: string, command: RedisCommand, cluster?: RedisCluster): void {
on[name] = async function (...args: Array<unknown>): Promise<unknown> {
const options = isCommandOptions(args[0]) && args[0],
redisArgs = command.transformArguments(...(options ? args.slice(1) : args));
return command.transformReply(
await this.sendCommand(
await (cluster ?? this).sendCommand(
RedisCluster.#extractFirstKey(command, args, redisArgs),
command.IS_READ_ONLY,
redisArgs,
@@ -64,10 +64,16 @@ export default class RedisCluster<M extends RedisModules = RedisModules, S exten
#initiateModules(): void {
if (!this.#options.modules) return;
for (const m of this.#options.modules) {
for (const [name, command] of Object.entries(m)) {
RedisCluster.defineCommand(this, name, command);
for (const [moduleName, commands] of Object.entries(this.#options.modules)) {
const module: {
[P in keyof typeof commands]: RedisCommandSignature<(typeof commands)[P]>;
} = {};
for (const [commandName, command] of Object.entries(commands)) {
module[commandName] = (...args) => this.executeCommand(command, args);
}
(this as any)[moduleName] = module;
}
}
@@ -116,6 +122,24 @@ export default class RedisCluster<M extends RedisModules = RedisModules, S exten
}
}
async executeCommand(command: RedisCommand, args: Array<unknown>): Promise<(typeof command)['transformReply']> {
let options;
if (isCommandOptions<ClientCommandOptions>(args[0])) {
options = args[0];
args = args.slice(1);
}
const redisArgs = command.transformArguments(...args);
return command.transformReply(
await this.sendCommand(
RedisCluster.#extractFirstKey(command, args, redisArgs),
command.IS_READ_ONLY,
redisArgs,
options
)
);
}
async executeScript<S extends RedisLuaScript>(
script: S,
originalArgs: Array<unknown>,
@@ -183,5 +207,7 @@ export default class RedisCluster<M extends RedisModules = RedisModules, S exten
}
for (const [name, command] of Object.entries(COMMANDS)) {
RedisCluster.defineCommand(RedisCluster.prototype, name, command);
(RedisCluster.prototype as any)[name] = function (this: RedisCluster, ...args: Array<unknown>) {
return this.executeCommand(command, args);
};
}

View File

@@ -113,7 +113,7 @@ export default class RedisCommandsQueue {
return;
}
}
this.#shiftWaitingForReply().resolve(reply);
},
returnError: (err: Error) => this.#shiftWaitingForReply().reject(err)

View File

@@ -585,4 +585,4 @@ export interface RedisModule {
[key: string]: RedisCommand;
}
export type RedisModules = Array<RedisModule>;
export type RedisModules = Record<string, RedisModule>;

View File

@@ -1,7 +1,7 @@
import COMMANDS from './commands';
import { RedisCommand, RedisModules, RedisReply } from './commands';
import RedisCommandsQueue from './commands-queue';
import { RedisLuaScripts } from './lua-script';
import { RedisLuaScript, RedisLuaScripts } from './lua-script';
import { RedisClientOptions } from './client';
type RedisMultiCommandSignature<C extends RedisCommand, M extends RedisModules, S extends RedisLuaScripts> = (...args: Parameters<C['transformArguments']>) => RedisMultiCommandType<M, S>;
@@ -11,7 +11,9 @@ type WithCommands<M extends RedisModules, S extends RedisLuaScripts> = {
};
type WithModules<M extends RedisModules, S extends RedisLuaScripts> = {
[P in keyof M[number]]: RedisMultiCommandSignature<M[number][P], M, S>
[P in keyof M]: {
[C in keyof M[P]]: RedisMultiCommandSignature<M[P][C], M, S>;
};
};
type WithScripts<M extends RedisModules, S extends RedisLuaScripts> = {
@@ -28,14 +30,6 @@ export interface MultiQueuedCommand {
export type RedisMultiExecutor = (queue: Array<MultiQueuedCommand>, chainId?: symbol) => Promise<Array<RedisReply>>;
export default class RedisMultiCommand<M extends RedisModules = RedisModules, S extends RedisLuaScripts = RedisLuaScripts> {
static defineCommand(on: any, name: string, command: RedisCommand): void {
on[name] = function (...args: Parameters<typeof command.transformArguments>) {
// do not return `this.addCommand` directly cause in legacy mode it's binded to the legacy version
this.addCommand(command.transformArguments(...args), command.transformReply);
return this;
};
}
static create<M extends RedisModules, S extends RedisLuaScripts>(executor: RedisMultiExecutor, clientOptions?: RedisClientOptions<M, S>): RedisMultiCommandType<M, S> {
return <any>new RedisMultiCommand<M, S>(executor, clientOptions);
}
@@ -48,9 +42,9 @@ export default class RedisMultiCommand<M extends RedisModules = RedisModules, S
readonly #scriptsInUse = new Set<string>();
readonly #v4: Record<string, Function> = {};
readonly #v4: Record<string, any> = {};
get v4(): Record<string, Function> {
get v4(): Record<string, any> {
if (!this.#clientOptions?.legacyMode) {
throw new Error('client is not in "legacy mode"');
}
@@ -69,10 +63,16 @@ export default class RedisMultiCommand<M extends RedisModules = RedisModules, S
#initiateModules(): void {
if (!this.#clientOptions?.modules) return;
for (const m of this.#clientOptions.modules) {
for (const [name, command] of Object.entries(m)) {
RedisMultiCommand.defineCommand(this, name, command);
for (const [moduleName, commands] of Object.entries(this.#clientOptions.modules)) {
const module: {
[P in keyof typeof commands]: RedisMultiCommandSignature<(typeof commands)[P], M, S>
} = {};
for (const [commandName, command] of Object.entries(commands)) {
module[commandName] = (...args) => this.executeCommand(command, args);
}
(this as any)[moduleName] = module;
}
}
@@ -107,16 +107,14 @@ export default class RedisMultiCommand<M extends RedisModules = RedisModules, S
}
}
#legacyMode(): Record<string, Function> | undefined {
#legacyMode(): void {
if (!this.#clientOptions?.legacyMode) return;
this.#v4.exec = this.exec.bind(this);
this.#v4.addCommand = this.addCommand.bind(this);
(this as any).exec = function (...args: Array<unknown>): void {
const callback = typeof args[args.length - 1] === 'function' && args.pop() as Function;
(this as any).exec = (callback?: (err: Error | null, replies?: Array<unknown>) => unknown): void => {
this.#v4.exec()
.then((reply: unknown) => {
.then((reply: Array<unknown>) => {
if (!callback) return;
callback(null, reply);
@@ -131,15 +129,15 @@ export default class RedisMultiCommand<M extends RedisModules = RedisModules, S
});
};
for (const name of Object.keys(COMMANDS)) {
this.#defineLegacyCommand(name);
}
if (this.#clientOptions.modules) {
for (const m of this.#clientOptions.modules) {
for (const name of Object.keys(m)) {
this.#defineLegacyCommand(name);
if (this.#clientOptions?.modules) {
for (const [module, commands] of Object.entries(this.#clientOptions.modules)) {
for (const name of Object.keys(commands)) {
this.#v4[module] = {};
this.#defineLegacyCommand(name, module);
}
}
}
@@ -151,22 +149,62 @@ export default class RedisMultiCommand<M extends RedisModules = RedisModules, S
}
}
#defineLegacyCommand(name: string): void {
this.#v4[name] = (this as any)[name];
// TODO: https://github.com/NodeRedis/node-redis#commands:~:text=minimal%20parsing
(this as any)[name] = function (...args: Array<unknown>) {
return this.addCommand([name, ...args.flat()]);
#defineLegacyCommand(name: string, moduleName?: string): void {
const handler = (...args: Array<unknown>): RedisMultiCommandType<M, S> => {
return this.addCommand([
name,
...args.flat() as Array<string>
]);
};
if (moduleName) {
this.#v4[moduleName][name] = (this as any)[moduleName][name];
(this as any)[moduleName][name] = handler;
} else {
this.#v4[name] = (this as any)[name].bind(this);
(this as any)[name] = handler;
}
}
addCommand(args: Array<string>, transformReply?: RedisCommand['transformReply']): this {
addCommand(args: Array<string>, transformReply?: RedisCommand['transformReply']): RedisMultiCommandType<M, S> {
this.#queue.push({
encodedCommand: RedisCommandsQueue.encodeCommand(args),
transformReply
});
return this;
return <any>this;
}
executeCommand(command: RedisCommand, args: Array<unknown>): RedisMultiCommandType<M, S> {
return this.addCommand(
command.transformArguments(...args),
command.transformReply
);
}
executeScript(name: string, script: RedisLuaScript, args: Array<unknown>): RedisMultiCommandType<M, S> {
const evalArgs = [];
if (this.#scriptsInUse.has(name)) {
evalArgs.push(
'EVALSHA',
script.SHA
);
} else {
this.#scriptsInUse.add(name);
evalArgs.push(
'EVAL',
script.SCRIPT
);
}
return this.addCommand(
[
...evalArgs,
script.NUMBER_OF_KEYS.toString(),
...script.transformArguments(...args)
],
script.transformReply
);
}
async exec(execAsPipeline = false): Promise<Array<unknown>> {
@@ -201,5 +239,7 @@ export default class RedisMultiCommand<M extends RedisModules = RedisModules, S
}
for (const [name, command] of Object.entries(COMMANDS)) {
RedisMultiCommand.defineCommand(RedisMultiCommand.prototype, name, command);
(RedisMultiCommand.prototype as any)[name] = function (...args: Array<unknown>): RedisMultiCommand {
return this.executeCommand(command, args);
};
}