diff --git a/docs/programmability.md b/docs/programmability.md index 803ac689a6..f6ae42033c 100644 --- a/docs/programmability.md +++ b/docs/programmability.md @@ -19,7 +19,7 @@ redis.register_function { Here is the same example, but in a format that can be pasted into the `redis-cli`. ``` -FUNCTION LOAD "#!lua name=library\nredis.register_function{function_name=\"add\", callback=function(keys, args) return redis.call('GET', keys[1])+args[1] end, flags={\"no-writes\"}}" +FUNCTION LOAD "#!lua name=library\nredis.register_function{function_name='add', callback=function(keys, args) return redis.call('GET', keys[1])+args[1] end, flags={'no-writes'}}" ``` Load the prior redis function on the _redis server_ before running the example below. diff --git a/packages/client/index.ts b/packages/client/index.ts index 09874984de..06392e970e 100644 --- a/packages/client/index.ts +++ b/packages/client/index.ts @@ -4,11 +4,15 @@ export { VerbatimString } from './lib/RESP/verbatim-string'; export { defineScript } from './lib/lua-script'; // export * from './lib/errors'; -import RedisClient, { RedisClientType, RedisClientOptions } from './lib/client'; -export { RedisClientType, RedisClientOptions }; +import RedisClient, { RedisClientOptions, RedisClientType } from './lib/client'; +export { RedisClientOptions, RedisClientType }; export const createClient = RedisClient.create; -import RedisCluster, { RedisClusterType, RedisClusterOptions } from './lib/cluster'; +import { RedisClientPool, RedisPoolOptions, RedisClientPoolType } from './lib/client/pool'; +export { RedisClientPoolType, RedisPoolOptions }; +export const createClientPool = RedisClientPool.create; + +import RedisCluster, { RedisClusterOptions, RedisClusterType } from './lib/cluster'; export { RedisClusterType, RedisClusterOptions }; export const createCluster = RedisCluster.create; diff --git a/packages/client/lib/RESP/decoder.ts b/packages/client/lib/RESP/decoder.ts index c4e1296fc2..2fbadd0f5e 100644 --- a/packages/client/lib/RESP/decoder.ts +++ b/packages/client/lib/RESP/decoder.ts @@ -111,7 +111,10 @@ export class Decoder { case RESP_TYPES.NUMBER: return this._handleDecodedValue( this._config.onReply, - this._decodeNumber(chunk) + this._decodeNumber( + this._config.getTypeMapping()[RESP_TYPES.NUMBER], + chunk + ) ); case RESP_TYPES.BIG_NUMBER: @@ -226,7 +229,11 @@ export class Decoder { return boolean; } - private _decodeNumber(chunk) { + private _decodeNumber(type, chunk) { + if (type === String) { + return this._decodeSimpleString(String, chunk); + } + switch (chunk[this._cursor]) { case ASCII['+']: return this._maybeDecodeNumberValue(false, chunk); @@ -675,7 +682,7 @@ export class Decoder { return this._decodeBoolean(chunk); case RESP_TYPES.NUMBER: - return this._decodeNumber(chunk); + return this._decodeNumber(typeMapping[RESP_TYPES.NUMBER], chunk); case RESP_TYPES.BIG_NUMBER: return this._decodeBigNumber(typeMapping[RESP_TYPES.BIG_NUMBER], chunk); diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 8caa2f178a..1adc1a7692 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -11,7 +11,7 @@ export interface CommandOptions { asap?: boolean; abortSignal?: AbortSignal; /** - * Maps bettween RESP and JavaScript types + * Maps between RESP and JavaScript types */ typeMapping?: T; } diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index dbf8b85b03..2ba12ac0f3 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -7,13 +7,13 @@ import { ClientClosedError, ClientOfflineError, DisconnectsClientError, WatchErr import { URL } from 'url'; import { TcpSocketConnectOpts } from 'net'; import { PubSubType, PubSubListener, PubSubTypeListeners, ChannelListeners } from './pub-sub'; -import { Command, CommandArguments, CommandSignature, TypeMapping, CommanderConfig, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types'; +import { Command, CommandSignature, TypeMapping, CommanderConfig, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types'; import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command'; import { RedisMultiQueuedCommand } from '../multi-command'; import HELLO, { HelloOptions } from '../commands/HELLO'; import { ScanOptions, ScanCommonOptions } from '../commands/SCAN'; import { RedisLegacyClient, RedisLegacyClientType } from './legacy-mode'; -// import { RedisClientPool } from './pool'; +import { RedisPoolOptions, RedisClientPool } from './pool'; interface ClientCommander< M extends RedisModules, @@ -21,7 +21,7 @@ interface ClientCommander< S extends RedisScripts, RESP extends RespVersions, TYPE_MAPPING extends TypeMapping -> extends CommanderConfig{ +> extends CommanderConfig { commandOptions?: CommandOptions; } @@ -72,7 +72,7 @@ export interface RedisClientOptions< readonly?: boolean; /** * Send `PING` command at interval (in ms). - * Useful with Redis deployments that do not use TCP Keep-Alive. + * Useful with Redis deployments that do not honor TCP Keep-Alive. */ pingInterval?: number; } @@ -194,13 +194,7 @@ export default class RedisClient< return async function (this: ProxyClient, ...args: Array) { const scriptArgs = script.transformArguments(...args), redisArgs = prefix.concat(scriptArgs), - reply = await this.sendCommand(redisArgs, this._commandOptions).catch((err: unknown) => { - if (!(err as Error)?.message?.startsWith?.('NOSCRIPT')) throw err; - - redisArgs[0] = 'EVAL'; - redisArgs[1] = script.SCRIPT; - return this.sendCommand(redisArgs, this._commandOptions); - }); + reply = await this.executeScript(script, redisArgs, this._commandOptions); return transformReply ? transformReply(reply, scriptArgs.preserve) : reply; @@ -218,8 +212,8 @@ export default class RedisClient< BaseClass: RedisClient, commands: COMMANDS, createCommand: RedisClient._createCommand, - createFunctionCommand: RedisClient._createFunctionCommand, createModuleCommand: RedisClient._createModuleCommand, + createFunctionCommand: RedisClient._createFunctionCommand, createScriptCommand: RedisClient._createScriptCommand, config }); @@ -227,8 +221,7 @@ export default class RedisClient< Client.prototype.Multi = RedisClientMultiCommand.extend(config); return (options?: Omit>) => { - // returning a proxy of the client to prevent the namespaces.self to leak between proxies - // namespaces will be bootstraped on first access per proxy + // returning a "proxy" to prevent the namespaces.self to leak between "proxies" return Object.create(new Client(options)) as RedisClientType; }; } @@ -527,13 +520,14 @@ export default class RedisClient< } /** - * Create `RedisClientPool` using this client as a prototype + * Create {@link RedisClientPool `RedisClientPool`} using this client as a prototype */ - // pool() { - // return RedisClientPool.fromClient( - // this as unknown as RedisClientType - // ); - // } + pool(options?: Partial) { + return RedisClientPool.create( + this._options, + options + ); + } duplicate< _M extends RedisModules = M, @@ -549,12 +543,13 @@ export default class RedisClient< }) as RedisClientType<_M, _F, _S, _RESP, _TYPE_MAPPING>; } - connect() { - return this._socket.connect(); + async connect() { + await this._socket.connect(); + return this as unknown as RedisClientType; } sendCommand( - args: CommandArguments, + args: Array, options?: CommandOptions ): Promise { if (!this._socket.isOpen) { @@ -568,6 +563,22 @@ export default class RedisClient< return promise; } + async executeScript( + script: RedisScript, + args: Array, + options?: CommandOptions + ) { + try { + return await this.sendCommand(args, options); + } catch (err) { + if (!(err as Error)?.message?.startsWith?.('NOSCRIPT')) throw err; + + args[0] = 'EVAL'; + args[1] = script.SCRIPT; + return await this.sendCommand(args, options); + } + } + async SELECT(db: number): Promise { await this.sendCommand(['SELECT', db.toString()]); this._selectedDB = db; @@ -728,7 +739,7 @@ export default class RedisClient< /** * @internal */ - executePipeline(commands: Array) { + _executePipeline(commands: Array) { if (!this._socket.isOpen) { return Promise.reject(new ClientClosedError()); } @@ -745,7 +756,7 @@ export default class RedisClient< /** * @internal */ - async executeMulti( + async _executeMulti( commands: Array, selectedDB?: number ) { diff --git a/packages/client/lib/client/legacy-mode.ts b/packages/client/lib/client/legacy-mode.ts index 576d73be96..532c23fafb 100644 --- a/packages/client/lib/client/legacy-mode.ts +++ b/packages/client/lib/client/legacy-mode.ts @@ -160,7 +160,7 @@ class LegacyMultiCommand { } exec(cb?: (err: ErrorReply | null, replies?: Array) => unknown) { - const promise = this._client.executeMulti(this._multi.queue); + const promise = this._client._executeMulti(this._multi.queue); if (!cb) { promise.catch(err => this._client.emit('error', err)); diff --git a/packages/client/lib/client/linked-list.ts b/packages/client/lib/client/linked-list.ts index 1a68e1c2f3..c2f4952738 100644 --- a/packages/client/lib/client/linked-list.ts +++ b/packages/client/lib/client/linked-list.ts @@ -138,9 +138,26 @@ export class SinglyLinkedList { }; if (this._head === undefined) { - this._head = this._tail = node; + return this._head = this._tail = node; + } + + return this._tail!.next = this._tail = node; + } + + remove(node: SinglyLinkedNode, parent: SinglyLinkedNode | undefined) { + --this._length; + + if (this._head === node) { + if (this._tail === node) { + this._head = this._tail = undefined; + } else { + this._head = node.next; + } + } else if (this._tail === node) { + this._tail = parent; + parent!.next = undefined; } else { - this._tail!.next = this._tail = node; + parent!.next = node.next; } } diff --git a/packages/client/lib/client/multi-command.ts b/packages/client/lib/client/multi-command.ts index cacb5429a5..267444ad31 100644 --- a/packages/client/lib/client/multi-command.ts +++ b/packages/client/lib/client/multi-command.ts @@ -173,7 +173,7 @@ export default class RedisClientMultiCommand { if (execAsPipeline) return this.execAsPipeline(); return this._multi.transformReplies( - await this._client.executeMulti(this._multi.queue, this._selectedDB) + await this._client._executeMulti(this._multi.queue, this._selectedDB) ) as MultiReplyType; } @@ -187,7 +187,7 @@ export default class RedisClientMultiCommand { if (this._multi.queue.length === 0) return [] as MultiReplyType; return this._multi.transformReplies( - await this._client.executePipeline(this._multi.queue) + await this._client._executePipeline(this._multi.queue) ) as MultiReplyType; } diff --git a/packages/client/lib/client/pool.ts b/packages/client/lib/client/pool.ts index 5f1a286cc2..4cf7fc43de 100644 --- a/packages/client/lib/client/pool.ts +++ b/packages/client/lib/client/pool.ts @@ -1,242 +1,469 @@ -// import COMMANDS from '../commands'; -// import { RedisFunctions, RedisModules, RedisScripts, RespVersions, TypeMapping } from '../RESP/types'; -// import RedisClient, { RedisClientType, RedisClientOptions, RedisClientExtensions } from '.'; -// import { EventEmitter } from 'events'; -// import { DoublyLinkedNode, DoublyLinkedList, SinglyLinkedList } from './linked-list'; +import COMMANDS from '../commands'; +import { Command, RedisArgument, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, RespVersions, TypeMapping } from '../RESP/types'; +import RedisClient, { RedisClientType, RedisClientOptions, RedisClientExtensions } from '.'; +import { EventEmitter } from 'events'; +import { DoublyLinkedNode, DoublyLinkedList, SinglyLinkedList } from './linked-list'; +import { TimeoutError } from '../errors'; +import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; +import { CommandOptions } from './commands-queue'; -// export type RedisPoolOptions = typeof RedisClientPool['_DEFAULTS']; +export interface RedisPoolOptions { + /** + * The minimum number of clients to keep in the pool (>= 1). + */ + minimum: number; + /** + * The maximum number of clients to keep in the pool (>= {@link RedisPoolOptions.minimum} >= 1). + */ + maximum: number; + /** + * The maximum time a task can wait for a client to become available (>= 0). + */ + acquireTimeout: number; + /** + * TODO + */ + cleanupDelay: number; +} -// export type PoolTask< -// M extends RedisModules, -// F extends RedisFunctions, -// S extends RedisScripts, -// RESP extends RespVersions, -// TYPE_MAPPING extends TypeMapping, -// T = unknown -// > = (client: RedisClientType) => T; +export type PoolTask< + M extends RedisModules, + F extends RedisFunctions, + S extends RedisScripts, + RESP extends RespVersions, + TYPE_MAPPING extends TypeMapping, + T = unknown +> = (client: RedisClientType) => T; -// export type RedisClientPoolType< -// M extends RedisModules = {}, -// F extends RedisFunctions = {}, -// S extends RedisScripts = {}, -// RESP extends RespVersions = 2, -// TYPE_MAPPING extends TypeMapping = {} -// > = ( -// RedisClientPool & -// RedisClientExtensions -// ); +export type RedisClientPoolType< + M extends RedisModules = {}, + F extends RedisFunctions = {}, + S extends RedisScripts = {}, + RESP extends RespVersions = 2, + TYPE_MAPPING extends TypeMapping = {} +> = ( + RedisClientPool & + RedisClientExtensions +); -// export class RedisClientPool< -// M extends RedisModules = {}, -// F extends RedisFunctions = {}, -// S extends RedisScripts = {}, -// RESP extends RespVersions = 2, -// TYPE_MAPPING extends TypeMapping = {} -// > extends EventEmitter { -// static fromClient< -// M extends RedisModules, -// F extends RedisFunctions, -// S extends RedisScripts, -// RESP extends RespVersions, -// TYPE_MAPPING extends TypeMapping = {} -// >( -// client: RedisClientType, -// poolOptions: Partial -// ) { -// return RedisClientPool.create( -// () => client.duplicate(), -// poolOptions -// ); -// } +type ProxyPool = RedisClientPoolType; -// static fromOptions< -// M extends RedisModules, -// F extends RedisFunctions, -// S extends RedisScripts, -// RESP extends RespVersions, -// TYPE_MAPPING extends TypeMapping = {} -// >( -// options: RedisClientOptions, -// poolOptions: Partial -// ) { -// return RedisClientPool.create( -// RedisClient.factory(options), -// poolOptions -// ); -// } +type NamespaceProxyPool = { self: ProxyPool }; -// static create< -// M extends RedisModules, -// F extends RedisFunctions, -// S extends RedisScripts, -// RESP extends RespVersions, -// TYPE_MAPPING extends TypeMapping = {} -// >( -// clientFactory: () => RedisClientType, -// options?: Partial -// ) { -// return new RedisClientPool( -// clientFactory, -// options -// ) as RedisClientPoolType; -// } +export class RedisClientPool< + M extends RedisModules = {}, + F extends RedisFunctions = {}, + S extends RedisScripts = {}, + RESP extends RespVersions = 2, + TYPE_MAPPING extends TypeMapping = {} +> extends EventEmitter { + private static _createCommand(command: Command, resp: RespVersions) { + const transformReply = getTransformReply(command, resp); + return async function (this: ProxyPool, ...args: Array) { + const redisArgs = command.transformArguments(...args), + reply = await this.sendCommand(redisArgs, this._commandOptions); + return transformReply ? + transformReply(reply, redisArgs.preserve) : + reply; + }; + } -// private static _DEFAULTS = { -// /** -// * The minimum number of clients to keep in the pool. -// */ -// minimum: 0, -// /** -// * The maximum number of clients to keep in the pool. -// */ -// maximum: 1, -// /** -// * The maximum time a task can wait for a client to become available. -// */ -// acquireTimeout: 3000, -// /** -// * When there are `> minimum && < maximum` clients in the pool, the pool will wait for `cleanupDelay` milliseconds before closing the extra clients. -// */ -// cleanupDelay: 3000 -// }; + private static _createModuleCommand(command: Command, resp: RespVersions) { + const transformReply = getTransformReply(command, resp); + return async function (this: NamespaceProxyPool, ...args: Array) { + const redisArgs = command.transformArguments(...args), + reply = await this.self.sendCommand(redisArgs, this.self._commandOptions); + return transformReply ? + transformReply(reply, redisArgs.preserve) : + reply; + }; + } -// private readonly _clientFactory: () => RedisClientType; -// private readonly _options: Required; -// private readonly _idleClients = new SinglyLinkedList>(); -// private readonly _usedClients = new DoublyLinkedList>(); -// private readonly _tasksQueue = new SinglyLinkedList<{ -// resolve: (value: T | PromiseLike) => void; -// reject: (reason?: unknown) => void; -// fn: PoolTask; -// }>(); + private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) { + const prefix = functionArgumentsPrefix(name, fn), + transformReply = getTransformReply(fn, resp); + return async function (this: NamespaceProxyPool, ...args: Array) { + const fnArgs = fn.transformArguments(...args), + reply = await this.self.sendCommand( + prefix.concat(fnArgs), + this.self._commandOptions + ); + return transformReply ? + transformReply(reply, fnArgs.preserve) : + reply; + }; + } -// constructor( -// clientFactory: () => RedisClientType, -// options?: Partial -// ) { -// super(); + private static _createScriptCommand(script: RedisScript, resp: RespVersions) { + const prefix = scriptArgumentsPrefix(script), + transformReply = getTransformReply(script, resp); + return async function (this: ProxyPool, ...args: Array) { + const scriptArgs = script.transformArguments(...args), + redisArgs = prefix.concat(scriptArgs), + reply = await this.executeScript(script, redisArgs, this._commandOptions); + return transformReply ? + transformReply(reply, scriptArgs.preserve) : + reply; + }; + } -// this._clientFactory = clientFactory; -// this._options = { -// ...RedisClientPool._DEFAULTS, -// ...options -// }; -// this._initate(); -// } + static create< + M extends RedisModules, + F extends RedisFunctions, + S extends RedisScripts, + RESP extends RespVersions, + TYPE_MAPPING extends TypeMapping = {} + >( + // clientFactory: () => RedisClientType, + clientOptions?: RedisClientOptions, + options?: Partial + ) { + // @ts-ignore + const Pool = attachConfig({ + BaseClass: RedisClientPool, + commands: COMMANDS, + createCommand: RedisClientPool._createCommand, + createModuleCommand: RedisClientPool._createModuleCommand, + createFunctionCommand: RedisClientPool._createFunctionCommand, + createScriptCommand: RedisClientPool._createScriptCommand, + config: clientOptions + }); -// private async _initate() { -// const promises = []; -// while (promises.length < this._options.minimum) { -// promises.push(this._create()); -// } + // returning a "proxy" to prevent the namespaces.self to leak between "proxies" + return Object.create( + new Pool( + RedisClient.factory(clientOptions).bind(undefined, clientOptions), + options + ) + ) as RedisClientPoolType; + } -// try { -// await Promise.all(promises); -// } catch (err) { -// this.destroy(); -// this.emit('error', err); -// } -// } + // TODO: defaults + private static _DEFAULTS = { + minimum: 1, + maximum: 100, + acquireTimeout: 3000, + cleanupDelay: 3000 + } satisfies RedisPoolOptions; -// private async _create() { -// const client = this._clientFactory() -// // TODO: more events? -// .on('error', (err: Error) => this.emit('error', err)); + private readonly _clientFactory: () => RedisClientType; + private readonly _options: RedisPoolOptions; -// const node = this._usedClients.push(client); + private readonly _idleClients = new SinglyLinkedList>(); -// await client.connect(); + /** + * The number of idle clients. + */ + get idleClients() { + return this._idleClients.length; + } -// this._usedClients.remove(node); + private readonly _clientsInUse = new DoublyLinkedList>(); -// return client; -// } + /** + * The number of clients in use. + */ + get clientsInUse() { + return this._clientsInUse.length; + } -// execute(fn: PoolTask): Promise { -// return new Promise((resolve, reject) => { -// let client = this._idleClients.shift(); -// if (!client) { -// this._tasksQueue.push({ -// // @ts-ignore -// resolve, -// reject, -// fn -// }); + private readonly _connectingClients = 0; -// if (this._idleClients.length + this._usedClients.length < this._options.maximum) { -// this._create(); -// } + /** + * The number of clients that are currently connecting. + */ + get connectingClients() { + return this._connectingClients; + } -// return; -// } + /** + * The total number of clients in the pool (including connecting, idle, and in use). + */ + get totalClients() { + return this._idleClients.length + this._clientsInUse.length; + } -// const node = this._usedClients.push(client); -// // @ts-ignore -// this._executeTask(node, resolve, reject, fn); -// }); -// } + private readonly _tasksQueue = new SinglyLinkedList<{ + timeout: NodeJS.Timeout | undefined; + resolve: (value: unknown) => unknown; + reject: (reason?: unknown) => unknown; + fn: PoolTask; + }>(); -// private _executeTask( -// node: DoublyLinkedNode>, -// resolve: (value: T | PromiseLike) => void, -// reject: (reason?: unknown) => void, -// fn: PoolTask -// ) { -// const result = fn(node.value); -// if (result instanceof Promise) { -// result.then(resolve, reject); -// result.finally(() => this._returnClient(node)) -// } else { -// resolve(result); -// this._returnClient(node); -// } -// } + /** + * The number of tasks waiting for a client to become available. + */ + get tasksQueueLength() { + return this._tasksQueue.length; + } -// private _returnClient(node: DoublyLinkedListNode>) { -// const task = this._tasksQueue.shift(); -// if (task) { -// this._executeTask(node, task.resolve, task.reject, task.fn); -// return; -// } + private _isOpen = false; -// if (this._idleClients.length >= this._options.minimum) { -// node.client.destroy(); -// return; -// } + /** + * Whether the pool is open (either connecting or connected). + */ + get isOpen() { + return this._isOpen; + } -// this._usedClients.remove(node); -// this._idleClients.push(node.client); -// } + private _isClosing = false; -// async close() { -// const promises = []; + /** + * Whether the pool is closing (*not* closed). + */ + get isClosing() { + return this._isClosing; + } -// for (const client of this._idleClients) { -// promises.push(client.close()); -// } + /** + * You are probably looking for {@link RedisClient.pool `RedisClient.pool`}, + * {@link RedisClientPool.fromClient `RedisClientPool.fromClient`}, + * or {@link RedisClientPool.fromOptions `RedisClientPool.fromOptions`}... + */ + constructor( + clientFactory: () => RedisClientType, + options?: Partial + ) { + super(); -// this._idleClients.reset(); + this._clientFactory = clientFactory; + this._options = { + ...RedisClientPool._DEFAULTS, + ...options + }; + } -// for (const client of this._usedClients) { -// promises.push(client.close()); -// } + private _self = this; + private _commandOptions?: CommandOptions; -// this._usedClients.reset(); + withCommandOptions< + OPTIONS extends CommandOptions, + TYPE_MAPPING extends TypeMapping + >(options: OPTIONS) { + const proxy = Object.create(this._self); + proxy._commandOptions = options; + return proxy as RedisClientPoolType< + M, + F, + S, + RESP, + TYPE_MAPPING extends TypeMapping ? TYPE_MAPPING : {} + >; + } -// await Promise.all(promises); -// } + private _commandOptionsProxy< + K extends keyof CommandOptions, + V extends CommandOptions[K] + >( + key: K, + value: V + ) { + const proxy = Object.create(this._self); + proxy._commandOptions = Object.create(this._commandOptions ?? null); + proxy._commandOptions[key] = value; + return proxy as RedisClientPoolType< + M, + F, + S, + RESP, + K extends 'typeMapping' ? V extends TypeMapping ? V : {} : TYPE_MAPPING + >; + } -// destroy() { -// for (const client of this._idleClients) { -// client.destroy(); -// } + /** + * Override the `typeMapping` command option + */ + withTypeMapping(typeMapping: TYPE_MAPPING) { + return this._commandOptionsProxy('typeMapping', typeMapping); + } -// this._idleClients.reset(); + /** + * Override the `abortSignal` command option + */ + withAbortSignal(abortSignal: AbortSignal) { + return this._commandOptionsProxy('abortSignal', abortSignal); + } -// for (const client of this._usedClients) { -// client.destroy(); -// } + /** + * Override the `asap` command option to `true` + * TODO: remove? + */ + asap() { + return this._commandOptionsProxy('asap', true); + } -// this._usedClients.reset(); -// } -// } + async connect() { + if (this._isOpen) return; // TODO: throw error? + + this._isOpen = true; + + const promises = []; + while (promises.length < this._options.minimum) { + promises.push(this._create()); + } + + try { + await Promise.all(promises); + return this as unknown as RedisClientPoolType; + } catch (err) { + this.destroy(); + throw err; + } + } + + private async _create() { + const node = this._clientsInUse.push( + this._clientFactory() + .on('error', (err: Error) => this.emit('error', err)) + ); + + try { + await node.value.connect(); + } catch (err) { + this._clientsInUse.remove(node); + throw err; + } + + this._returnClient(node); + } + + execute(fn: PoolTask): Promise { + return new Promise((resolve, reject) => { + const client = this._idleClients.shift(), + { tail } = this._tasksQueue; + if (!client) { + let timeout; + if (this._options.acquireTimeout > 0) { + timeout = setTimeout( + () => { + this._tasksQueue.remove(task, tail); + reject(new TimeoutError('Timeout waiting for a client')); // TODO: message + }, + this._options.acquireTimeout + ); + } + + const task = this._tasksQueue.push({ + timeout, + // @ts-ignore + resolve, + reject, + fn + }); + + if (this.totalClients < this._options.maximum) { + this._create(); + } + + return; + } + + const node = this._clientsInUse.push(client); + // @ts-ignore + this._executeTask(node, resolve, reject, fn); + }); + } + + private _executeTask( + node: DoublyLinkedNode>, + resolve: (value: T | PromiseLike) => void, + reject: (reason?: unknown) => void, + fn: PoolTask + ) { + const result = fn(node.value); + if (result instanceof Promise) { + result.then(resolve, reject); + result.finally(() => this._returnClient(node)) + } else { + resolve(result); + this._returnClient(node); + } + } + + private _returnClient(node: DoublyLinkedNode>) { + const task = this._tasksQueue.shift(); + if (task) { + this._executeTask(node, task.resolve, task.reject, task.fn); + return; + } + + this._clientsInUse.remove(node); + this._idleClients.push(node.value); + + this._scheduleCleanup(); + } + + cleanupTimeout?: NodeJS.Timeout; + + private _scheduleCleanup() { + if (this.totalClients <= this._options.minimum) return; + + clearTimeout(this.cleanupTimeout); + this.cleanupTimeout = setTimeout(() => this._cleanup(), this._options.cleanupDelay); + } + + private _cleanup() { + const toDestroy = Math.min(this._idleClients.length, this.totalClients - this._options.minimum); + for (let i = 0; i < toDestroy; i++) { + // TODO: shift vs pop + this._idleClients.shift()!.destroy(); + } + } + + sendCommand( + args: Array, + options?: CommandOptions + ) { + return this.execute(client => client.sendCommand(args, options)); + } + + executeScript( + script: RedisScript, + args: Array, + options?: CommandOptions + ) { + return this.execute(client => client.executeScript(script, args, options)); + } + + async close() { + if (this._isClosing) return; // TODO: throw err? + if (!this._isOpen) return; // TODO: throw err? + + this._isClosing = true; + + try { + const promises = []; + + for (const client of this._idleClients) { + promises.push(client.close()); + } + + for (const client of this._clientsInUse) { + promises.push(client.close()); + } + + await Promise.all(promises); + + this._idleClients.reset(); + this._clientsInUse.reset(); + } catch (err) { + + } finally { + this._isClosing = false; + } + } + + destroy() { + for (const client of this._idleClients) { + client.destroy(); + } + this._idleClients.reset(); + + for (const client of this._clientsInUse) { + client.destroy(); + } + this._clientsInUse.reset(); + + this._isOpen = false; + } +} diff --git a/packages/client/lib/cluster/index.ts b/packages/client/lib/cluster/index.ts index e0386e74c5..718e119872 100644 --- a/packages/client/lib/cluster/index.ts +++ b/packages/client/lib/cluster/index.ts @@ -1,4 +1,4 @@ -import { RedisClientOptions } from '../client'; +import { RedisClientOptions, RedisClientType } from '../client'; import { CommandOptions } from '../client/commands-queue'; import { Command, CommandArguments, CommanderConfig, CommandPolicies, CommandWithPoliciesSignature, TypeMapping, RedisArgument, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions } from '../RESP/types'; import COMMANDS from '../commands'; @@ -224,8 +224,8 @@ export default class RedisCluster< BaseClass: RedisCluster, commands: COMMANDS, createCommand: RedisCluster._createCommand, - createFunctionCommand: RedisCluster._createFunctionCommand, createModuleCommand: RedisCluster._createModuleCommand, + createFunctionCommand: RedisCluster._createFunctionCommand, createScriptCommand: RedisCluster._createScriptCommand, config }); @@ -233,8 +233,7 @@ export default class RedisCluster< Cluster.prototype.Multi = RedisClusterMultiCommand.extend(config); return (options?: Omit>) => { - // returning a proxy of the client to prevent the namespaces.self to leak between proxies - // namespaces will be bootstraped on first access per proxy + // returning a "proxy" to prevent the namespaces.self to leak between "proxies" return Object.create(new Cluster(options)) as RedisClusterType; }; } @@ -388,21 +387,17 @@ export default class RedisCluster< return this._commandOptionsProxy('policies', policies); } - async sendCommand( + async #execute( firstKey: RedisArgument | undefined, isReadonly: boolean | undefined, - args: CommandArguments, - options?: ClusterCommandOptions, - deafultPolicies?: CommandPolicies + fn: (client: RedisClientType) => Promise ): Promise { - // const requestPolicy = options?.policies?.request ?? deafultPolicies?.request, - // responsePolicy = options?.policies?.response ?? deafultPolicies?.response; - const maxCommandRedirections = this._options.maxCommandRedirections ?? 16; - let client = await this._slots.getClient(firstKey, isReadonly); - for (let i = 0; ; i++) { + let client = await this._slots.getClient(firstKey, isReadonly), + i = 0; + while (true) { try { - return await client.sendCommand(args, options); + return await fn(client); } catch (err) { // TODO: error class if (++i > maxCommandRedirections || !(err instanceof Error)) { @@ -424,39 +419,69 @@ export default class RedisCluster< await redirectTo.asking(); client = redirectTo; continue; - } else if (err.message.startsWith('MOVED')) { + } + + if (err.message.startsWith('MOVED')) { await this._slots.rediscover(client); client = await this._slots.getClient(firstKey, isReadonly); continue; } throw err; - } + } } } - /** - * @internal - */ - async executePipeline( + async sendCommand( firstKey: RedisArgument | undefined, isReadonly: boolean | undefined, - commands: Array + args: CommandArguments, + options?: ClusterCommandOptions, + defaultPolicies?: CommandPolicies + ): Promise { + return this.#execute( + firstKey, + isReadonly, + client => client.sendCommand(args, options) + ); + } + + executeScript( + script: RedisScript, + firstKey: RedisArgument | undefined, + isReadonly: boolean | undefined, + args: Array, + options?: CommandOptions ) { - const client = await this._slots.getClient(firstKey, isReadonly); - return client.executePipeline(commands); + return this.#execute( + firstKey, + isReadonly, + client => client.executeScript(script, args, options) + ); } /** * @internal */ - async executeMulti( + async _executePipeline( firstKey: RedisArgument | undefined, isReadonly: boolean | undefined, commands: Array ) { const client = await this._slots.getClient(firstKey, isReadonly); - return client.executeMulti(commands); + return client._executePipeline(commands); + } + + /** + * @internal + */ + async _executeMulti( + firstKey: RedisArgument | undefined, + isReadonly: boolean | undefined, + commands: Array + ) { + const client = await this._slots.getClient(firstKey, isReadonly); + return client._executeMulti(commands); } MULTI(routing?: RedisArgument): RedisClusterMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING> { diff --git a/packages/client/lib/cluster/multi-command.ts b/packages/client/lib/cluster/multi-command.ts index dc351d42fc..a0292356da 100644 --- a/packages/client/lib/cluster/multi-command.ts +++ b/packages/client/lib/cluster/multi-command.ts @@ -213,7 +213,7 @@ export default class RedisClusterMultiCommand { if (execAsPipeline) return this.execAsPipeline(); return this._multi.transformReplies( - await this._cluster.executeMulti( + await this._cluster._executeMulti( this._firstKey, this._isReadonly, this._multi.queue @@ -231,7 +231,7 @@ export default class RedisClusterMultiCommand { if (this._multi.queue.length === 0) return [] as MultiReplyType; return this._multi.transformReplies( - await this._cluster.executePipeline( + await this._cluster._executePipeline( this._firstKey, this._isReadonly, this._multi.queue diff --git a/packages/client/lib/errors.ts b/packages/client/lib/errors.ts index c0815ebde8..33a2129b2b 100644 --- a/packages/client/lib/errors.ts +++ b/packages/client/lib/errors.ts @@ -67,3 +67,5 @@ export class ErrorReply extends Error { export class SimpleError extends ErrorReply {} export class BlobError extends ErrorReply {} + +export class TimeoutError extends Error {} diff --git a/test/package-lock.json b/test/package-lock.json deleted file mode 100644 index 878a92a6ad..0000000000 --- a/test/package-lock.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "name": "test", - "version": "1.0.0", - "lockfileVersion": 3, - "requires": true, - "packages": { - "": { - "name": "test", - "version": "1.0.0", - "license": "ISC", - "dependencies": { - "@redis/client": "next" - } - }, - "node_modules/@redis/client": { - "version": "2.0.0-next.2", - "resolved": "https://registry.npmjs.org/@redis/client/-/client-2.0.0-next.2.tgz", - "integrity": "sha512-+sf9n+PBHac2xXSofSX0x79cYa5H4ighu80F993q4H1T109ZthFNGBmg33DfwfPrDMKc256qTXvsb0lCqzwMmg==", - "dependencies": { - "cluster-key-slot": "1.1.2", - "generic-pool": "3.9.0" - }, - "engines": { - "node": ">=16" - } - }, - "node_modules/cluster-key-slot": { - "version": "1.1.2", - "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", - "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==", - "engines": { - "node": ">=0.10.0" - } - }, - "node_modules/generic-pool": { - "version": "3.9.0", - "resolved": "https://registry.npmjs.org/generic-pool/-/generic-pool-3.9.0.tgz", - "integrity": "sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g==", - "engines": { - "node": ">= 4" - } - } - } -} diff --git a/test/package.json b/test/package.json deleted file mode 100644 index 23ca20e058..0000000000 --- a/test/package.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "name": "test", - "version": "1.0.0", - "description": "", - "main": "index.js", - "type": "module", - "scripts": { - "test": "echo \"Error: no test specified\" && exit 1" - }, - "author": "", - "license": "ISC", - "dependencies": { - "@redis/client": "next" - } -} diff --git a/test/test.js b/test/test.js deleted file mode 100644 index f9b83cae7a..0000000000 --- a/test/test.js +++ /dev/null @@ -1,21 +0,0 @@ -import { RESP_TYPES, createClient } from '@redis/client'; - -const client = createClient({ - RESP: 3, - commandOptions: { - typeMapping: { - [RESP_TYPES.MAP]: Map - } - } -}); -client.on('error', err => console.error(err)); - -await client.connect(); - -console.log( - await client.flushAll(), - await client.hSet('key', 'field', 'value'), - await client.hGetAll('key') -) - -client.destroy();