You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-07 13:22:56 +03:00
legacy mode multi
This commit is contained in:
@@ -689,38 +689,71 @@ export default class RedisClient<
|
||||
);
|
||||
}
|
||||
|
||||
MULTI(): RedisClientMultiCommandType<[], M, F, S, RESP, FLAGS> {
|
||||
return new (this as any).Multi(
|
||||
async (
|
||||
/**
|
||||
* @internal
|
||||
*/
|
||||
async executePipeline(commands: Array<RedisMultiQueuedCommand>) {
|
||||
if (!this._socket.isOpen) {
|
||||
return Promise.reject(new ClientClosedError());
|
||||
}
|
||||
|
||||
const promise = Promise.all(
|
||||
commands.map(({ args }) => this._queue.addCommand(args, {
|
||||
flags: (this as ProxyClient).commandOptions?.flags
|
||||
}))
|
||||
);
|
||||
this._tick();
|
||||
return promise;
|
||||
}
|
||||
|
||||
/**
|
||||
* @internal
|
||||
*/
|
||||
async executeMulti(
|
||||
commands: Array<RedisMultiQueuedCommand>,
|
||||
selectedDB?: number,
|
||||
chainId?: symbol
|
||||
) => {
|
||||
selectedDB?: number
|
||||
) {
|
||||
if (!this._socket.isOpen) {
|
||||
return Promise.reject(new ClientClosedError());
|
||||
}
|
||||
|
||||
const flags = (this as ProxyClient).commandOptions?.flags,
|
||||
promise = chainId ?
|
||||
// if `chainId` has a value, it's a `MULTI` (and not "pipeline") - need to add the `MULTI` and `EXEC` commands
|
||||
Promise.all([
|
||||
chainId = Symbol('MULTI Chain'),
|
||||
promises = [
|
||||
this._queue.addCommand(['MULTI'], { chainId }),
|
||||
this._addMultiCommands(commands, chainId),
|
||||
this._queue.addCommand(['EXEC'], { chainId, flags })
|
||||
]) :
|
||||
this._addMultiCommands(commands, undefined, flags);
|
||||
];
|
||||
|
||||
for (const { args } of commands) {
|
||||
promises.push(
|
||||
this._queue.addCommand(args, {
|
||||
chainId,
|
||||
flags
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
promises.push(
|
||||
this._queue.addCommand(['EXEC'], { chainId })
|
||||
);
|
||||
|
||||
this._tick();
|
||||
|
||||
const results = await promise;
|
||||
const results = await Promise.all(promises),
|
||||
execResult = results[results.length - 1];
|
||||
|
||||
if (execResult === null) {
|
||||
throw new WatchError();
|
||||
}
|
||||
|
||||
if (selectedDB !== undefined) {
|
||||
this._selectedDB = selectedDB;
|
||||
}
|
||||
|
||||
return results;
|
||||
return execResult as Array<unknown>;
|
||||
}
|
||||
);
|
||||
|
||||
MULTI(): RedisClientMultiCommandType<[], M, F, S, RESP, FLAGS> {
|
||||
return new (this as any).Multi(this);
|
||||
}
|
||||
|
||||
multi = this.MULTI;
|
||||
|
@@ -3,6 +3,7 @@ import { RedisClientType } from '.';
|
||||
import { getTransformReply } from '../commander';
|
||||
import { ErrorReply } from '../errors';
|
||||
import COMMANDS from '../commands';
|
||||
import RedisMultiCommand from '../multi-command';
|
||||
|
||||
type LegacyArgument = string | Buffer | number | Date;
|
||||
|
||||
@@ -15,10 +16,8 @@ type LegacyCommandArguments = LegacyArguments | [
|
||||
callback: LegacyCallback
|
||||
];
|
||||
|
||||
export type CommandSignature = (...args: LegacyCommandArguments) => void;
|
||||
|
||||
type WithCommands = {
|
||||
[P in keyof typeof COMMANDS]: CommandSignature;
|
||||
[P in keyof typeof COMMANDS]: (...args: LegacyCommandArguments) => void;
|
||||
};
|
||||
|
||||
export type RedisLegacyClientType = RedisLegacyClient & WithCommands;
|
||||
@@ -30,16 +29,16 @@ export class RedisLegacyClient {
|
||||
callback = args.pop() as LegacyCallback;
|
||||
}
|
||||
|
||||
RedisLegacyClient._pushArguments(redisArgs, args as LegacyArguments);
|
||||
RedisLegacyClient.pushArguments(redisArgs, args as LegacyArguments);
|
||||
|
||||
return callback;
|
||||
}
|
||||
|
||||
private static _pushArguments(redisArgs: CommandArguments, args: LegacyArguments) {
|
||||
static pushArguments(redisArgs: CommandArguments, args: LegacyArguments) {
|
||||
for (let i = 0; i < args.length; ++i) {
|
||||
const arg = args[i];
|
||||
if (Array.isArray(arg)) {
|
||||
RedisLegacyClient._pushArguments(redisArgs, arg);
|
||||
RedisLegacyClient.pushArguments(redisArgs, arg);
|
||||
} else {
|
||||
redisArgs.push(
|
||||
typeof arg === 'number' || arg instanceof Date ?
|
||||
@@ -50,14 +49,14 @@ export class RedisLegacyClient {
|
||||
}
|
||||
}
|
||||
|
||||
private static _getTransformReply(command: Command, resp: RespVersions) {
|
||||
static getTransformReply(command: Command, resp: RespVersions) {
|
||||
return command.TRANSFORM_LEGACY_REPLY ?
|
||||
getTransformReply(command, resp) :
|
||||
undefined;
|
||||
}
|
||||
|
||||
private static _createCommand(name: string, command: Command, resp: RespVersions) {
|
||||
const transformReply = RedisLegacyClient._getTransformReply(command, resp);
|
||||
const transformReply = RedisLegacyClient.getTransformReply(command, resp);
|
||||
return async function (this: RedisLegacyClient, ...args: LegacyCommandArguments) {
|
||||
const redisArgs = [name],
|
||||
callback = RedisLegacyClient._transformArguments(redisArgs, args),
|
||||
@@ -74,6 +73,8 @@ export class RedisLegacyClient {
|
||||
};
|
||||
}
|
||||
|
||||
private _Multi: ReturnType<typeof LegacyMultiCommand['factory']>;
|
||||
|
||||
constructor(
|
||||
private _client: RedisClientType<RedisModules, RedisFunctions, RedisScripts>
|
||||
) {
|
||||
@@ -87,7 +88,7 @@ export class RedisLegacyClient {
|
||||
);
|
||||
}
|
||||
|
||||
// TODO: Multi
|
||||
this._Multi = LegacyMultiCommand.factory(RESP);
|
||||
}
|
||||
|
||||
sendCommand(...args: LegacyArguments) {
|
||||
@@ -104,4 +105,68 @@ export class RedisLegacyClient {
|
||||
.then(reply => callback(null, reply))
|
||||
.catch(err => callback(err));
|
||||
}
|
||||
|
||||
multi() {
|
||||
return this._Multi(this._client);
|
||||
}
|
||||
}
|
||||
|
||||
type MultiWithCommands = {
|
||||
[P in keyof typeof COMMANDS]: (...args: LegacyCommandArguments) => RedisLegacyMultiType;
|
||||
};
|
||||
|
||||
export type RedisLegacyMultiType = Omit<LegacyMultiCommand, '_client'> & MultiWithCommands;
|
||||
|
||||
class LegacyMultiCommand extends RedisMultiCommand {
|
||||
private static _createCommand(name: string, command: Command, resp: RespVersions) {
|
||||
const transformReply = RedisLegacyClient.getTransformReply(command, resp);
|
||||
return function (this: LegacyMultiCommand, ...args: LegacyArguments) {
|
||||
const redisArgs = [name];
|
||||
RedisLegacyClient.pushArguments(redisArgs, args);
|
||||
return this.addCommand(redisArgs, transformReply);
|
||||
};
|
||||
}
|
||||
|
||||
static factory(resp: RespVersions) {
|
||||
const Multi = class extends LegacyMultiCommand {};
|
||||
|
||||
for (const [name, command] of Object.entries(COMMANDS)) {
|
||||
// TODO: as any?
|
||||
(Multi as any).prototype[name] = LegacyMultiCommand._createCommand(
|
||||
name,
|
||||
command,
|
||||
resp
|
||||
);
|
||||
}
|
||||
|
||||
return (client: RedisClientType<RedisModules, RedisFunctions, RedisScripts>) => {
|
||||
return new Multi(client) as unknown as RedisLegacyMultiType;
|
||||
};
|
||||
}
|
||||
|
||||
private _client: RedisClientType<RedisModules, RedisFunctions, RedisScripts>;
|
||||
|
||||
constructor(client: RedisClientType<RedisModules, RedisFunctions, RedisScripts>) {
|
||||
super();
|
||||
this._client = client;
|
||||
}
|
||||
|
||||
sendCommand(...args: LegacyArguments) {
|
||||
const redisArgs: CommandArguments = [];
|
||||
RedisLegacyClient.pushArguments(redisArgs, args);
|
||||
return this.addCommand(redisArgs);
|
||||
}
|
||||
|
||||
exec(cb?: (err: ErrorReply | null, replies?: Array<unknown>) => unknown) {
|
||||
const promise = this._client.executeMulti(this.queue);
|
||||
|
||||
if (!cb) {
|
||||
promise.catch(err => this._client.emit('error', err));
|
||||
return;
|
||||
}
|
||||
|
||||
promise
|
||||
.then(results => cb(null, this.transformReplies(results)))
|
||||
.catch(err => cb?.(err));
|
||||
}
|
||||
}
|
||||
|
@@ -1,7 +1,7 @@
|
||||
import { SinglyLinkedList, DoublyLinkedList } from './linked-list';
|
||||
import { equal, deepEqual } from 'assert/strict';
|
||||
|
||||
describe.only('DoublyLinkedList', () => {
|
||||
describe('DoublyLinkedList', () => {
|
||||
const list = new DoublyLinkedList();
|
||||
|
||||
it('should start empty', () => {
|
||||
@@ -78,7 +78,7 @@ describe.only('DoublyLinkedList', () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe.only('SinglyLinkedList', () => {
|
||||
describe('SinglyLinkedList', () => {
|
||||
const list = new SinglyLinkedList();
|
||||
|
||||
it('should start empty', () => {
|
||||
|
@@ -2,6 +2,7 @@ import COMMANDS from '../commands';
|
||||
import RedisMultiCommand, { RedisMultiQueuedCommand } from '../multi-command';
|
||||
import { ReplyWithFlags, CommandReply, Command, CommandArguments, CommanderConfig, RedisFunctions, RedisModules, RedisScripts, RespVersions, TransformReply, RedisScript, RedisFunction, Flags, ReplyUnion } from '../RESP/types';
|
||||
import { attachConfig, functionArgumentsPrefix, getTransformReply } from '../commander';
|
||||
import { RedisClientType } from '.';
|
||||
|
||||
type CommandSignature<
|
||||
REPLIES extends Array<unknown>,
|
||||
@@ -90,7 +91,7 @@ type MULTI_REPLY = {
|
||||
|
||||
type MultiReply = MULTI_REPLY[keyof MULTI_REPLY];
|
||||
|
||||
type ReplyType<T extends MultiReply, REPLIES> = T extends MULTI_REPLY['TYPED'] ? REPLIES : Array<ReplyUnion>;
|
||||
type ReplyType<T extends MultiReply, REPLIES> = T extends MULTI_REPLY['TYPED'] ? REPLIES : Array<unknown>;
|
||||
|
||||
export type RedisClientMultiExecutor = (
|
||||
queue: Array<RedisMultiQueuedCommand>,
|
||||
@@ -161,62 +162,14 @@ export default class RedisClientMultiCommand<REPLIES = []> extends RedisMultiCom
|
||||
});
|
||||
}
|
||||
|
||||
// readonly #multi = new RedisMultiCommand();
|
||||
readonly #executor: RedisClientMultiExecutor;
|
||||
// readonly v4: Record<string, any> = {};
|
||||
readonly #client: RedisClientType;
|
||||
#selectedDB?: number;
|
||||
|
||||
constructor(executor: RedisClientMultiExecutor, legacyMode = false) {
|
||||
constructor(client: RedisClientType) {
|
||||
super();
|
||||
this.#executor = executor;
|
||||
// if (legacyMode) {
|
||||
// this.#legacyMode();
|
||||
// }
|
||||
this.#client = client;
|
||||
}
|
||||
|
||||
// #legacyMode(): void {
|
||||
// this.v4.addCommand = this.addCommand.bind(this);
|
||||
// (this as any).addCommand = (...args: Array<any>): this => {
|
||||
// this.#multi.addCommand(transformLegacyCommandArguments(args));
|
||||
// return this;
|
||||
// };
|
||||
// this.v4.exec = this.exec.bind(this);
|
||||
// (this as any).exec = (callback?: (err: Error | null, replies?: Array<unknown>) => unknown): void => {
|
||||
// this.v4.exec()
|
||||
// .then((reply: Array<unknown>) => {
|
||||
// if (!callback) return;
|
||||
|
||||
// callback(null, reply);
|
||||
// })
|
||||
// .catch((err: Error) => {
|
||||
// if (!callback) {
|
||||
// // this.emit('error', err);
|
||||
// return;
|
||||
// }
|
||||
|
||||
// callback(err);
|
||||
// });
|
||||
// };
|
||||
|
||||
// for (const [name, command] of Object.entries(COMMANDS as RedisCommands)) {
|
||||
// this.#defineLegacyCommand(name, command);
|
||||
// (this as any)[name.toLowerCase()] ??= (this as any)[name];
|
||||
// }
|
||||
// }
|
||||
|
||||
// #defineLegacyCommand(this: any, name: string, command?: RedisCommand): void {
|
||||
// this.v4[name] = this[name].bind(this.v4);
|
||||
// this[name] = command && command.TRANSFORM_LEGACY_REPLY && command.transformReply ?
|
||||
// (...args: Array<unknown>) => {
|
||||
// this.#multi.addCommand(
|
||||
// [name, ...transformLegacyCommandArguments(args)],
|
||||
// command.transformReply
|
||||
// );
|
||||
// return this;
|
||||
// } :
|
||||
// (...args: Array<unknown>) => this.addCommand(name, ...args);
|
||||
// }
|
||||
|
||||
SELECT(db: number, transformReply?: TransformReply): this {
|
||||
this.#selectedDB = db;
|
||||
return this.addCommand(['SELECT', db.toString()], transformReply);
|
||||
@@ -224,15 +177,11 @@ export default class RedisClientMultiCommand<REPLIES = []> extends RedisMultiCom
|
||||
|
||||
select = this.SELECT;
|
||||
|
||||
async exec<T extends MultiReply = MULTI_REPLY['GENERIC']>(execAsPipeline = false) {
|
||||
async exec<T extends MultiReply = MULTI_REPLY['GENERIC']>(execAsPipeline = false): Promise<ReplyType<T, REPLIES>> {
|
||||
if (execAsPipeline) return this.execAsPipeline<T>();
|
||||
|
||||
return this.handleExecReplies(
|
||||
await this.#executor(
|
||||
this.queue,
|
||||
this.#selectedDB,
|
||||
RedisMultiCommand.generateChainId()
|
||||
)
|
||||
return this.transformReplies(
|
||||
await this.#client.executeMulti(this.queue, this.#selectedDB)
|
||||
) as ReplyType<T, REPLIES>;
|
||||
}
|
||||
|
||||
@@ -242,14 +191,11 @@ export default class RedisClientMultiCommand<REPLIES = []> extends RedisMultiCom
|
||||
return this.exec<MULTI_REPLY['TYPED']>(execAsPipeline);
|
||||
}
|
||||
|
||||
async execAsPipeline<T extends MultiReply = MULTI_REPLY['GENERIC']>() {
|
||||
async execAsPipeline<T extends MultiReply = MULTI_REPLY['GENERIC']>(): Promise<ReplyType<T, REPLIES>> {
|
||||
if (this.queue.length === 0) return [] as ReplyType<T, REPLIES>;
|
||||
|
||||
return this.transformReplies(
|
||||
await this.#executor(
|
||||
this.queue,
|
||||
this.#selectedDB
|
||||
)
|
||||
await this.#client.executePipeline(this.queue)
|
||||
) as ReplyType<T, REPLIES>;
|
||||
}
|
||||
|
||||
|
@@ -1,5 +1,4 @@
|
||||
import { Command, CommandArguments, RedisScript, TransformReply } from './RESP/types';
|
||||
import { WatchError } from './errors';
|
||||
import { CommandArguments, RedisScript, TransformReply } from './RESP/types';
|
||||
|
||||
export interface RedisMultiQueuedCommand {
|
||||
args: CommandArguments;
|
||||
@@ -7,10 +6,6 @@ export interface RedisMultiQueuedCommand {
|
||||
}
|
||||
|
||||
export default class RedisMultiCommand {
|
||||
static generateChainId(): symbol {
|
||||
return Symbol('RedisMultiCommand Chain Id');
|
||||
}
|
||||
|
||||
readonly queue: Array<RedisMultiQueuedCommand> = [];
|
||||
|
||||
readonly scriptsInUse = new Set<string>();
|
||||
@@ -42,15 +37,6 @@ export default class RedisMultiCommand {
|
||||
return this.addCommand(redisArgs, transformReply);
|
||||
}
|
||||
|
||||
handleExecReplies(rawReplies: Array<unknown>): Array<unknown> {
|
||||
const execReply = rawReplies[rawReplies.length - 1] as (null | Array<unknown>);
|
||||
if (execReply === null) {
|
||||
throw new WatchError();
|
||||
}
|
||||
|
||||
return this.transformReplies(execReply);
|
||||
}
|
||||
|
||||
transformReplies(rawReplies: Array<unknown>): Array<unknown> {
|
||||
return rawReplies.map((reply, i) => {
|
||||
const { transformReply, args } = this.queue[i];
|
||||
|
Reference in New Issue
Block a user