1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

add support for MULTI in ClientPool

This commit is contained in:
Leibale
2023-11-13 18:14:12 -05:00
parent 6549fa49bf
commit 2ca6e1f333
5 changed files with 64 additions and 46 deletions

View File

@@ -794,8 +794,12 @@ export default class RedisClient<
return execResult as Array<unknown>;
}
MULTI(): RedisClientMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING> {
return new (this as any).Multi(this);
MULTI() {
type Multi = new (...args: ConstructorParameters<typeof RedisClientMultiCommand>) => RedisClientMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING>;;
return new ((this as any).Multi as Multi)(
this._executeMulti.bind(this),
this._executePipeline.bind(this)
);
}
multi = this.MULTI;

View File

@@ -1,8 +1,7 @@
import COMMANDS from '../commands';
import RedisMultiCommand, { MULTI_REPLY, MultiReply, MultiReplyType } from '../multi-command';
import RedisMultiCommand, { MULTI_REPLY, MultiReply, MultiReplyType, RedisMultiQueuedCommand } from '../multi-command';
import { ReplyWithTypeMapping, CommandReply, Command, CommandArguments, CommanderConfig, RedisFunctions, RedisModules, RedisScripts, RespVersions, TransformReply, RedisScript, RedisFunction, TypeMapping } from '../RESP/types';
import { attachConfig, functionArgumentsPrefix, getTransformReply } from '../commander';
import { RedisClientType } from '.';
type CommandSignature<
REPLIES extends Array<unknown>,
@@ -84,6 +83,10 @@ export type RedisClientMultiCommandType<
WithScripts<REPLIES, M, F, S, RESP, TYPE_MAPPING>
);
type ExecuteMulti = (commands: Array<RedisMultiQueuedCommand>, selectedDB?: number) => Promise<Array<unknown>>;
type ExecutePipeline = (commands: Array<RedisMultiQueuedCommand>) => Promise<Array<unknown>>;
export default class RedisClientMultiCommand<REPLIES = []> {
private static _createCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);
@@ -149,11 +152,14 @@ export default class RedisClientMultiCommand<REPLIES = []> {
}
private readonly _multi = new RedisMultiCommand();
private readonly _client: RedisClientType;
private readonly _executeMulti: ExecuteMulti;
private readonly _executePipeline: ExecutePipeline;
private _selectedDB?: number;
constructor(client: RedisClientType) {
this._client = client;
constructor(executeMulti: ExecuteMulti, executePipeline: ExecutePipeline) {
this._executeMulti = executeMulti;
this._executePipeline = executePipeline;
// this._client = client;
}
SELECT(db: number, transformReply?: TransformReply): this {
@@ -173,7 +179,7 @@ export default class RedisClientMultiCommand<REPLIES = []> {
if (execAsPipeline) return this.execAsPipeline<T>();
return this._multi.transformReplies(
await this._client._executeMulti(this._multi.queue, this._selectedDB)
await this._executeMulti(this._multi.queue, this._selectedDB)
) as MultiReplyType<T, REPLIES>;
}
@@ -187,7 +193,7 @@ export default class RedisClientMultiCommand<REPLIES = []> {
if (this._multi.queue.length === 0) return [] as MultiReplyType<T, REPLIES>;
return this._multi.transformReplies(
await this._client._executePipeline(this._multi.queue)
await this._executePipeline(this._multi.queue)
) as MultiReplyType<T, REPLIES>;
}

View File

@@ -6,6 +6,7 @@ import { DoublyLinkedNode, DoublyLinkedList, SinglyLinkedList } from './linked-l
import { TimeoutError } from '../errors';
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
import { CommandOptions } from './commands-queue';
import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command';
export interface RedisPoolOptions {
/**
@@ -118,7 +119,6 @@ export class RedisClientPool<
clientOptions?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>,
options?: Partial<RedisPoolOptions>
) {
// @ts-ignore
const Pool = attachConfig({
BaseClass: RedisClientPool,
commands: COMMANDS,
@@ -129,6 +129,8 @@ export class RedisClientPool<
config: clientOptions
});
Pool.prototype.Multi = RedisClientMultiCommand.extend(clientOptions);
// returning a "proxy" to prevent the namespaces.self to leak between "proxies"
return Object.create(
new Pool(
@@ -327,8 +329,8 @@ export class RedisClientPool<
this._returnClient(node);
}
execute<T>(fn: PoolTask<M, F, S, RESP, TYPE_MAPPING, T>): Promise<T> {
return new Promise<T>((resolve, reject) => {
execute<T>(fn: PoolTask<M, F, S, RESP, TYPE_MAPPING, T>) {
return new Promise<Awaited<T>>((resolve, reject) => {
const client = this._idleClients.shift(),
{ tail } = this._tasksQueue;
if (!client) {
@@ -425,6 +427,16 @@ export class RedisClientPool<
return this.execute(client => client.executeScript(script, args, options));
}
MULTI() {
type Multi = new (...args: ConstructorParameters<typeof RedisClientMultiCommand>) => RedisClientMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING>;
return new ((this as any).Multi as Multi)(
(commands, selectedDB) => this.execute(client => client._executeMulti(commands, selectedDB)),
commands => this.execute(client => client._executePipeline(commands))
);
}
multi = this.MULTI;
async close() {
if (this._isClosing) return; // TODO: throw err?
if (!this._isOpen) return; // TODO: throw err?

View File

@@ -504,33 +504,17 @@ export default class RedisCluster<
);
}
/**
* @internal
*/
async _executePipeline(
firstKey: RedisArgument | undefined,
isReadonly: boolean | undefined,
commands: Array<RedisMultiQueuedCommand>
) {
const client = await this._slots.getClient(firstKey, isReadonly);
return client._executePipeline(commands);
}
/**
* @internal
*/
async _executeMulti(
firstKey: RedisArgument | undefined,
isReadonly: boolean | undefined,
commands: Array<RedisMultiQueuedCommand>
) {
const client = await this._slots.getClient(firstKey, isReadonly);
return client._executeMulti(commands);
}
MULTI(routing?: RedisArgument): RedisClusterMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING> {
return new (this as any).Multi(
this,
MULTI(routing?: RedisArgument) {
type Multi = new (...args: ConstructorParameters<typeof RedisClusterMultiCommand>) => RedisClusterMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING>;
return new ((this as any).Multi as Multi)(
async (firstKey, isReadonly, commands) => {
const client = await this._slots.getClient(firstKey, isReadonly);
return client._executeMulti(commands);
},
async (firstKey, isReadonly, commands) => {
const client = await this._slots.getClient(firstKey, isReadonly);
return client._executePipeline(commands);
},
routing
);
}

View File

@@ -1,8 +1,8 @@
import COMMANDS from '../commands';
import RedisMultiCommand, { MULTI_REPLY, MultiReply, MultiReplyType } from '../multi-command';
import RedisMultiCommand, { MULTI_REPLY, MultiReply, MultiReplyType, RedisMultiQueuedCommand } from '../multi-command';
import { ReplyWithTypeMapping, CommandReply, Command, CommandArguments, CommanderConfig, RedisFunctions, RedisModules, RedisScripts, RespVersions, TransformReply, RedisScript, RedisFunction, TypeMapping, RedisArgument } from '../RESP/types';
import { attachConfig, functionArgumentsPrefix, getTransformReply } from '../commander';
import RedisCluster, { RedisClusterType } from '.';
import RedisCluster from '.';
type CommandSignature<
REPLIES extends Array<unknown>,
@@ -84,6 +84,12 @@ export type RedisClusterMultiCommandType<
WithScripts<REPLIES, M, F, S, RESP, TYPE_MAPPING>
);
export type ClusterMultiExecute = (
firstKey: RedisArgument | undefined,
isReadonly: boolean | undefined,
commands: Array<RedisMultiQueuedCommand>
) => Promise<Array<unknown>>;
export default class RedisClusterMultiCommand<REPLIES = []> {
private static _createCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);
@@ -181,12 +187,18 @@ export default class RedisClusterMultiCommand<REPLIES = []> {
}
private readonly _multi = new RedisMultiCommand();
private readonly _cluster: RedisClusterType;
private readonly _executeMulti: ClusterMultiExecute;
private readonly _executePipeline: ClusterMultiExecute;
private _firstKey: RedisArgument | undefined;
private _isReadonly: boolean | undefined = true;
constructor(cluster: RedisClusterType, routing: RedisArgument | undefined) {
this._cluster = cluster;
constructor(
executeMulti: ClusterMultiExecute,
executePipeline: ClusterMultiExecute,
routing: RedisArgument | undefined
) {
this._executeMulti = executeMulti;
this._executePipeline = executePipeline;
this._firstKey = routing;
}
@@ -213,7 +225,7 @@ export default class RedisClusterMultiCommand<REPLIES = []> {
if (execAsPipeline) return this.execAsPipeline<T>();
return this._multi.transformReplies(
await this._cluster._executeMulti(
await this._executeMulti(
this._firstKey,
this._isReadonly,
this._multi.queue
@@ -231,7 +243,7 @@ export default class RedisClusterMultiCommand<REPLIES = []> {
if (this._multi.queue.length === 0) return [] as MultiReplyType<T, REPLIES>;
return this._multi.transformReplies(
await this._cluster._executePipeline(
await this._executePipeline(
this._firstKey,
this._isReadonly,
this._multi.queue