1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

client pool

This commit is contained in:
Leibale
2023-09-04 17:26:48 -04:00
parent c7a03acfd3
commit 3895eb5926
15 changed files with 565 additions and 352 deletions

View File

@@ -19,7 +19,7 @@ redis.register_function {
Here is the same example, but in a format that can be pasted into the `redis-cli`.
```
FUNCTION LOAD "#!lua name=library\nredis.register_function{function_name=\"add\", callback=function(keys, args) return redis.call('GET', keys[1])+args[1] end, flags={\"no-writes\"}}"
FUNCTION LOAD "#!lua name=library\nredis.register_function{function_name='add', callback=function(keys, args) return redis.call('GET', keys[1])+args[1] end, flags={'no-writes'}}"
```
Load the prior redis function on the _redis server_ before running the example below.

View File

@@ -4,11 +4,15 @@ export { VerbatimString } from './lib/RESP/verbatim-string';
export { defineScript } from './lib/lua-script';
// export * from './lib/errors';
import RedisClient, { RedisClientType, RedisClientOptions } from './lib/client';
export { RedisClientType, RedisClientOptions };
import RedisClient, { RedisClientOptions, RedisClientType } from './lib/client';
export { RedisClientOptions, RedisClientType };
export const createClient = RedisClient.create;
import RedisCluster, { RedisClusterType, RedisClusterOptions } from './lib/cluster';
import { RedisClientPool, RedisPoolOptions, RedisClientPoolType } from './lib/client/pool';
export { RedisClientPoolType, RedisPoolOptions };
export const createClientPool = RedisClientPool.create;
import RedisCluster, { RedisClusterOptions, RedisClusterType } from './lib/cluster';
export { RedisClusterType, RedisClusterOptions };
export const createCluster = RedisCluster.create;

View File

@@ -111,7 +111,10 @@ export class Decoder {
case RESP_TYPES.NUMBER:
return this._handleDecodedValue(
this._config.onReply,
this._decodeNumber(chunk)
this._decodeNumber(
this._config.getTypeMapping()[RESP_TYPES.NUMBER],
chunk
)
);
case RESP_TYPES.BIG_NUMBER:
@@ -226,7 +229,11 @@ export class Decoder {
return boolean;
}
private _decodeNumber(chunk) {
private _decodeNumber(type, chunk) {
if (type === String) {
return this._decodeSimpleString(String, chunk);
}
switch (chunk[this._cursor]) {
case ASCII['+']:
return this._maybeDecodeNumberValue(false, chunk);
@@ -675,7 +682,7 @@ export class Decoder {
return this._decodeBoolean(chunk);
case RESP_TYPES.NUMBER:
return this._decodeNumber(chunk);
return this._decodeNumber(typeMapping[RESP_TYPES.NUMBER], chunk);
case RESP_TYPES.BIG_NUMBER:
return this._decodeBigNumber(typeMapping[RESP_TYPES.BIG_NUMBER], chunk);

View File

@@ -11,7 +11,7 @@ export interface CommandOptions<T = TypeMapping> {
asap?: boolean;
abortSignal?: AbortSignal;
/**
* Maps bettween RESP and JavaScript types
* Maps between RESP and JavaScript types
*/
typeMapping?: T;
}

View File

@@ -7,13 +7,13 @@ import { ClientClosedError, ClientOfflineError, DisconnectsClientError, WatchErr
import { URL } from 'url';
import { TcpSocketConnectOpts } from 'net';
import { PubSubType, PubSubListener, PubSubTypeListeners, ChannelListeners } from './pub-sub';
import { Command, CommandArguments, CommandSignature, TypeMapping, CommanderConfig, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types';
import { Command, CommandSignature, TypeMapping, CommanderConfig, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types';
import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command';
import { RedisMultiQueuedCommand } from '../multi-command';
import HELLO, { HelloOptions } from '../commands/HELLO';
import { ScanOptions, ScanCommonOptions } from '../commands/SCAN';
import { RedisLegacyClient, RedisLegacyClientType } from './legacy-mode';
// import { RedisClientPool } from './pool';
import { RedisPoolOptions, RedisClientPool } from './pool';
interface ClientCommander<
M extends RedisModules,
@@ -21,7 +21,7 @@ interface ClientCommander<
S extends RedisScripts,
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping
> extends CommanderConfig<M, F, S, RESP>{
> extends CommanderConfig<M, F, S, RESP> {
commandOptions?: CommandOptions<TYPE_MAPPING>;
}
@@ -72,7 +72,7 @@ export interface RedisClientOptions<
readonly?: boolean;
/**
* Send `PING` command at interval (in ms).
* Useful with Redis deployments that do not use TCP Keep-Alive.
* Useful with Redis deployments that do not honor TCP Keep-Alive.
*/
pingInterval?: number;
}
@@ -194,13 +194,7 @@ export default class RedisClient<
return async function (this: ProxyClient, ...args: Array<unknown>) {
const scriptArgs = script.transformArguments(...args),
redisArgs = prefix.concat(scriptArgs),
reply = await this.sendCommand(redisArgs, this._commandOptions).catch((err: unknown) => {
if (!(err as Error)?.message?.startsWith?.('NOSCRIPT')) throw err;
redisArgs[0] = 'EVAL';
redisArgs[1] = script.SCRIPT;
return this.sendCommand(redisArgs, this._commandOptions);
});
reply = await this.executeScript(script, redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, scriptArgs.preserve) :
reply;
@@ -218,8 +212,8 @@ export default class RedisClient<
BaseClass: RedisClient,
commands: COMMANDS,
createCommand: RedisClient._createCommand,
createFunctionCommand: RedisClient._createFunctionCommand,
createModuleCommand: RedisClient._createModuleCommand,
createFunctionCommand: RedisClient._createFunctionCommand,
createScriptCommand: RedisClient._createScriptCommand,
config
});
@@ -227,8 +221,7 @@ export default class RedisClient<
Client.prototype.Multi = RedisClientMultiCommand.extend(config);
return (options?: Omit<RedisClientOptions, keyof Exclude<typeof config, undefined>>) => {
// returning a proxy of the client to prevent the namespaces.self to leak between proxies
// namespaces will be bootstraped on first access per proxy
// returning a "proxy" to prevent the namespaces.self to leak between "proxies"
return Object.create(new Client(options)) as RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
};
}
@@ -527,13 +520,14 @@ export default class RedisClient<
}
/**
* Create `RedisClientPool` using this client as a prototype
* Create {@link RedisClientPool `RedisClientPool`} using this client as a prototype
*/
// pool() {
// return RedisClientPool.fromClient(
// this as unknown as RedisClientType<M, F, S, RESP>
// );
// }
pool(options?: Partial<RedisPoolOptions>) {
return RedisClientPool.create(
this._options,
options
);
}
duplicate<
_M extends RedisModules = M,
@@ -549,12 +543,13 @@ export default class RedisClient<
}) as RedisClientType<_M, _F, _S, _RESP, _TYPE_MAPPING>;
}
connect() {
return this._socket.connect();
async connect() {
await this._socket.connect();
return this as unknown as RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
}
sendCommand<T = ReplyUnion>(
args: CommandArguments,
args: Array<RedisArgument>,
options?: CommandOptions
): Promise<T> {
if (!this._socket.isOpen) {
@@ -568,6 +563,22 @@ export default class RedisClient<
return promise;
}
async executeScript(
script: RedisScript,
args: Array<RedisArgument>,
options?: CommandOptions
) {
try {
return await this.sendCommand(args, options);
} catch (err) {
if (!(err as Error)?.message?.startsWith?.('NOSCRIPT')) throw err;
args[0] = 'EVAL';
args[1] = script.SCRIPT;
return await this.sendCommand(args, options);
}
}
async SELECT(db: number): Promise<void> {
await this.sendCommand(['SELECT', db.toString()]);
this._selectedDB = db;
@@ -728,7 +739,7 @@ export default class RedisClient<
/**
* @internal
*/
executePipeline(commands: Array<RedisMultiQueuedCommand>) {
_executePipeline(commands: Array<RedisMultiQueuedCommand>) {
if (!this._socket.isOpen) {
return Promise.reject(new ClientClosedError());
}
@@ -745,7 +756,7 @@ export default class RedisClient<
/**
* @internal
*/
async executeMulti(
async _executeMulti(
commands: Array<RedisMultiQueuedCommand>,
selectedDB?: number
) {

View File

@@ -160,7 +160,7 @@ class LegacyMultiCommand {
}
exec(cb?: (err: ErrorReply | null, replies?: Array<unknown>) => unknown) {
const promise = this._client.executeMulti(this._multi.queue);
const promise = this._client._executeMulti(this._multi.queue);
if (!cb) {
promise.catch(err => this._client.emit('error', err));

View File

@@ -138,9 +138,26 @@ export class SinglyLinkedList<T> {
};
if (this._head === undefined) {
this._head = this._tail = node;
return this._head = this._tail = node;
}
return this._tail!.next = this._tail = node;
}
remove(node: SinglyLinkedNode<T>, parent: SinglyLinkedNode<T> | undefined) {
--this._length;
if (this._head === node) {
if (this._tail === node) {
this._head = this._tail = undefined;
} else {
this._tail!.next = this._tail = node;
this._head = node.next;
}
} else if (this._tail === node) {
this._tail = parent;
parent!.next = undefined;
} else {
parent!.next = node.next;
}
}

View File

@@ -173,7 +173,7 @@ export default class RedisClientMultiCommand<REPLIES = []> {
if (execAsPipeline) return this.execAsPipeline<T>();
return this._multi.transformReplies(
await this._client.executeMulti(this._multi.queue, this._selectedDB)
await this._client._executeMulti(this._multi.queue, this._selectedDB)
) as MultiReplyType<T, REPLIES>;
}
@@ -187,7 +187,7 @@ export default class RedisClientMultiCommand<REPLIES = []> {
if (this._multi.queue.length === 0) return [] as MultiReplyType<T, REPLIES>;
return this._multi.transformReplies(
await this._client.executePipeline(this._multi.queue)
await this._client._executePipeline(this._multi.queue)
) as MultiReplyType<T, REPLIES>;
}

View File

@@ -1,242 +1,469 @@
// import COMMANDS from '../commands';
// import { RedisFunctions, RedisModules, RedisScripts, RespVersions, TypeMapping } from '../RESP/types';
// import RedisClient, { RedisClientType, RedisClientOptions, RedisClientExtensions } from '.';
// import { EventEmitter } from 'events';
// import { DoublyLinkedNode, DoublyLinkedList, SinglyLinkedList } from './linked-list';
import COMMANDS from '../commands';
import { Command, RedisArgument, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, RespVersions, TypeMapping } from '../RESP/types';
import RedisClient, { RedisClientType, RedisClientOptions, RedisClientExtensions } from '.';
import { EventEmitter } from 'events';
import { DoublyLinkedNode, DoublyLinkedList, SinglyLinkedList } from './linked-list';
import { TimeoutError } from '../errors';
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
import { CommandOptions } from './commands-queue';
// export type RedisPoolOptions = typeof RedisClientPool['_DEFAULTS'];
export interface RedisPoolOptions {
/**
* The minimum number of clients to keep in the pool (>= 1).
*/
minimum: number;
/**
* The maximum number of clients to keep in the pool (>= {@link RedisPoolOptions.minimum} >= 1).
*/
maximum: number;
/**
* The maximum time a task can wait for a client to become available (>= 0).
*/
acquireTimeout: number;
/**
* TODO
*/
cleanupDelay: number;
}
// export type PoolTask<
// M extends RedisModules,
// F extends RedisFunctions,
// S extends RedisScripts,
// RESP extends RespVersions,
// TYPE_MAPPING extends TypeMapping,
// T = unknown
// > = (client: RedisClientType<M, F, S, RESP, TYPE_MAPPING>) => T;
export type PoolTask<
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping,
T = unknown
> = (client: RedisClientType<M, F, S, RESP, TYPE_MAPPING>) => T;
// export type RedisClientPoolType<
// M extends RedisModules = {},
// F extends RedisFunctions = {},
// S extends RedisScripts = {},
// RESP extends RespVersions = 2,
// TYPE_MAPPING extends TypeMapping = {}
// > = (
// RedisClientPool<M, F, S, RESP, TYPE_MAPPING> &
// RedisClientExtensions<M, F, S, RESP, TYPE_MAPPING>
// );
export type RedisClientPoolType<
M extends RedisModules = {},
F extends RedisFunctions = {},
S extends RedisScripts = {},
RESP extends RespVersions = 2,
TYPE_MAPPING extends TypeMapping = {}
> = (
RedisClientPool<M, F, S, RESP, TYPE_MAPPING> &
RedisClientExtensions<M, F, S, RESP, TYPE_MAPPING>
);
// export class RedisClientPool<
// M extends RedisModules = {},
// F extends RedisFunctions = {},
// S extends RedisScripts = {},
// RESP extends RespVersions = 2,
// TYPE_MAPPING extends TypeMapping = {}
// > extends EventEmitter {
// static fromClient<
// M extends RedisModules,
// F extends RedisFunctions,
// S extends RedisScripts,
// RESP extends RespVersions,
// TYPE_MAPPING extends TypeMapping = {}
// >(
// client: RedisClientType<M, F, S, RESP, TYPE_MAPPING>,
// poolOptions: Partial<RedisPoolOptions>
// ) {
// return RedisClientPool.create(
// () => client.duplicate(),
// poolOptions
// );
// }
type ProxyPool = RedisClientPoolType<any, any, any, any, any>;
// static fromOptions<
// M extends RedisModules,
// F extends RedisFunctions,
// S extends RedisScripts,
// RESP extends RespVersions,
// TYPE_MAPPING extends TypeMapping = {}
// >(
// options: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>,
// poolOptions: Partial<RedisPoolOptions>
// ) {
// return RedisClientPool.create(
// RedisClient.factory(options),
// poolOptions
// );
// }
type NamespaceProxyPool = { self: ProxyPool };
// static create<
// M extends RedisModules,
// F extends RedisFunctions,
// S extends RedisScripts,
// RESP extends RespVersions,
// TYPE_MAPPING extends TypeMapping = {}
// >(
// clientFactory: () => RedisClientType<M, F, S, RESP, TYPE_MAPPING>,
// options?: Partial<RedisPoolOptions>
// ) {
// return new RedisClientPool(
// clientFactory,
// options
// ) as RedisClientPoolType<M, F, S, RESP, TYPE_MAPPING>;
// }
export class RedisClientPool<
M extends RedisModules = {},
F extends RedisFunctions = {},
S extends RedisScripts = {},
RESP extends RespVersions = 2,
TYPE_MAPPING extends TypeMapping = {}
> extends EventEmitter {
private static _createCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);
return async function (this: ProxyPool, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args),
reply = await this.sendCommand(redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
};
}
// private static _DEFAULTS = {
// /**
// * The minimum number of clients to keep in the pool.
// */
// minimum: 0,
// /**
// * The maximum number of clients to keep in the pool.
// */
// maximum: 1,
// /**
// * The maximum time a task can wait for a client to become available.
// */
// acquireTimeout: 3000,
// /**
// * When there are `> minimum && < maximum` clients in the pool, the pool will wait for `cleanupDelay` milliseconds before closing the extra clients.
// */
// cleanupDelay: 3000
// };
private static _createModuleCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);
return async function (this: NamespaceProxyPool, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args),
reply = await this.self.sendCommand(redisArgs, this.self._commandOptions);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
};
}
// private readonly _clientFactory: () => RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
// private readonly _options: Required<RedisPoolOptions>;
// private readonly _idleClients = new SinglyLinkedList<RedisClientType<M, F, S, RESP, TYPE_MAPPING>>();
// private readonly _usedClients = new DoublyLinkedList<RedisClientType<M, F, S, RESP, TYPE_MAPPING>>();
// private readonly _tasksQueue = new SinglyLinkedList<{
// resolve: <T>(value: T | PromiseLike<T>) => void;
// reject: (reason?: unknown) => void;
// fn: PoolTask<M, F, S, RESP, TYPE_MAPPING>;
// }>();
private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) {
const prefix = functionArgumentsPrefix(name, fn),
transformReply = getTransformReply(fn, resp);
return async function (this: NamespaceProxyPool, ...args: Array<unknown>) {
const fnArgs = fn.transformArguments(...args),
reply = await this.self.sendCommand(
prefix.concat(fnArgs),
this.self._commandOptions
);
return transformReply ?
transformReply(reply, fnArgs.preserve) :
reply;
};
}
// constructor(
// clientFactory: () => RedisClientType<M, F, S, RESP, TYPE_MAPPING>,
// options?: Partial<RedisPoolOptions>
// ) {
// super();
private static _createScriptCommand(script: RedisScript, resp: RespVersions) {
const prefix = scriptArgumentsPrefix(script),
transformReply = getTransformReply(script, resp);
return async function (this: ProxyPool, ...args: Array<unknown>) {
const scriptArgs = script.transformArguments(...args),
redisArgs = prefix.concat(scriptArgs),
reply = await this.executeScript(script, redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, scriptArgs.preserve) :
reply;
};
}
// this._clientFactory = clientFactory;
// this._options = {
// ...RedisClientPool._DEFAULTS,
// ...options
// };
// this._initate();
// }
static create<
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping = {}
>(
// clientFactory: () => RedisClientType<M, F, S, RESP, TYPE_MAPPING>,
clientOptions?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>,
options?: Partial<RedisPoolOptions>
) {
// @ts-ignore
const Pool = attachConfig({
BaseClass: RedisClientPool,
commands: COMMANDS,
createCommand: RedisClientPool._createCommand,
createModuleCommand: RedisClientPool._createModuleCommand,
createFunctionCommand: RedisClientPool._createFunctionCommand,
createScriptCommand: RedisClientPool._createScriptCommand,
config: clientOptions
});
// private async _initate() {
// const promises = [];
// while (promises.length < this._options.minimum) {
// promises.push(this._create());
// }
// returning a "proxy" to prevent the namespaces.self to leak between "proxies"
return Object.create(
new Pool(
RedisClient.factory(clientOptions).bind(undefined, clientOptions),
options
)
) as RedisClientPoolType<M, F, S, RESP, TYPE_MAPPING>;
}
// try {
// await Promise.all(promises);
// } catch (err) {
// this.destroy();
// this.emit('error', err);
// }
// }
// TODO: defaults
private static _DEFAULTS = {
minimum: 1,
maximum: 100,
acquireTimeout: 3000,
cleanupDelay: 3000
} satisfies RedisPoolOptions;
// private async _create() {
// const client = this._clientFactory()
// // TODO: more events?
// .on('error', (err: Error) => this.emit('error', err));
private readonly _clientFactory: () => RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
private readonly _options: RedisPoolOptions;
// const node = this._usedClients.push(client);
private readonly _idleClients = new SinglyLinkedList<RedisClientType<M, F, S, RESP, TYPE_MAPPING>>();
// await client.connect();
/**
* The number of idle clients.
*/
get idleClients() {
return this._idleClients.length;
}
// this._usedClients.remove(node);
private readonly _clientsInUse = new DoublyLinkedList<RedisClientType<M, F, S, RESP, TYPE_MAPPING>>();
// return client;
// }
/**
* The number of clients in use.
*/
get clientsInUse() {
return this._clientsInUse.length;
}
// execute<T>(fn: PoolTask<M, F, S, RESP, TYPE_MAPPING, T>): Promise<T> {
// return new Promise<T>((resolve, reject) => {
// let client = this._idleClients.shift();
// if (!client) {
// this._tasksQueue.push({
// // @ts-ignore
// resolve,
// reject,
// fn
// });
private readonly _connectingClients = 0;
// if (this._idleClients.length + this._usedClients.length < this._options.maximum) {
// this._create();
// }
/**
* The number of clients that are currently connecting.
*/
get connectingClients() {
return this._connectingClients;
}
// return;
// }
/**
* The total number of clients in the pool (including connecting, idle, and in use).
*/
get totalClients() {
return this._idleClients.length + this._clientsInUse.length;
}
// const node = this._usedClients.push(client);
// // @ts-ignore
// this._executeTask(node, resolve, reject, fn);
// });
// }
private readonly _tasksQueue = new SinglyLinkedList<{
timeout: NodeJS.Timeout | undefined;
resolve: (value: unknown) => unknown;
reject: (reason?: unknown) => unknown;
fn: PoolTask<M, F, S, RESP, TYPE_MAPPING>;
}>();
// private _executeTask(
// node: DoublyLinkedNode<RedisClientType<M, F, S, RESP, TYPE_MAPPING>>,
// resolve: <T>(value: T | PromiseLike<T>) => void,
// reject: (reason?: unknown) => void,
// fn: PoolTask<M, F, S, RESP, TYPE_MAPPING>
// ) {
// const result = fn(node.value);
// if (result instanceof Promise) {
// result.then(resolve, reject);
// result.finally(() => this._returnClient(node))
// } else {
// resolve(result);
// this._returnClient(node);
// }
// }
/**
* The number of tasks waiting for a client to become available.
*/
get tasksQueueLength() {
return this._tasksQueue.length;
}
// private _returnClient(node: DoublyLinkedListNode<RedisClientType<M, F, S, RESP, TYPE_MAPPING>>) {
// const task = this._tasksQueue.shift();
// if (task) {
// this._executeTask(node, task.resolve, task.reject, task.fn);
// return;
// }
private _isOpen = false;
// if (this._idleClients.length >= this._options.minimum) {
// node.client.destroy();
// return;
// }
/**
* Whether the pool is open (either connecting or connected).
*/
get isOpen() {
return this._isOpen;
}
// this._usedClients.remove(node);
// this._idleClients.push(node.client);
// }
private _isClosing = false;
// async close() {
// const promises = [];
/**
* Whether the pool is closing (*not* closed).
*/
get isClosing() {
return this._isClosing;
}
// for (const client of this._idleClients) {
// promises.push(client.close());
// }
/**
* You are probably looking for {@link RedisClient.pool `RedisClient.pool`},
* {@link RedisClientPool.fromClient `RedisClientPool.fromClient`},
* or {@link RedisClientPool.fromOptions `RedisClientPool.fromOptions`}...
*/
constructor(
clientFactory: () => RedisClientType<M, F, S, RESP, TYPE_MAPPING>,
options?: Partial<RedisPoolOptions>
) {
super();
// this._idleClients.reset();
this._clientFactory = clientFactory;
this._options = {
...RedisClientPool._DEFAULTS,
...options
};
}
// for (const client of this._usedClients) {
// promises.push(client.close());
// }
private _self = this;
private _commandOptions?: CommandOptions<TYPE_MAPPING>;
// this._usedClients.reset();
withCommandOptions<
OPTIONS extends CommandOptions<TYPE_MAPPING>,
TYPE_MAPPING extends TypeMapping
>(options: OPTIONS) {
const proxy = Object.create(this._self);
proxy._commandOptions = options;
return proxy as RedisClientPoolType<
M,
F,
S,
RESP,
TYPE_MAPPING extends TypeMapping ? TYPE_MAPPING : {}
>;
}
// await Promise.all(promises);
// }
private _commandOptionsProxy<
K extends keyof CommandOptions,
V extends CommandOptions[K]
>(
key: K,
value: V
) {
const proxy = Object.create(this._self);
proxy._commandOptions = Object.create(this._commandOptions ?? null);
proxy._commandOptions[key] = value;
return proxy as RedisClientPoolType<
M,
F,
S,
RESP,
K extends 'typeMapping' ? V extends TypeMapping ? V : {} : TYPE_MAPPING
>;
}
// destroy() {
// for (const client of this._idleClients) {
// client.destroy();
// }
/**
* Override the `typeMapping` command option
*/
withTypeMapping<TYPE_MAPPING extends TypeMapping>(typeMapping: TYPE_MAPPING) {
return this._commandOptionsProxy('typeMapping', typeMapping);
}
// this._idleClients.reset();
/**
* Override the `abortSignal` command option
*/
withAbortSignal(abortSignal: AbortSignal) {
return this._commandOptionsProxy('abortSignal', abortSignal);
}
// for (const client of this._usedClients) {
// client.destroy();
// }
/**
* Override the `asap` command option to `true`
* TODO: remove?
*/
asap() {
return this._commandOptionsProxy('asap', true);
}
// this._usedClients.reset();
// }
// }
async connect() {
if (this._isOpen) return; // TODO: throw error?
this._isOpen = true;
const promises = [];
while (promises.length < this._options.minimum) {
promises.push(this._create());
}
try {
await Promise.all(promises);
return this as unknown as RedisClientPoolType<M, F, S, RESP, TYPE_MAPPING>;
} catch (err) {
this.destroy();
throw err;
}
}
private async _create() {
const node = this._clientsInUse.push(
this._clientFactory()
.on('error', (err: Error) => this.emit('error', err))
);
try {
await node.value.connect();
} catch (err) {
this._clientsInUse.remove(node);
throw err;
}
this._returnClient(node);
}
execute<T>(fn: PoolTask<M, F, S, RESP, TYPE_MAPPING, T>): Promise<T> {
return new Promise<T>((resolve, reject) => {
const client = this._idleClients.shift(),
{ tail } = this._tasksQueue;
if (!client) {
let timeout;
if (this._options.acquireTimeout > 0) {
timeout = setTimeout(
() => {
this._tasksQueue.remove(task, tail);
reject(new TimeoutError('Timeout waiting for a client')); // TODO: message
},
this._options.acquireTimeout
);
}
const task = this._tasksQueue.push({
timeout,
// @ts-ignore
resolve,
reject,
fn
});
if (this.totalClients < this._options.maximum) {
this._create();
}
return;
}
const node = this._clientsInUse.push(client);
// @ts-ignore
this._executeTask(node, resolve, reject, fn);
});
}
private _executeTask(
node: DoublyLinkedNode<RedisClientType<M, F, S, RESP, TYPE_MAPPING>>,
resolve: <T>(value: T | PromiseLike<T>) => void,
reject: (reason?: unknown) => void,
fn: PoolTask<M, F, S, RESP, TYPE_MAPPING>
) {
const result = fn(node.value);
if (result instanceof Promise) {
result.then(resolve, reject);
result.finally(() => this._returnClient(node))
} else {
resolve(result);
this._returnClient(node);
}
}
private _returnClient(node: DoublyLinkedNode<RedisClientType<M, F, S, RESP, TYPE_MAPPING>>) {
const task = this._tasksQueue.shift();
if (task) {
this._executeTask(node, task.resolve, task.reject, task.fn);
return;
}
this._clientsInUse.remove(node);
this._idleClients.push(node.value);
this._scheduleCleanup();
}
cleanupTimeout?: NodeJS.Timeout;
private _scheduleCleanup() {
if (this.totalClients <= this._options.minimum) return;
clearTimeout(this.cleanupTimeout);
this.cleanupTimeout = setTimeout(() => this._cleanup(), this._options.cleanupDelay);
}
private _cleanup() {
const toDestroy = Math.min(this._idleClients.length, this.totalClients - this._options.minimum);
for (let i = 0; i < toDestroy; i++) {
// TODO: shift vs pop
this._idleClients.shift()!.destroy();
}
}
sendCommand(
args: Array<RedisArgument>,
options?: CommandOptions
) {
return this.execute(client => client.sendCommand(args, options));
}
executeScript(
script: RedisScript,
args: Array<RedisArgument>,
options?: CommandOptions
) {
return this.execute(client => client.executeScript(script, args, options));
}
async close() {
if (this._isClosing) return; // TODO: throw err?
if (!this._isOpen) return; // TODO: throw err?
this._isClosing = true;
try {
const promises = [];
for (const client of this._idleClients) {
promises.push(client.close());
}
for (const client of this._clientsInUse) {
promises.push(client.close());
}
await Promise.all(promises);
this._idleClients.reset();
this._clientsInUse.reset();
} catch (err) {
} finally {
this._isClosing = false;
}
}
destroy() {
for (const client of this._idleClients) {
client.destroy();
}
this._idleClients.reset();
for (const client of this._clientsInUse) {
client.destroy();
}
this._clientsInUse.reset();
this._isOpen = false;
}
}

View File

@@ -1,4 +1,4 @@
import { RedisClientOptions } from '../client';
import { RedisClientOptions, RedisClientType } from '../client';
import { CommandOptions } from '../client/commands-queue';
import { Command, CommandArguments, CommanderConfig, CommandPolicies, CommandWithPoliciesSignature, TypeMapping, RedisArgument, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions } from '../RESP/types';
import COMMANDS from '../commands';
@@ -224,8 +224,8 @@ export default class RedisCluster<
BaseClass: RedisCluster,
commands: COMMANDS,
createCommand: RedisCluster._createCommand,
createFunctionCommand: RedisCluster._createFunctionCommand,
createModuleCommand: RedisCluster._createModuleCommand,
createFunctionCommand: RedisCluster._createFunctionCommand,
createScriptCommand: RedisCluster._createScriptCommand,
config
});
@@ -233,8 +233,7 @@ export default class RedisCluster<
Cluster.prototype.Multi = RedisClusterMultiCommand.extend(config);
return (options?: Omit<RedisClusterOptions, keyof Exclude<typeof config, undefined>>) => {
// returning a proxy of the client to prevent the namespaces.self to leak between proxies
// namespaces will be bootstraped on first access per proxy
// returning a "proxy" to prevent the namespaces.self to leak between "proxies"
return Object.create(new Cluster(options)) as RedisClusterType<M, F, S, RESP, TYPE_MAPPING, POLICIES>;
};
}
@@ -388,21 +387,17 @@ export default class RedisCluster<
return this._commandOptionsProxy('policies', policies);
}
async sendCommand<T = ReplyUnion>(
async #execute<T>(
firstKey: RedisArgument | undefined,
isReadonly: boolean | undefined,
args: CommandArguments,
options?: ClusterCommandOptions,
deafultPolicies?: CommandPolicies
fn: (client: RedisClientType<M, F, S, RESP>) => Promise<T>
): Promise<T> {
// const requestPolicy = options?.policies?.request ?? deafultPolicies?.request,
// responsePolicy = options?.policies?.response ?? deafultPolicies?.response;
const maxCommandRedirections = this._options.maxCommandRedirections ?? 16;
let client = await this._slots.getClient(firstKey, isReadonly);
for (let i = 0; ; i++) {
let client = await this._slots.getClient(firstKey, isReadonly),
i = 0;
while (true) {
try {
return await client.sendCommand<T>(args, options);
return await fn(client);
} catch (err) {
// TODO: error class
if (++i > maxCommandRedirections || !(err instanceof Error)) {
@@ -424,7 +419,9 @@ export default class RedisCluster<
await redirectTo.asking();
client = redirectTo;
continue;
} else if (err.message.startsWith('MOVED')) {
}
if (err.message.startsWith('MOVED')) {
await this._slots.rediscover(client);
client = await this._slots.getClient(firstKey, isReadonly);
continue;
@@ -435,28 +432,56 @@ export default class RedisCluster<
}
}
/**
* @internal
*/
async executePipeline(
async sendCommand<T = ReplyUnion>(
firstKey: RedisArgument | undefined,
isReadonly: boolean | undefined,
commands: Array<RedisMultiQueuedCommand>
args: CommandArguments,
options?: ClusterCommandOptions,
defaultPolicies?: CommandPolicies
): Promise<T> {
return this.#execute(
firstKey,
isReadonly,
client => client.sendCommand(args, options)
);
}
executeScript(
script: RedisScript,
firstKey: RedisArgument | undefined,
isReadonly: boolean | undefined,
args: Array<RedisArgument>,
options?: CommandOptions
) {
const client = await this._slots.getClient(firstKey, isReadonly);
return client.executePipeline(commands);
return this.#execute(
firstKey,
isReadonly,
client => client.executeScript(script, args, options)
);
}
/**
* @internal
*/
async executeMulti(
async _executePipeline(
firstKey: RedisArgument | undefined,
isReadonly: boolean | undefined,
commands: Array<RedisMultiQueuedCommand>
) {
const client = await this._slots.getClient(firstKey, isReadonly);
return client.executeMulti(commands);
return client._executePipeline(commands);
}
/**
* @internal
*/
async _executeMulti(
firstKey: RedisArgument | undefined,
isReadonly: boolean | undefined,
commands: Array<RedisMultiQueuedCommand>
) {
const client = await this._slots.getClient(firstKey, isReadonly);
return client._executeMulti(commands);
}
MULTI(routing?: RedisArgument): RedisClusterMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING> {

View File

@@ -213,7 +213,7 @@ export default class RedisClusterMultiCommand<REPLIES = []> {
if (execAsPipeline) return this.execAsPipeline<T>();
return this._multi.transformReplies(
await this._cluster.executeMulti(
await this._cluster._executeMulti(
this._firstKey,
this._isReadonly,
this._multi.queue
@@ -231,7 +231,7 @@ export default class RedisClusterMultiCommand<REPLIES = []> {
if (this._multi.queue.length === 0) return [] as MultiReplyType<T, REPLIES>;
return this._multi.transformReplies(
await this._cluster.executePipeline(
await this._cluster._executePipeline(
this._firstKey,
this._isReadonly,
this._multi.queue

View File

@@ -67,3 +67,5 @@ export class ErrorReply extends Error {
export class SimpleError extends ErrorReply {}
export class BlobError extends ErrorReply {}
export class TimeoutError extends Error {}

44
test/package-lock.json generated
View File

@@ -1,44 +0,0 @@
{
"name": "test",
"version": "1.0.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "test",
"version": "1.0.0",
"license": "ISC",
"dependencies": {
"@redis/client": "next"
}
},
"node_modules/@redis/client": {
"version": "2.0.0-next.2",
"resolved": "https://registry.npmjs.org/@redis/client/-/client-2.0.0-next.2.tgz",
"integrity": "sha512-+sf9n+PBHac2xXSofSX0x79cYa5H4ighu80F993q4H1T109ZthFNGBmg33DfwfPrDMKc256qTXvsb0lCqzwMmg==",
"dependencies": {
"cluster-key-slot": "1.1.2",
"generic-pool": "3.9.0"
},
"engines": {
"node": ">=16"
}
},
"node_modules/cluster-key-slot": {
"version": "1.1.2",
"resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz",
"integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==",
"engines": {
"node": ">=0.10.0"
}
},
"node_modules/generic-pool": {
"version": "3.9.0",
"resolved": "https://registry.npmjs.org/generic-pool/-/generic-pool-3.9.0.tgz",
"integrity": "sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g==",
"engines": {
"node": ">= 4"
}
}
}
}

View File

@@ -1,15 +0,0 @@
{
"name": "test",
"version": "1.0.0",
"description": "",
"main": "index.js",
"type": "module",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"dependencies": {
"@redis/client": "next"
}
}

View File

@@ -1,21 +0,0 @@
import { RESP_TYPES, createClient } from '@redis/client';
const client = createClient({
RESP: 3,
commandOptions: {
typeMapping: {
[RESP_TYPES.MAP]: Map
}
}
});
client.on('error', err => console.error(err));
await client.connect();
console.log(
await client.flushAll(),
await client.hSet('key', 'field', 'value'),
await client.hGetAll('key')
)
client.destroy();