1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-07 13:22:56 +03:00

move from TypeScript privates to "#"

This commit is contained in:
Leibale
2023-12-18 17:03:21 -05:00
parent d1f50df192
commit 6686f44d3b
11 changed files with 671 additions and 678 deletions

View File

@@ -44,18 +44,18 @@ const RESP2_PUSH_TYPE_MAPPING = {
}; };
export default class RedisCommandsQueue { export default class RedisCommandsQueue {
private readonly _maxLength: number | null | undefined; readonly #maxLength: number | null | undefined;
private readonly _toWrite = new DoublyLinkedList<CommandToWrite>(); readonly #toWrite = new DoublyLinkedList<CommandToWrite>();
private readonly _waitingForReply = new SinglyLinkedList<CommandWaitingForReply>(); readonly #waitingForReply = new SinglyLinkedList<CommandWaitingForReply>();
private readonly _onShardedChannelMoved: OnShardedChannelMoved; readonly #onShardedChannelMoved: OnShardedChannelMoved;
private readonly _pubSub = new PubSub(); readonly #pubSub = new PubSub();
get isPubSubActive() { get isPubSubActive() {
return this._pubSub.isActive; return this.#pubSub.isActive;
} }
private _chainInExecution: symbol | undefined; #chainInExecution: symbol | undefined;
decoder: Decoder; decoder: Decoder;
@@ -64,92 +64,92 @@ export default class RedisCommandsQueue {
maxLength: number | null | undefined, maxLength: number | null | undefined,
onShardedChannelMoved: EventEmitter['emit'] onShardedChannelMoved: EventEmitter['emit']
) { ) {
this.decoder = this._initiateDecoder(respVersion); this.decoder = this.#initiateDecoder(respVersion);
this._maxLength = maxLength; this.#maxLength = maxLength;
this._onShardedChannelMoved = onShardedChannelMoved; this.#onShardedChannelMoved = onShardedChannelMoved;
} }
private _initiateDecoder(respVersion: RespVersions | null | undefined) { #initiateDecoder(respVersion: RespVersions | null | undefined) {
return respVersion === 3 ? return respVersion === 3 ?
this._initiateResp3Decoder() : this.#initiateResp3Decoder() :
this._initiateResp2Decoder(); this.#initiateResp2Decoder();
} }
private _onReply(reply: ReplyUnion) { #onReply(reply: ReplyUnion) {
this._waitingForReply.shift()!.resolve(reply); this.#waitingForReply.shift()!.resolve(reply);
} }
private _onErrorReply(err: ErrorReply) { #onErrorReply(err: ErrorReply) {
this._waitingForReply.shift()!.reject(err); this.#waitingForReply.shift()!.reject(err);
} }
private _onPush(push: Array<any>) { #onPush(push: Array<any>) {
// TODO: type // TODO: type
if (this._pubSub.handleMessageReply(push)) return true; if (this.#pubSub.handleMessageReply(push)) return true;
const isShardedUnsubscribe = PubSub.isShardedUnsubscribe(push); const isShardedUnsubscribe = PubSub.isShardedUnsubscribe(push);
if (isShardedUnsubscribe && !this._waitingForReply.length) { if (isShardedUnsubscribe && !this.#waitingForReply.length) {
const channel = push[1].toString(); const channel = push[1].toString();
this._onShardedChannelMoved( this.#onShardedChannelMoved(
channel, channel,
this._pubSub.removeShardedListeners(channel) this.#pubSub.removeShardedListeners(channel)
); );
return true; return true;
} else if (isShardedUnsubscribe || PubSub.isStatusReply(push)) { } else if (isShardedUnsubscribe || PubSub.isStatusReply(push)) {
const head = this._waitingForReply.head!.value; const head = this.#waitingForReply.head!.value;
if ( if (
(Number.isNaN(head.channelsCounter!) && push[2] === 0) || (Number.isNaN(head.channelsCounter!) && push[2] === 0) ||
--head.channelsCounter! === 0 --head.channelsCounter! === 0
) { ) {
this._waitingForReply.shift()!.resolve(); this.#waitingForReply.shift()!.resolve();
} }
return true; return true;
} }
} }
private _getTypeMapping() { #getTypeMapping() {
return this._waitingForReply.head!.value.typeMapping ?? {}; return this.#waitingForReply.head!.value.typeMapping ?? {};
} }
private _initiateResp3Decoder() { #initiateResp3Decoder() {
return new Decoder({ return new Decoder({
onReply: reply => this._onReply(reply), onReply: reply => this.#onReply(reply),
onErrorReply: err => this._onErrorReply(err), onErrorReply: err => this.#onErrorReply(err),
onPush: push => { onPush: push => {
if (!this._onPush(push)) { if (!this.#onPush(push)) {
} }
}, },
getTypeMapping: () => this._getTypeMapping() getTypeMapping: () => this.#getTypeMapping()
}); });
} }
private _initiateResp2Decoder() { #initiateResp2Decoder() {
return new Decoder({ return new Decoder({
onReply: reply => { onReply: reply => {
if (this._pubSub.isActive && Array.isArray(reply)) { if (this.#pubSub.isActive && Array.isArray(reply)) {
if (this._onPush(reply)) return; if (this.#onPush(reply)) return;
if (PONG.equals(reply[0] as Buffer)) { if (PONG.equals(reply[0] as Buffer)) {
const { resolve, typeMapping } = this._waitingForReply.shift()!, const { resolve, typeMapping } = this.#waitingForReply.shift()!,
buffer = ((reply[1] as Buffer).length === 0 ? reply[0] : reply[1]) as Buffer; buffer = ((reply[1] as Buffer).length === 0 ? reply[0] : reply[1]) as Buffer;
resolve(typeMapping?.[RESP_TYPES.SIMPLE_STRING] === Buffer ? buffer : buffer.toString()); resolve(typeMapping?.[RESP_TYPES.SIMPLE_STRING] === Buffer ? buffer : buffer.toString());
return; return;
} }
} }
this._onReply(reply); this.#onReply(reply);
}, },
onErrorReply: err => this._onErrorReply(err), onErrorReply: err => this.#onErrorReply(err),
// PUSH type does not exist in RESP2 // PUSH type does not exist in RESP2
// PubSub is handled in onReply // PubSub is handled in onReply
// @ts-expect-error // @ts-expect-error
onPush: undefined, onPush: undefined,
getTypeMapping: () => { getTypeMapping: () => {
// PubSub push is an Array in RESP2 // PubSub push is an Array in RESP2
return this._pubSub.isActive ? return this.#pubSub.isActive ?
RESP2_PUSH_TYPE_MAPPING : RESP2_PUSH_TYPE_MAPPING :
this._getTypeMapping(); this.#getTypeMapping();
} }
}); });
} }
@@ -180,7 +180,7 @@ export default class RedisCommandsQueue {
options?: CommandOptions, options?: CommandOptions,
resolveOnWrite?: boolean resolveOnWrite?: boolean
): Promise<T> { ): Promise<T> {
if (this._maxLength && this._toWrite.length + this._waitingForReply.length >= this._maxLength) { if (this.#maxLength && this.#toWrite.length + this.#waitingForReply.length >= this.#maxLength) {
return Promise.reject(new Error('The queue is full')); return Promise.reject(new Error('The queue is full'));
} else if (options?.abortSignal?.aborted) { } else if (options?.abortSignal?.aborted) {
return Promise.reject(new AbortError()); return Promise.reject(new AbortError());
@@ -204,14 +204,14 @@ export default class RedisCommandsQueue {
value.abort = { value.abort = {
signal, signal,
listener: () => { listener: () => {
this._toWrite.remove(node); this.#toWrite.remove(node);
value.reject(new AbortError()); value.reject(new AbortError());
} }
}; };
signal.addEventListener('abort', value.abort.listener, { once: true }); signal.addEventListener('abort', value.abort.listener, { once: true });
} }
node = this._toWrite.add(value, options?.asap); node = this.#toWrite.add(value, options?.asap);
}); });
} }
@@ -221,8 +221,8 @@ export default class RedisCommandsQueue {
listener: PubSubListener<T>, listener: PubSubListener<T>,
returnBuffers?: T returnBuffers?: T
) { ) {
return this._addPubSubCommand( return this.#addPubSubCommand(
this._pubSub.subscribe(type, channels, listener, returnBuffers) this.#pubSub.subscribe(type, channels, listener, returnBuffers)
); );
} }
@@ -232,17 +232,17 @@ export default class RedisCommandsQueue {
listener?: PubSubListener<T>, listener?: PubSubListener<T>,
returnBuffers?: T returnBuffers?: T
) { ) {
return this._addPubSubCommand( return this.#addPubSubCommand(
this._pubSub.unsubscribe(type, channels, listener, returnBuffers) this.#pubSub.unsubscribe(type, channels, listener, returnBuffers)
); );
} }
resubscribe(): Promise<any> | undefined { resubscribe(): Promise<any> | undefined {
const commands = this._pubSub.resubscribe(); const commands = this.#pubSub.resubscribe();
if (!commands.length) return; if (!commands.length) return;
return Promise.all( return Promise.all(
commands.map(command => this._addPubSubCommand(command, true)) commands.map(command => this.#addPubSubCommand(command, true))
); );
} }
@@ -251,26 +251,26 @@ export default class RedisCommandsQueue {
channel: string, channel: string,
listeners: ChannelListeners listeners: ChannelListeners
) { ) {
return this._addPubSubCommand( return this.#addPubSubCommand(
this._pubSub.extendChannelListeners(type, channel, listeners) this.#pubSub.extendChannelListeners(type, channel, listeners)
); );
} }
extendPubSubListeners(type: PubSubType, listeners: PubSubTypeListeners) { extendPubSubListeners(type: PubSubType, listeners: PubSubTypeListeners) {
return this._addPubSubCommand( return this.#addPubSubCommand(
this._pubSub.extendTypeListeners(type, listeners) this.#pubSub.extendTypeListeners(type, listeners)
); );
} }
getPubSubListeners(type: PubSubType) { getPubSubListeners(type: PubSubType) {
return this._pubSub.getTypeListeners(type); return this.#pubSub.getTypeListeners(type);
} }
private _addPubSubCommand(command: PubSubCommand, asap = false) { #addPubSubCommand(command: PubSubCommand, asap = false) {
if (command === undefined) return; if (command === undefined) return;
return new Promise<void>((resolve, reject) => { return new Promise<void>((resolve, reject) => {
this._toWrite.add({ this.#toWrite.add({
args: command.args, args: command.args,
chainId: undefined, chainId: undefined,
abort: undefined, abort: undefined,
@@ -290,23 +290,23 @@ export default class RedisCommandsQueue {
} }
isWaitingToWrite() { isWaitingToWrite() {
return this._toWrite.length > 0; return this.#toWrite.length > 0;
} }
*commandsToWrite() { *commandsToWrite() {
let toSend = this._toWrite.shift(); let toSend = this.#toWrite.shift();
while (toSend) { while (toSend) {
let encoded: CommandArguments; let encoded: CommandArguments;
try { try {
encoded = encodeCommand(toSend.args); encoded = encodeCommand(toSend.args);
} catch (err) { } catch (err) {
toSend.reject(err); toSend.reject(err);
toSend = this._toWrite.shift(); toSend = this.#toWrite.shift();
continue; continue;
} }
if (toSend.abort) { if (toSend.abort) {
RedisCommandsQueue._removeAbortListener(toSend); RedisCommandsQueue.#removeAbortListener(toSend);
toSend.abort = undefined; toSend.abort = undefined;
} }
@@ -316,31 +316,31 @@ export default class RedisCommandsQueue {
// TODO reuse `toSend` or create new object? // TODO reuse `toSend` or create new object?
(toSend as any).args = undefined; (toSend as any).args = undefined;
this._chainInExecution = toSend.chainId; this.#chainInExecution = toSend.chainId;
toSend.chainId = undefined; toSend.chainId = undefined;
this._waitingForReply.push(toSend); this.#waitingForReply.push(toSend);
} }
yield encoded; yield encoded;
toSend = this._toWrite.shift(); toSend = this.#toWrite.shift();
} }
} }
private _flushWaitingForReply(err: Error): void { #flushWaitingForReply(err: Error): void {
for (const node of this._waitingForReply) { for (const node of this.#waitingForReply) {
node.reject(err); node.reject(err);
} }
this._waitingForReply.reset(); this.#waitingForReply.reset();
} }
private static _removeAbortListener(command: CommandToWrite) { static #removeAbortListener(command: CommandToWrite) {
command.abort!.signal.removeEventListener('abort', command.abort!.listener); command.abort!.signal.removeEventListener('abort', command.abort!.listener);
} }
private static _flushToWrite(toBeSent: CommandToWrite, err: Error) { static #flushToWrite(toBeSent: CommandToWrite, err: Error) {
if (toBeSent.abort) { if (toBeSent.abort) {
RedisCommandsQueue._removeAbortListener(toBeSent); RedisCommandsQueue.#removeAbortListener(toBeSent);
} }
toBeSent.reject(err); toBeSent.reject(err);
@@ -348,36 +348,36 @@ export default class RedisCommandsQueue {
flushWaitingForReply(err: Error): void { flushWaitingForReply(err: Error): void {
this.decoder.reset(); this.decoder.reset();
this._pubSub.reset(); this.#pubSub.reset();
this._flushWaitingForReply(err); this.#flushWaitingForReply(err);
if (!this._chainInExecution) return; if (!this.#chainInExecution) return;
while (this._toWrite.head?.value.chainId === this._chainInExecution) { while (this.#toWrite.head?.value.chainId === this.#chainInExecution) {
RedisCommandsQueue._flushToWrite( RedisCommandsQueue.#flushToWrite(
this._toWrite.shift()!, this.#toWrite.shift()!,
err err
); );
} }
this._chainInExecution = undefined; this.#chainInExecution = undefined;
} }
flushAll(err: Error): void { flushAll(err: Error): void {
this.decoder.reset(); this.decoder.reset();
this._pubSub.reset(); this.#pubSub.reset();
this._flushWaitingForReply(err); this.#flushWaitingForReply(err);
for (const node of this._toWrite) { for (const node of this.#toWrite) {
RedisCommandsQueue._flushToWrite(node, err); RedisCommandsQueue.#flushToWrite(node, err);
} }
this._toWrite.reset(); this.#toWrite.reset();
} }
isEmpty() { isEmpty() {
return ( return (
this._toWrite.length === 0 && this.#toWrite.length === 0 &&
this._waitingForReply.length === 0 this.#waitingForReply.length === 0
); );
} }
} }

View File

@@ -132,7 +132,7 @@ export type RedisClientType<
type ProxyClient = RedisClient<any, any, any, any, any>; type ProxyClient = RedisClient<any, any, any, any, any>;
type NamespaceProxyClient = { self: ProxyClient }; type NamespaceProxyClient = { _self: ProxyClient };
interface ScanIteratorOptions { interface ScanIteratorOptions {
cursor?: RedisArgument; cursor?: RedisArgument;
@@ -147,7 +147,7 @@ export default class RedisClient<
RESP extends RespVersions, RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping TYPE_MAPPING extends TypeMapping
> extends EventEmitter { > extends EventEmitter {
private static _createCommand(command: Command, resp: RespVersions) { static #createCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp); const transformReply = getTransformReply(command, resp);
return async function (this: ProxyClient, ...args: Array<unknown>) { return async function (this: ProxyClient, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args), const redisArgs = command.transformArguments(...args),
@@ -158,25 +158,25 @@ export default class RedisClient<
}; };
} }
private static _createModuleCommand(command: Command, resp: RespVersions) { static #createModuleCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp); const transformReply = getTransformReply(command, resp);
return async function (this: NamespaceProxyClient, ...args: Array<unknown>) { return async function (this: NamespaceProxyClient, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args), const redisArgs = command.transformArguments(...args),
reply = await this.self.sendCommand(redisArgs, this.self._commandOptions); reply = await this._self.sendCommand(redisArgs, this._self._commandOptions);
return transformReply ? return transformReply ?
transformReply(reply, redisArgs.preserve) : transformReply(reply, redisArgs.preserve) :
reply; reply;
}; };
} }
private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) { static #createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) {
const prefix = functionArgumentsPrefix(name, fn), const prefix = functionArgumentsPrefix(name, fn),
transformReply = getTransformReply(fn, resp); transformReply = getTransformReply(fn, resp);
return async function (this: NamespaceProxyClient, ...args: Array<unknown>) { return async function (this: NamespaceProxyClient, ...args: Array<unknown>) {
const fnArgs = fn.transformArguments(...args), const fnArgs = fn.transformArguments(...args),
reply = await this.self.sendCommand( reply = await this._self.sendCommand(
prefix.concat(fnArgs), prefix.concat(fnArgs),
this.self._commandOptions this._self._commandOptions
); );
return transformReply ? return transformReply ?
transformReply(reply, fnArgs.preserve) : transformReply(reply, fnArgs.preserve) :
@@ -184,7 +184,7 @@ export default class RedisClient<
}; };
} }
private static _createScriptCommand(script: RedisScript, resp: RespVersions) { static #createScriptCommand(script: RedisScript, resp: RespVersions) {
const prefix = scriptArgumentsPrefix(script), const prefix = scriptArgumentsPrefix(script),
transformReply = getTransformReply(script, resp); transformReply = getTransformReply(script, resp);
return async function (this: ProxyClient, ...args: Array<unknown>) { return async function (this: ProxyClient, ...args: Array<unknown>) {
@@ -206,10 +206,10 @@ export default class RedisClient<
const Client = attachConfig({ const Client = attachConfig({
BaseClass: RedisClient, BaseClass: RedisClient,
commands: COMMANDS, commands: COMMANDS,
createCommand: RedisClient._createCommand, createCommand: RedisClient.#createCommand,
createModuleCommand: RedisClient._createModuleCommand, createModuleCommand: RedisClient.#createModuleCommand,
createFunctionCommand: RedisClient._createFunctionCommand, createFunctionCommand: RedisClient.#createFunctionCommand,
createScriptCommand: RedisClient._createScriptCommand, createScriptCommand: RedisClient.#createScriptCommand,
config config
}); });
@@ -218,7 +218,7 @@ export default class RedisClient<
return <TYPE_MAPPING extends TypeMapping = {}>( return <TYPE_MAPPING extends TypeMapping = {}>(
options?: Omit<RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>, keyof Exclude<typeof config, undefined>> options?: Omit<RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>, keyof Exclude<typeof config, undefined>>
) => { ) => {
// returning a "proxy" to prevent the namespaces.self to leak between "proxies" // returning a "proxy" to prevent the namespaces._self to leak between "proxies"
return Object.create(new Client(options)) as RedisClientType<M, F, S, RESP, TYPE_MAPPING>; return Object.create(new Client(options)) as RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
}; };
} }
@@ -272,39 +272,38 @@ export default class RedisClient<
return parsed; return parsed;
} }
self = this; readonly #options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>;
readonly #socket: RedisSocket;
private readonly _options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>; readonly #queue: RedisCommandsQueue;
private readonly _socket: RedisSocket; #selectedDB = 0;
private readonly _queue: RedisCommandsQueue; #monitorCallback?: MonitorCallback<TYPE_MAPPING>;
private _selectedDB = 0; private _self = this;
private _monitorCallback?: MonitorCallback<TYPE_MAPPING>;
private _commandOptions?: CommandOptions<TYPE_MAPPING>; private _commandOptions?: CommandOptions<TYPE_MAPPING>;
get options(): RedisClientOptions<M, F, S, RESP> | undefined { get options(): RedisClientOptions<M, F, S, RESP> | undefined {
return this._options; return this._self.#options;
} }
get isOpen(): boolean { get isOpen(): boolean {
return this._socket.isOpen; return this._self.#socket.isOpen;
} }
get isReady(): boolean { get isReady(): boolean {
return this._socket.isReady; return this._self.#socket.isReady;
} }
get isPubSubActive() { get isPubSubActive() {
return this._queue.isPubSubActive; return this._self.#queue.isPubSubActive;
} }
constructor(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>) { constructor(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>) {
super(); super();
this._options = this._initiateOptions(options); this.#options = this.#initiateOptions(options);
this._queue = this._initiateQueue(); this.#queue = this.#initiateQueue();
this._socket = this._initiateSocket(); this.#socket = this.#initiateSocket();
} }
private _initiateOptions(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisClientOptions<M, F, S, RESP, TYPE_MAPPING> | undefined { #initiateOptions(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisClientOptions<M, F, S, RESP, TYPE_MAPPING> | undefined {
if (options?.url) { if (options?.url) {
const parsed = RedisClient.parseURL(options.url); const parsed = RedisClient.parseURL(options.url);
if (options.socket) { if (options.socket) {
@@ -315,7 +314,7 @@ export default class RedisClient<
} }
if (options?.database) { if (options?.database) {
this.self._selectedDB = options.database; this._self.#selectedDB = options.database;
} }
if (options?.commandOptions) { if (options?.commandOptions) {
@@ -325,82 +324,82 @@ export default class RedisClient<
return options; return options;
} }
private _initiateQueue(): RedisCommandsQueue { #initiateQueue(): RedisCommandsQueue {
return new RedisCommandsQueue( return new RedisCommandsQueue(
this._options?.RESP, this.#options?.RESP,
this._options?.commandsQueueMaxLength, this.#options?.commandsQueueMaxLength,
(channel, listeners) => this.emit('sharded-channel-moved', channel, listeners) (channel, listeners) => this.emit('sharded-channel-moved', channel, listeners)
); );
} }
private _initiateSocket(): RedisSocket { #initiateSocket(): RedisSocket {
const socketInitiator = async (): Promise<void> => { const socketInitiator = async (): Promise<void> => {
const promises = [this._queue.resubscribe()]; const promises = [this.#queue.resubscribe()];
if (this._monitorCallback) { if (this.#monitorCallback) {
promises.push( promises.push(
this._queue.monitor( this.#queue.monitor(
this._monitorCallback, this.#monitorCallback,
this._commandOptions?.typeMapping, this._commandOptions?.typeMapping,
true true
) )
); );
} }
if (this._selectedDB !== 0) { if (this.#selectedDB !== 0) {
promises.push( promises.push(
this._queue.addCommand( this.#queue.addCommand(
['SELECT', this._selectedDB.toString()], ['SELECT', this.#selectedDB.toString()],
{ asap: true } { asap: true }
) )
); );
} }
if (this._options?.readonly) { if (this.#options?.readonly) {
promises.push( promises.push(
this._queue.addCommand( this.#queue.addCommand(
COMMANDS.READONLY.transformArguments(), COMMANDS.READONLY.transformArguments(),
{ asap: true } { asap: true }
) )
); );
} }
if (this._options?.RESP) { if (this.#options?.RESP) {
const hello: HelloOptions = {}; const hello: HelloOptions = {};
if (this._options.password) { if (this.#options.password) {
hello.AUTH = { hello.AUTH = {
username: this._options.username ?? 'default', username: this.#options.username ?? 'default',
password: this._options.password password: this.#options.password
}; };
} }
if (this._options.name) { if (this.#options.name) {
hello.SETNAME = this._options.name; hello.SETNAME = this.#options.name;
} }
promises.push( promises.push(
this._queue.addCommand( this.#queue.addCommand(
HELLO.transformArguments(this._options.RESP, hello), HELLO.transformArguments(this.#options.RESP, hello),
{ asap: true } { asap: true }
) )
); );
} else { } else {
if (this._options?.name) { if (this.#options?.name) {
promises.push( promises.push(
this._queue.addCommand( this.#queue.addCommand(
COMMANDS.CLIENT_SETNAME.transformArguments(this._options.name), COMMANDS.CLIENT_SETNAME.transformArguments(this.#options.name),
{ asap: true } { asap: true }
) )
); );
} }
if (this._options?.username || this._options?.password) { if (this.#options?.username || this.#options?.password) {
promises.push( promises.push(
this._queue.addCommand( this.#queue.addCommand(
COMMANDS.AUTH.transformArguments({ COMMANDS.AUTH.transformArguments({
username: this._options.username, username: this.#options.username,
password: this._options.password ?? '' password: this.#options.password ?? ''
}), }),
{ asap: true } { asap: true }
) )
@@ -409,60 +408,60 @@ export default class RedisClient<
} }
if (promises.length) { if (promises.length) {
this._write(); this.#write();
await Promise.all(promises); await Promise.all(promises);
} }
}; };
return new RedisSocket(socketInitiator, this._options?.socket) return new RedisSocket(socketInitiator, this.#options?.socket)
.on('data', chunk => { .on('data', chunk => {
try { try {
this._queue.decoder.write(chunk); this.#queue.decoder.write(chunk);
} catch (err) { } catch (err) {
this._queue.decoder.reset(); this.#queue.decoder.reset();
this.emit('error', err); this.emit('error', err);
} }
}) })
.on('error', err => { .on('error', err => {
this.emit('error', err); this.emit('error', err);
if (this._socket.isOpen && !this._options?.disableOfflineQueue) { if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) {
this._queue.flushWaitingForReply(err); this.#queue.flushWaitingForReply(err);
} else { } else {
this._queue.flushAll(err); this.#queue.flushAll(err);
} }
}) })
.on('connect', () => this.emit('connect')) .on('connect', () => this.emit('connect'))
.on('ready', () => { .on('ready', () => {
this.emit('ready'); this.emit('ready');
this._setPingTimer(); this.#setPingTimer();
this._maybeScheduleWrite(); this.#maybeScheduleWrite();
}) })
.on('reconnecting', () => this.emit('reconnecting')) .on('reconnecting', () => this.emit('reconnecting'))
.on('drain', () => this._maybeScheduleWrite()) .on('drain', () => this.#maybeScheduleWrite())
.on('end', () => this.emit('end')); .on('end', () => this.emit('end'));
} }
private _pingTimer?: NodeJS.Timeout; #pingTimer?: NodeJS.Timeout;
private _setPingTimer(): void { #setPingTimer(): void {
if (!this._options?.pingInterval || !this._socket.isReady) return; if (!this.#options?.pingInterval || !this.#socket.isReady) return;
clearTimeout(this._pingTimer); clearTimeout(this.#pingTimer);
this._pingTimer = setTimeout(() => { this.#pingTimer = setTimeout(() => {
if (!this._socket.isReady) return; if (!this.#socket.isReady) return;
this.sendCommand(['PING']) this.sendCommand(['PING'])
.then(reply => this.emit('ping-interval', reply)) .then(reply => this.emit('ping-interval', reply))
.catch(err => this.emit('error', err)) .catch(err => this.emit('error', err))
.finally(() => this._setPingTimer()); .finally(() => this.#setPingTimer());
}, this._options.pingInterval); }, this.#options.pingInterval);
} }
withCommandOptions< withCommandOptions<
OPTIONS extends CommandOptions<TYPE_MAPPING>, OPTIONS extends CommandOptions<TYPE_MAPPING>,
TYPE_MAPPING extends TypeMapping TYPE_MAPPING extends TypeMapping
>(options: OPTIONS) { >(options: OPTIONS) {
const proxy = Object.create(this.self); const proxy = Object.create(this._self);
proxy._commandOptions = options; proxy._commandOptions = options;
return proxy as RedisClientType< return proxy as RedisClientType<
M, M,
@@ -480,7 +479,7 @@ export default class RedisClient<
key: K, key: K,
value: V value: V
) { ) {
const proxy = Object.create(this.self); const proxy = Object.create(this._self);
proxy._commandOptions = Object.create(this._commandOptions ?? null); proxy._commandOptions = Object.create(this._commandOptions ?? null);
proxy._commandOptions[key] = value; proxy._commandOptions[key] = value;
return proxy as RedisClientType< return proxy as RedisClientType<
@@ -527,7 +526,7 @@ export default class RedisClient<
*/ */
createPool(options?: Partial<RedisPoolOptions>) { createPool(options?: Partial<RedisPoolOptions>) {
return RedisClientPool.create( return RedisClientPool.create(
this._options, this._self.#options,
options options
); );
} }
@@ -540,14 +539,14 @@ export default class RedisClient<
_TYPE_MAPPING extends TypeMapping = TYPE_MAPPING _TYPE_MAPPING extends TypeMapping = TYPE_MAPPING
>(overrides?: Partial<RedisClientOptions<_M, _F, _S, _RESP, _TYPE_MAPPING>>) { >(overrides?: Partial<RedisClientOptions<_M, _F, _S, _RESP, _TYPE_MAPPING>>) {
return new (Object.getPrototypeOf(this).constructor)({ return new (Object.getPrototypeOf(this).constructor)({
...this._options, ...this._self.#options,
commandOptions: this._commandOptions, commandOptions: this._commandOptions,
...overrides ...overrides
}) as RedisClientType<_M, _F, _S, _RESP, _TYPE_MAPPING>; }) as RedisClientType<_M, _F, _S, _RESP, _TYPE_MAPPING>;
} }
async connect() { async connect() {
await this._socket.connect(); await this._self.#socket.connect();
return this as unknown as RedisClientType<M, F, S, RESP, TYPE_MAPPING>; return this as unknown as RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
} }
@@ -555,14 +554,14 @@ export default class RedisClient<
args: Array<RedisArgument>, args: Array<RedisArgument>,
options?: CommandOptions options?: CommandOptions
): Promise<T> { ): Promise<T> {
if (!this._socket.isOpen) { if (!this._self.#socket.isOpen) {
return Promise.reject(new ClientClosedError()); return Promise.reject(new ClientClosedError());
} else if (!this._socket.isReady && this._options?.disableOfflineQueue) { } else if (!this._self.#socket.isReady && this._self.#options?.disableOfflineQueue) {
return Promise.reject(new ClientOfflineError()); return Promise.reject(new ClientOfflineError());
} }
const promise = this._queue.addCommand<T>(args, options); const promise = this._self.#queue.addCommand<T>(args, options);
this._scheduleWrite(); this._self.#scheduleWrite();
return promise; return promise;
} }
@@ -584,15 +583,15 @@ export default class RedisClient<
async SELECT(db: number): Promise<void> { async SELECT(db: number): Promise<void> {
await this.sendCommand(['SELECT', db.toString()]); await this.sendCommand(['SELECT', db.toString()]);
this.self._selectedDB = db; this._self.#selectedDB = db;
} }
select = this.SELECT; select = this.SELECT;
private _pubSubCommand(promise: Promise<void> | undefined) { #pubSubCommand(promise: Promise<void> | undefined) {
if (promise === undefined) return Promise.resolve(); if (promise === undefined) return Promise.resolve();
this._scheduleWrite(); this.#scheduleWrite();
return promise; return promise;
} }
@@ -601,8 +600,8 @@ export default class RedisClient<
listener: PubSubListener<T>, listener: PubSubListener<T>,
bufferMode?: T bufferMode?: T
): Promise<void> { ): Promise<void> {
return this._pubSubCommand( return this._self.#pubSubCommand(
this._queue.subscribe( this._self.#queue.subscribe(
PubSubType.CHANNELS, PubSubType.CHANNELS,
channels, channels,
listener, listener,
@@ -618,8 +617,8 @@ export default class RedisClient<
listener?: PubSubListener<T>, listener?: PubSubListener<T>,
bufferMode?: T bufferMode?: T
): Promise<void> { ): Promise<void> {
return this._pubSubCommand( return this._self.#pubSubCommand(
this._queue.unsubscribe( this._self.#queue.unsubscribe(
PubSubType.CHANNELS, PubSubType.CHANNELS,
channels, channels,
listener, listener,
@@ -635,8 +634,8 @@ export default class RedisClient<
listener: PubSubListener<T>, listener: PubSubListener<T>,
bufferMode?: T bufferMode?: T
): Promise<void> { ): Promise<void> {
return this._pubSubCommand( return this._self.#pubSubCommand(
this._queue.subscribe( this._self.#queue.subscribe(
PubSubType.PATTERNS, PubSubType.PATTERNS,
patterns, patterns,
listener, listener,
@@ -652,8 +651,8 @@ export default class RedisClient<
listener?: PubSubListener<T>, listener?: PubSubListener<T>,
bufferMode?: T bufferMode?: T
): Promise<void> { ): Promise<void> {
return this._pubSubCommand( return this._self.#pubSubCommand(
this._queue.unsubscribe( this._self.#queue.unsubscribe(
PubSubType.PATTERNS, PubSubType.PATTERNS,
patterns, patterns,
listener, listener,
@@ -669,8 +668,8 @@ export default class RedisClient<
listener: PubSubListener<T>, listener: PubSubListener<T>,
bufferMode?: T bufferMode?: T
): Promise<void> { ): Promise<void> {
return this._pubSubCommand( return this._self.#pubSubCommand(
this._queue.subscribe( this._self.#queue.subscribe(
PubSubType.SHARDED, PubSubType.SHARDED,
channels, channels,
listener, listener,
@@ -686,8 +685,8 @@ export default class RedisClient<
listener?: PubSubListener<T>, listener?: PubSubListener<T>,
bufferMode?: T bufferMode?: T
): Promise<void> { ): Promise<void> {
return this._pubSubCommand( return this._self.#pubSubCommand(
this._queue.unsubscribe( this._self.#queue.unsubscribe(
PubSubType.SHARDED, PubSubType.SHARDED,
channels, channels,
listener, listener,
@@ -699,7 +698,7 @@ export default class RedisClient<
sUnsubscribe = this.SUNSUBSCRIBE; sUnsubscribe = this.SUNSUBSCRIBE;
getPubSubListeners(type: PubSubType) { getPubSubListeners(type: PubSubType) {
return this._queue.getPubSubListeners(type); return this._self.#queue.getPubSubListeners(type);
} }
extendPubSubChannelListeners( extendPubSubChannelListeners(
@@ -707,36 +706,36 @@ export default class RedisClient<
channel: string, channel: string,
listeners: ChannelListeners listeners: ChannelListeners
) { ) {
return this._pubSubCommand( return this._self.#pubSubCommand(
this._queue.extendPubSubChannelListeners(type, channel, listeners) this._self.#queue.extendPubSubChannelListeners(type, channel, listeners)
); );
} }
extendPubSubListeners(type: PubSubType, listeners: PubSubTypeListeners) { extendPubSubListeners(type: PubSubType, listeners: PubSubTypeListeners) {
return this._pubSubCommand( return this._self.#pubSubCommand(
this._queue.extendPubSubListeners(type, listeners) this._self.#queue.extendPubSubListeners(type, listeners)
); );
} }
private _write() { #write() {
this._socket.write(this._queue.commandsToWrite()); this.#socket.write(this.#queue.commandsToWrite());
} }
private _scheduledWrite?: NodeJS.Immediate; #scheduledWrite?: NodeJS.Immediate;
private _scheduleWrite() { #scheduleWrite() {
if (!this.isReady || this._scheduledWrite) return; if (!this.#socket.isReady || this.#scheduledWrite) return;
this._scheduledWrite = setImmediate(() => { this.#scheduledWrite = setImmediate(() => {
this._write(); this.#write();
this._scheduledWrite = undefined; this.#scheduledWrite = undefined;
}); });
} }
private _maybeScheduleWrite() { #maybeScheduleWrite() {
if (!this._queue.isWaitingToWrite()) return; if (!this.#queue.isWaitingToWrite()) return;
this._scheduleWrite(); this.#scheduleWrite();
} }
/** /**
@@ -746,20 +745,20 @@ export default class RedisClient<
commands: Array<RedisMultiQueuedCommand>, commands: Array<RedisMultiQueuedCommand>,
selectedDB?: number selectedDB?: number
) { ) {
if (!this._socket.isOpen) { if (!this._self.#socket.isOpen) {
return Promise.reject(new ClientClosedError()); return Promise.reject(new ClientClosedError());
} }
const promise = Promise.all( const promise = Promise.all(
commands.map(({ args }) => this._queue.addCommand(args, { commands.map(({ args }) => this._self.#queue.addCommand(args, {
typeMapping: this._commandOptions?.typeMapping typeMapping: this._commandOptions?.typeMapping
})) }))
); );
this._scheduleWrite(); this._self.#scheduleWrite();
const result = await promise; const result = await promise;
if (selectedDB !== undefined) { if (selectedDB !== undefined) {
this.self._selectedDB = selectedDB; this._self.#selectedDB = selectedDB;
} }
return result; return result;
@@ -772,19 +771,19 @@ export default class RedisClient<
commands: Array<RedisMultiQueuedCommand>, commands: Array<RedisMultiQueuedCommand>,
selectedDB?: number selectedDB?: number
) { ) {
if (!this._socket.isOpen) { if (!this._self.#socket.isOpen) {
throw new ClientClosedError(); throw new ClientClosedError();
} }
const typeMapping = this._commandOptions?.typeMapping, const typeMapping = this._commandOptions?.typeMapping,
chainId = Symbol('MULTI Chain'), chainId = Symbol('MULTI Chain'),
promises = [ promises = [
this._queue.addCommand(['MULTI'], { chainId }), this._self.#queue.addCommand(['MULTI'], { chainId }),
]; ];
for (const { args } of commands) { for (const { args } of commands) {
promises.push( promises.push(
this._queue.addCommand(args, { this._self.#queue.addCommand(args, {
chainId, chainId,
typeMapping typeMapping
}) })
@@ -792,10 +791,10 @@ export default class RedisClient<
} }
promises.push( promises.push(
this._queue.addCommand(['EXEC'], { chainId }) this._self.#queue.addCommand(['EXEC'], { chainId })
); );
this._scheduleWrite(); this._self.#scheduleWrite();
const results = await Promise.all(promises), const results = await Promise.all(promises),
execResult = results[results.length - 1]; execResult = results[results.length - 1];
@@ -805,7 +804,7 @@ export default class RedisClient<
} }
if (selectedDB !== undefined) { if (selectedDB !== undefined) {
this.self._selectedDB = selectedDB; this._self.#selectedDB = selectedDB;
} }
return execResult as Array<unknown>; return execResult as Array<unknown>;
@@ -873,16 +872,16 @@ export default class RedisClient<
} }
async MONITOR(callback: MonitorCallback<TYPE_MAPPING>) { async MONITOR(callback: MonitorCallback<TYPE_MAPPING>) {
const promise = this._queue.monitor(callback, this._commandOptions?.typeMapping); const promise = this._self.#queue.monitor(callback, this._commandOptions?.typeMapping);
this._scheduleWrite(); this._self.#scheduleWrite();
const off = await promise; const off = await promise;
this._monitorCallback = callback; this._self.#monitorCallback = callback;
return async () => { return async () => {
const promise = off(); const promise = off();
this._scheduleWrite(); this._self.#scheduleWrite();
await promise; await promise;
this._monitorCallback = undefined; this._self.#monitorCallback = undefined;
}; };
} }
@@ -892,10 +891,10 @@ export default class RedisClient<
* @deprecated use .close instead * @deprecated use .close instead
*/ */
QUIT(): Promise<string> { QUIT(): Promise<string> {
return this._socket.quit(async () => { return this._self.#socket.quit(async () => {
clearTimeout(this._pingTimer); clearTimeout(this._self.#pingTimer);
const quitPromise = this._queue.addCommand<string>(['QUIT']); const quitPromise = this._self.#queue.addCommand<string>(['QUIT']);
this._scheduleWrite(); this._self.#scheduleWrite();
return quitPromise; return quitPromise;
}); });
} }
@@ -914,22 +913,22 @@ export default class RedisClient<
*/ */
close() { close() {
return new Promise<void>(resolve => { return new Promise<void>(resolve => {
clearTimeout(this._pingTimer); clearTimeout(this._self.#pingTimer);
this._socket.close(); this._self.#socket.close();
if (this._queue.isEmpty()) { if (this._self.#queue.isEmpty()) {
this._socket.destroySocket(); this._self.#socket.destroySocket();
return resolve(); return resolve();
} }
const maybeClose = () => { const maybeClose = () => {
if (!this._queue.isEmpty()) return; if (!this._self.#queue.isEmpty()) return;
this._socket.off('data', maybeClose); this._self.#socket.off('data', maybeClose);
this._socket.destroySocket(); this._self.#socket.destroySocket();
resolve(); resolve();
}; };
this._socket.on('data', maybeClose); this._self.#socket.on('data', maybeClose);
}); });
} }
@@ -937,16 +936,16 @@ export default class RedisClient<
* Destroy the client. Rejects all commands immediately. * Destroy the client. Rejects all commands immediately.
*/ */
destroy() { destroy() {
clearTimeout(this._pingTimer); clearTimeout(this._self.#pingTimer);
this._queue.flushAll(new DisconnectsClientError()); this._self.#queue.flushAll(new DisconnectsClientError());
this._socket.destroy(); this._self.#socket.destroy();
} }
ref() { ref() {
this._socket.ref(); this._self.#socket.ref();
} }
unref() { unref() {
this._socket.unref(); this._self.#socket.unref();
} }
} }

View File

@@ -23,7 +23,7 @@ type WithCommands = {
export type RedisLegacyClientType = RedisLegacyClient & WithCommands; export type RedisLegacyClientType = RedisLegacyClient & WithCommands;
export class RedisLegacyClient { export class RedisLegacyClient {
private static _transformArguments(redisArgs: CommandArguments, args: LegacyCommandArguments) { static #transformArguments(redisArgs: CommandArguments, args: LegacyCommandArguments) {
let callback: LegacyCallback | undefined; let callback: LegacyCallback | undefined;
if (typeof args[args.length - 1] === 'function') { if (typeof args[args.length - 1] === 'function') {
callback = args.pop() as LegacyCallback; callback = args.pop() as LegacyCallback;
@@ -55,15 +55,15 @@ export class RedisLegacyClient {
undefined; undefined;
} }
private static _createCommand(name: string, command: Command, resp: RespVersions) { static #createCommand(name: string, command: Command, resp: RespVersions) {
const transformReply = RedisLegacyClient.getTransformReply(command, resp); const transformReply = RedisLegacyClient.getTransformReply(command, resp);
return async function (this: RedisLegacyClient, ...args: LegacyCommandArguments) { return async function (this: RedisLegacyClient, ...args: LegacyCommandArguments) {
const redisArgs = [name], const redisArgs = [name],
callback = RedisLegacyClient._transformArguments(redisArgs, args), callback = RedisLegacyClient.#transformArguments(redisArgs, args),
promise = this._client.sendCommand(redisArgs); promise = this.#client.sendCommand(redisArgs);
if (!callback) { if (!callback) {
promise.catch(err => this._client.emit('error', err)); promise.catch(err => this.#client.emit('error', err));
return; return;
} }
@@ -73,31 +73,34 @@ export class RedisLegacyClient {
}; };
} }
private _Multi: ReturnType<typeof LegacyMultiCommand['factory']>; #client: RedisClientType<RedisModules, RedisFunctions, RedisScripts>;
#Multi: ReturnType<typeof LegacyMultiCommand['factory']>;
constructor( constructor(
private _client: RedisClientType<RedisModules, RedisFunctions, RedisScripts> client: RedisClientType<RedisModules, RedisFunctions, RedisScripts>
) { ) {
const RESP = _client.options?.RESP ?? 2; this.#client = client;
const RESP = client.options?.RESP ?? 2;
for (const [name, command] of Object.entries(COMMANDS)) { for (const [name, command] of Object.entries(COMMANDS)) {
// TODO: as any? // TODO: as any?
(this as any)[name] = RedisLegacyClient._createCommand( (this as any)[name] = RedisLegacyClient.#createCommand(
name, name,
command, command,
RESP RESP
); );
} }
this._Multi = LegacyMultiCommand.factory(RESP); this.#Multi = LegacyMultiCommand.factory(RESP);
} }
sendCommand(...args: LegacyCommandArguments) { sendCommand(...args: LegacyCommandArguments) {
const redisArgs: CommandArguments = [], const redisArgs: CommandArguments = [],
callback = RedisLegacyClient._transformArguments(redisArgs, args), callback = RedisLegacyClient.#transformArguments(redisArgs, args),
promise = this._client.sendCommand(redisArgs); promise = this.#client.sendCommand(redisArgs);
if (!callback) { if (!callback) {
promise.catch(err => this._client.emit('error', err)); promise.catch(err => this.#client.emit('error', err));
return; return;
} }
@@ -107,7 +110,7 @@ export class RedisLegacyClient {
} }
multi() { multi() {
return this._Multi(this._client); return this.#Multi(this.#client);
} }
} }
@@ -118,12 +121,12 @@ type MultiWithCommands = {
export type RedisLegacyMultiType = LegacyMultiCommand & MultiWithCommands; export type RedisLegacyMultiType = LegacyMultiCommand & MultiWithCommands;
class LegacyMultiCommand { class LegacyMultiCommand {
private static _createCommand(name: string, command: Command, resp: RespVersions) { static #createCommand(name: string, command: Command, resp: RespVersions) {
const transformReply = RedisLegacyClient.getTransformReply(command, resp); const transformReply = RedisLegacyClient.getTransformReply(command, resp);
return function (this: LegacyMultiCommand, ...args: LegacyArguments) { return function (this: LegacyMultiCommand, ...args: LegacyArguments) {
const redisArgs = [name]; const redisArgs = [name];
RedisLegacyClient.pushArguments(redisArgs, args); RedisLegacyClient.pushArguments(redisArgs, args);
this._multi.addCommand(redisArgs, transformReply); this.#multi.addCommand(redisArgs, transformReply);
return this; return this;
}; };
} }
@@ -133,7 +136,7 @@ class LegacyMultiCommand {
for (const [name, command] of Object.entries(COMMANDS)) { for (const [name, command] of Object.entries(COMMANDS)) {
// TODO: as any? // TODO: as any?
(Multi as any).prototype[name] = LegacyMultiCommand._createCommand( (Multi as any).prototype[name] = LegacyMultiCommand.#createCommand(
name, name,
command, command,
resp resp
@@ -145,30 +148,30 @@ class LegacyMultiCommand {
}; };
} }
private readonly _multi = new RedisMultiCommand(); readonly #multi = new RedisMultiCommand();
private readonly _client: RedisClientType<RedisModules, RedisFunctions, RedisScripts>; readonly #client: RedisClientType<RedisModules, RedisFunctions, RedisScripts>;
constructor(client: RedisClientType<RedisModules, RedisFunctions, RedisScripts>) { constructor(client: RedisClientType<RedisModules, RedisFunctions, RedisScripts>) {
this._client = client; this.#client = client;
} }
sendCommand(...args: LegacyArguments) { sendCommand(...args: LegacyArguments) {
const redisArgs: CommandArguments = []; const redisArgs: CommandArguments = [];
RedisLegacyClient.pushArguments(redisArgs, args); RedisLegacyClient.pushArguments(redisArgs, args);
this._multi.addCommand(redisArgs); this.#multi.addCommand(redisArgs);
return this; return this;
} }
exec(cb?: (err: ErrorReply | null, replies?: Array<unknown>) => unknown) { exec(cb?: (err: ErrorReply | null, replies?: Array<unknown>) => unknown) {
const promise = this._client._executeMulti(this._multi.queue); const promise = this.#client._executeMulti(this.#multi.queue);
if (!cb) { if (!cb) {
promise.catch(err => this._client.emit('error', err)); promise.catch(err => this.#client.emit('error', err));
return; return;
} }
promise promise
.then(results => cb(null, this._multi.transformReplies(results))) .then(results => cb(null, this.#multi.transformReplies(results)))
.catch(err => cb?.(err)); .catch(err => cb?.(err));
} }
} }

View File

@@ -5,56 +5,56 @@ export interface DoublyLinkedNode<T> {
} }
export class DoublyLinkedList<T> { export class DoublyLinkedList<T> {
private _length = 0; #length = 0;
get length() { get length() {
return this._length; return this.#length;
} }
private _head?: DoublyLinkedNode<T>; #head?: DoublyLinkedNode<T>;
get head() { get head() {
return this._head; return this.#head;
} }
private _tail?: DoublyLinkedNode<T>; #tail?: DoublyLinkedNode<T>;
get tail() { get tail() {
return this._tail; return this.#tail;
} }
push(value: T) { push(value: T) {
++this._length; ++this.#length;
if (this._tail === undefined) { if (this.#tail === undefined) {
return this._tail = this._head = { return this.#tail = this.#head = {
previous: this._head, previous: this.#head,
next: undefined, next: undefined,
value value
}; };
} }
return this._tail = this._tail.next = { return this.#tail = this.#tail.next = {
previous: this._tail, previous: this.#tail,
next: undefined, next: undefined,
value value
}; };
} }
unshift(value: T) { unshift(value: T) {
++this._length; ++this.#length;
if (this._head === undefined) { if (this.#head === undefined) {
return this._head = this._tail = { return this.#head = this.#tail = {
previous: undefined, previous: undefined,
next: undefined, next: undefined,
value value
}; };
} }
return this._head = this._head.previous = { return this.#head = this.#head.previous = {
previous: undefined, previous: undefined,
next: this._head, next: this.#head,
value value
}; };
} }
@@ -66,29 +66,29 @@ export class DoublyLinkedList<T> {
} }
shift() { shift() {
if (this._head === undefined) return undefined; if (this.#head === undefined) return undefined;
--this._length; --this.#length;
const node = this._head; const node = this.#head;
if (node.next) { if (node.next) {
node.next.previous = node.previous; node.next.previous = node.previous;
this._head = node.next; this.#head = node.next;
node.next = undefined; node.next = undefined;
} else { } else {
this._head = this._tail = undefined; this.#head = this.#tail = undefined;
} }
return node.value; return node.value;
} }
remove(node: DoublyLinkedNode<T>) { remove(node: DoublyLinkedNode<T>) {
--this._length; --this.#length;
if (this._tail === node) { if (this.#tail === node) {
this._tail = node.previous; this.#tail = node.previous;
} }
if (this._head === node) { if (this.#head === node) {
this._head = node.next; this.#head = node.next;
} else { } else {
node.previous!.next = node.next; node.previous!.next = node.next;
node.previous = undefined; node.previous = undefined;
@@ -98,12 +98,12 @@ export class DoublyLinkedList<T> {
} }
reset() { reset() {
this._length = 0; this.#length = 0;
this._head = this._tail = undefined; this.#head = this.#tail = undefined;
} }
*[Symbol.iterator]() { *[Symbol.iterator]() {
let node = this._head; let node = this.#head;
while (node !== undefined) { while (node !== undefined) {
yield node.value; yield node.value;
node = node.next; node = node.next;
@@ -117,50 +117,50 @@ export interface SinglyLinkedNode<T> {
} }
export class SinglyLinkedList<T> { export class SinglyLinkedList<T> {
private _length = 0; #length = 0;
get length() { get length() {
return this._length; return this.#length;
} }
private _head?: SinglyLinkedNode<T>; #head?: SinglyLinkedNode<T>;
get head() { get head() {
return this._head; return this.#head;
} }
private _tail?: SinglyLinkedNode<T>; #tail?: SinglyLinkedNode<T>;
get tail() { get tail() {
return this._tail; return this.#tail;
} }
push(value: T) { push(value: T) {
++this._length; ++this.#length;
const node = { const node = {
value, value,
next: undefined next: undefined
}; };
if (this._head === undefined) { if (this.#head === undefined) {
return this._head = this._tail = node; return this.#head = this.#tail = node;
} }
return this._tail!.next = this._tail = node; return this.#tail!.next = this.#tail = node;
} }
remove(node: SinglyLinkedNode<T>, parent: SinglyLinkedNode<T> | undefined) { remove(node: SinglyLinkedNode<T>, parent: SinglyLinkedNode<T> | undefined) {
--this._length; --this.#length;
if (this._head === node) { if (this.#head === node) {
if (this._tail === node) { if (this.#tail === node) {
this._head = this._tail = undefined; this.#head = this.#tail = undefined;
} else { } else {
this._head = node.next; this.#head = node.next;
} }
} else if (this._tail === node) { } else if (this.#tail === node) {
this._tail = parent; this.#tail = parent;
parent!.next = undefined; parent!.next = undefined;
} else { } else {
parent!.next = node.next; parent!.next = node.next;
@@ -168,25 +168,25 @@ export class SinglyLinkedList<T> {
} }
shift() { shift() {
if (this._head === undefined) return undefined; if (this.#head === undefined) return undefined;
const node = this._head; const node = this.#head;
if (--this._length === 0) { if (--this.#length === 0) {
this._head = this._tail = undefined; this.#head = this.#tail = undefined;
} else { } else {
this._head = node.next; this.#head = node.next;
} }
return node.value; return node.value;
} }
reset() { reset() {
this._length = 0; this.#length = 0;
this._head = this._tail = undefined; this.#head = this.#tail = undefined;
} }
*[Symbol.iterator]() { *[Symbol.iterator]() {
let node = this._head; let node = this.#head;
while (node !== undefined) { while (node !== undefined) {
yield node.value; yield node.value;
node = node.next; node = node.next;

View File

@@ -86,7 +86,7 @@ export type RedisClientMultiCommandType<
type ExecuteMulti = (commands: Array<RedisMultiQueuedCommand>, selectedDB?: number) => Promise<Array<unknown>>; type ExecuteMulti = (commands: Array<RedisMultiQueuedCommand>, selectedDB?: number) => Promise<Array<unknown>>;
export default class RedisClientMultiCommand<REPLIES = []> { export default class RedisClientMultiCommand<REPLIES = []> {
private static _createCommand(command: Command, resp: RespVersions) { static #createCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp); const transformReply = getTransformReply(command, resp);
return function (this: RedisClientMultiCommand, ...args: Array<unknown>) { return function (this: RedisClientMultiCommand, ...args: Array<unknown>) {
return this.addCommand( return this.addCommand(
@@ -96,34 +96,34 @@ export default class RedisClientMultiCommand<REPLIES = []> {
}; };
} }
private static _createModuleCommand(command: Command, resp: RespVersions) { static #createModuleCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp); const transformReply = getTransformReply(command, resp);
return function (this: { self: RedisClientMultiCommand }, ...args: Array<unknown>) { return function (this: { _self: RedisClientMultiCommand }, ...args: Array<unknown>) {
return this.self.addCommand( return this._self.addCommand(
command.transformArguments(...args), command.transformArguments(...args),
transformReply transformReply
); );
}; };
} }
private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) { static #createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) {
const prefix = functionArgumentsPrefix(name, fn), const prefix = functionArgumentsPrefix(name, fn),
transformReply = getTransformReply(fn, resp); transformReply = getTransformReply(fn, resp);
return function (this: { self: RedisClientMultiCommand }, ...args: Array<unknown>) { return function (this: { _self: RedisClientMultiCommand }, ...args: Array<unknown>) {
const fnArgs = fn.transformArguments(...args), const fnArgs = fn.transformArguments(...args),
redisArgs: CommandArguments = prefix.concat(fnArgs); redisArgs: CommandArguments = prefix.concat(fnArgs);
redisArgs.preserve = fnArgs.preserve; redisArgs.preserve = fnArgs.preserve;
return this.self.addCommand( return this._self.addCommand(
redisArgs, redisArgs,
transformReply transformReply
); );
}; };
} }
private static _createScriptCommand(script: RedisScript, resp: RespVersions) { static #createScriptCommand(script: RedisScript, resp: RespVersions) {
const transformReply = getTransformReply(script, resp); const transformReply = getTransformReply(script, resp);
return function (this: RedisClientMultiCommand, ...args: Array<unknown>) { return function (this: RedisClientMultiCommand, ...args: Array<unknown>) {
this._multi.addScript( this.#multi.addScript(
script, script,
script.transformArguments(...args), script.transformArguments(...args),
transformReply transformReply
@@ -141,42 +141,42 @@ export default class RedisClientMultiCommand<REPLIES = []> {
return attachConfig({ return attachConfig({
BaseClass: RedisClientMultiCommand, BaseClass: RedisClientMultiCommand,
commands: COMMANDS, commands: COMMANDS,
createCommand: RedisClientMultiCommand._createCommand, createCommand: RedisClientMultiCommand.#createCommand,
createModuleCommand: RedisClientMultiCommand._createModuleCommand, createModuleCommand: RedisClientMultiCommand.#createModuleCommand,
createFunctionCommand: RedisClientMultiCommand._createFunctionCommand, createFunctionCommand: RedisClientMultiCommand.#createFunctionCommand,
createScriptCommand: RedisClientMultiCommand._createScriptCommand, createScriptCommand: RedisClientMultiCommand.#createScriptCommand,
config config
}); });
} }
private readonly _multi = new RedisMultiCommand(); readonly #multi = new RedisMultiCommand();
private readonly _executeMulti: ExecuteMulti; readonly #executeMulti: ExecuteMulti;
private readonly _executePipeline: ExecuteMulti; readonly #executePipeline: ExecuteMulti;
private _selectedDB?: number; #selectedDB?: number;
constructor(executeMulti: ExecuteMulti, executePipeline: ExecuteMulti) { constructor(executeMulti: ExecuteMulti, executePipeline: ExecuteMulti) {
this._executeMulti = executeMulti; this.#executeMulti = executeMulti;
this._executePipeline = executePipeline; this.#executePipeline = executePipeline;
} }
SELECT(db: number, transformReply?: TransformReply): this { SELECT(db: number, transformReply?: TransformReply): this {
this._selectedDB = db; this.#selectedDB = db;
this._multi.addCommand(['SELECT', db.toString()], transformReply); this.#multi.addCommand(['SELECT', db.toString()], transformReply);
return this; return this;
} }
select = this.SELECT; select = this.SELECT;
addCommand(args: CommandArguments, transformReply?: TransformReply) { addCommand(args: CommandArguments, transformReply?: TransformReply) {
this._multi.addCommand(args, transformReply); this.#multi.addCommand(args, transformReply);
return this; return this;
} }
async exec<T extends MultiReply = MULTI_REPLY['GENERIC']>(execAsPipeline = false): Promise<MultiReplyType<T, REPLIES>> { async exec<T extends MultiReply = MULTI_REPLY['GENERIC']>(execAsPipeline = false): Promise<MultiReplyType<T, REPLIES>> {
if (execAsPipeline) return this.execAsPipeline<T>(); if (execAsPipeline) return this.execAsPipeline<T>();
return this._multi.transformReplies( return this.#multi.transformReplies(
await this._executeMulti(this._multi.queue, this._selectedDB) await this.#executeMulti(this.#multi.queue, this.#selectedDB)
) as MultiReplyType<T, REPLIES>; ) as MultiReplyType<T, REPLIES>;
} }
@@ -187,10 +187,10 @@ export default class RedisClientMultiCommand<REPLIES = []> {
} }
async execAsPipeline<T extends MultiReply = MULTI_REPLY['GENERIC']>(): Promise<MultiReplyType<T, REPLIES>> { async execAsPipeline<T extends MultiReply = MULTI_REPLY['GENERIC']>(): Promise<MultiReplyType<T, REPLIES>> {
if (this._multi.queue.length === 0) return [] as MultiReplyType<T, REPLIES>; if (this.#multi.queue.length === 0) return [] as MultiReplyType<T, REPLIES>;
return this._multi.transformReplies( return this.#multi.transformReplies(
await this._executePipeline(this._multi.queue, this._selectedDB) await this.#executePipeline(this.#multi.queue, this.#selectedDB)
) as MultiReplyType<T, REPLIES>; ) as MultiReplyType<T, REPLIES>;
} }

View File

@@ -49,7 +49,7 @@ export type RedisClientPoolType<
type ProxyPool = RedisClientPoolType<any, any, any, any, any>; type ProxyPool = RedisClientPoolType<any, any, any, any, any>;
type NamespaceProxyPool = { self: ProxyPool }; type NamespaceProxyPool = { _self: ProxyPool };
export class RedisClientPool< export class RedisClientPool<
M extends RedisModules = {}, M extends RedisModules = {},
@@ -58,7 +58,7 @@ export class RedisClientPool<
RESP extends RespVersions = 2, RESP extends RespVersions = 2,
TYPE_MAPPING extends TypeMapping = {} TYPE_MAPPING extends TypeMapping = {}
> extends EventEmitter { > extends EventEmitter {
private static _createCommand(command: Command, resp: RespVersions) { static #createCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp); const transformReply = getTransformReply(command, resp);
return async function (this: ProxyPool, ...args: Array<unknown>) { return async function (this: ProxyPool, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args), const redisArgs = command.transformArguments(...args),
@@ -69,25 +69,25 @@ export class RedisClientPool<
}; };
} }
private static _createModuleCommand(command: Command, resp: RespVersions) { static #createModuleCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp); const transformReply = getTransformReply(command, resp);
return async function (this: NamespaceProxyPool, ...args: Array<unknown>) { return async function (this: NamespaceProxyPool, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args), const redisArgs = command.transformArguments(...args),
reply = await this.self.sendCommand(redisArgs, this.self._commandOptions); reply = await this._self.sendCommand(redisArgs, this._self._commandOptions);
return transformReply ? return transformReply ?
transformReply(reply, redisArgs.preserve) : transformReply(reply, redisArgs.preserve) :
reply; reply;
}; };
} }
private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) { static #createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) {
const prefix = functionArgumentsPrefix(name, fn), const prefix = functionArgumentsPrefix(name, fn),
transformReply = getTransformReply(fn, resp); transformReply = getTransformReply(fn, resp);
return async function (this: NamespaceProxyPool, ...args: Array<unknown>) { return async function (this: NamespaceProxyPool, ...args: Array<unknown>) {
const fnArgs = fn.transformArguments(...args), const fnArgs = fn.transformArguments(...args),
reply = await this.self.sendCommand( reply = await this._self.sendCommand(
prefix.concat(fnArgs), prefix.concat(fnArgs),
this.self._commandOptions this._self._commandOptions
); );
return transformReply ? return transformReply ?
transformReply(reply, fnArgs.preserve) : transformReply(reply, fnArgs.preserve) :
@@ -95,7 +95,7 @@ export class RedisClientPool<
}; };
} }
private static _createScriptCommand(script: RedisScript, resp: RespVersions) { static #createScriptCommand(script: RedisScript, resp: RespVersions) {
const prefix = scriptArgumentsPrefix(script), const prefix = scriptArgumentsPrefix(script),
transformReply = getTransformReply(script, resp); transformReply = getTransformReply(script, resp);
return async function (this: ProxyPool, ...args: Array<unknown>) { return async function (this: ProxyPool, ...args: Array<unknown>) {
@@ -115,23 +115,22 @@ export class RedisClientPool<
RESP extends RespVersions, RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping = {} TYPE_MAPPING extends TypeMapping = {}
>( >(
// clientFactory: () => RedisClientType<M, F, S, RESP, TYPE_MAPPING>,
clientOptions?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>, clientOptions?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>,
options?: Partial<RedisPoolOptions> options?: Partial<RedisPoolOptions>
) { ) {
const Pool = attachConfig({ const Pool = attachConfig({
BaseClass: RedisClientPool, BaseClass: RedisClientPool,
commands: COMMANDS, commands: COMMANDS,
createCommand: RedisClientPool._createCommand, createCommand: RedisClientPool.#createCommand,
createModuleCommand: RedisClientPool._createModuleCommand, createModuleCommand: RedisClientPool.#createModuleCommand,
createFunctionCommand: RedisClientPool._createFunctionCommand, createFunctionCommand: RedisClientPool.#createFunctionCommand,
createScriptCommand: RedisClientPool._createScriptCommand, createScriptCommand: RedisClientPool.#createScriptCommand,
config: clientOptions config: clientOptions
}); });
Pool.prototype.Multi = RedisClientMultiCommand.extend(clientOptions); Pool.prototype.Multi = RedisClientMultiCommand.extend(clientOptions);
// returning a "proxy" to prevent the namespaces.self to leak between "proxies" // returning a "proxy" to prevent the namespaces._self to leak between "proxies"
return Object.create( return Object.create(
new Pool( new Pool(
RedisClient.factory(clientOptions).bind(undefined, clientOptions), RedisClient.factory(clientOptions).bind(undefined, clientOptions),
@@ -141,51 +140,42 @@ export class RedisClientPool<
} }
// TODO: defaults // TODO: defaults
private static _DEFAULTS = { static #DEFAULTS = {
minimum: 1, minimum: 1,
maximum: 100, maximum: 100,
acquireTimeout: 3000, acquireTimeout: 3000,
cleanupDelay: 3000 cleanupDelay: 3000
} satisfies RedisPoolOptions; } satisfies RedisPoolOptions;
private readonly _clientFactory: () => RedisClientType<M, F, S, RESP, TYPE_MAPPING>; readonly #clientFactory: () => RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
private readonly _options: RedisPoolOptions; readonly #options: RedisPoolOptions;
private readonly _idleClients = new SinglyLinkedList<RedisClientType<M, F, S, RESP, TYPE_MAPPING>>(); readonly #idleClients = new SinglyLinkedList<RedisClientType<M, F, S, RESP, TYPE_MAPPING>>();
/** /**
* The number of idle clients. * The number of idle clients.
*/ */
get idleClients() { get idleClients() {
return this._idleClients.length; return this._self.#idleClients.length;
} }
private readonly _clientsInUse = new DoublyLinkedList<RedisClientType<M, F, S, RESP, TYPE_MAPPING>>(); readonly #clientsInUse = new DoublyLinkedList<RedisClientType<M, F, S, RESP, TYPE_MAPPING>>();
/** /**
* The number of clients in use. * The number of clients in use.
*/ */
get clientsInUse() { get clientsInUse() {
return this._clientsInUse.length; return this._self.#clientsInUse.length;
}
private readonly _connectingClients = 0;
/**
* The number of clients that are currently connecting.
*/
get connectingClients() {
return this._connectingClients;
} }
/** /**
* The total number of clients in the pool (including connecting, idle, and in use). * The total number of clients in the pool (including connecting, idle, and in use).
*/ */
get totalClients() { get totalClients() {
return this._idleClients.length + this._clientsInUse.length; return this._self.#idleClients.length + this._self.#clientsInUse.length;
} }
private readonly _tasksQueue = new SinglyLinkedList<{ readonly #tasksQueue = new SinglyLinkedList<{
timeout: NodeJS.Timeout | undefined; timeout: NodeJS.Timeout | undefined;
resolve: (value: unknown) => unknown; resolve: (value: unknown) => unknown;
reject: (reason?: unknown) => unknown; reject: (reason?: unknown) => unknown;
@@ -196,25 +186,25 @@ export class RedisClientPool<
* The number of tasks waiting for a client to become available. * The number of tasks waiting for a client to become available.
*/ */
get tasksQueueLength() { get tasksQueueLength() {
return this._tasksQueue.length; return this._self.#tasksQueue.length;
} }
private _isOpen = false; #isOpen = false;
/** /**
* Whether the pool is open (either connecting or connected). * Whether the pool is open (either connecting or connected).
*/ */
get isOpen() { get isOpen() {
return this._isOpen; return this._self.#isOpen;
} }
private _isClosing = false; #isClosing = false;
/** /**
* Whether the pool is closing (*not* closed). * Whether the pool is closing (*not* closed).
*/ */
get isClosing() { get isClosing() {
return this._isClosing; return this._self.#isClosing;
} }
/** /**
@@ -228,9 +218,9 @@ export class RedisClientPool<
) { ) {
super(); super();
this._clientFactory = clientFactory; this.#clientFactory = clientFactory;
this._options = { this.#options = {
...RedisClientPool._DEFAULTS, ...RedisClientPool.#DEFAULTS,
...options ...options
}; };
} }
@@ -253,7 +243,7 @@ export class RedisClientPool<
>; >;
} }
private _commandOptionsProxy< #commandOptionsProxy<
K extends keyof CommandOptions, K extends keyof CommandOptions,
V extends CommandOptions[K] V extends CommandOptions[K]
>( >(
@@ -276,14 +266,14 @@ export class RedisClientPool<
* Override the `typeMapping` command option * Override the `typeMapping` command option
*/ */
withTypeMapping<TYPE_MAPPING extends TypeMapping>(typeMapping: TYPE_MAPPING) { withTypeMapping<TYPE_MAPPING extends TypeMapping>(typeMapping: TYPE_MAPPING) {
return this._commandOptionsProxy('typeMapping', typeMapping); return this._self.#commandOptionsProxy('typeMapping', typeMapping);
} }
/** /**
* Override the `abortSignal` command option * Override the `abortSignal` command option
*/ */
withAbortSignal(abortSignal: AbortSignal) { withAbortSignal(abortSignal: AbortSignal) {
return this._commandOptionsProxy('abortSignal', abortSignal); return this._self.#commandOptionsProxy('abortSignal', abortSignal);
} }
/** /**
@@ -291,17 +281,17 @@ export class RedisClientPool<
* TODO: remove? * TODO: remove?
*/ */
asap() { asap() {
return this._commandOptionsProxy('asap', true); return this._self.#commandOptionsProxy('asap', true);
} }
async connect() { async connect() {
if (this._isOpen) return; // TODO: throw error? if (this._self.#isOpen) return; // TODO: throw error?
this._isOpen = true; this._self.#isOpen = true;
const promises = []; const promises = [];
while (promises.length < this._options.minimum) { while (promises.length < this._self.#options.minimum) {
promises.push(this._create()); promises.push(this._self.#create());
} }
try { try {
@@ -313,39 +303,39 @@ export class RedisClientPool<
} }
} }
private async _create() { async #create() {
const node = this._clientsInUse.push( const node = this._self.#clientsInUse.push(
this._clientFactory() this._self.#clientFactory()
.on('error', (err: Error) => this.emit('error', err)) .on('error', (err: Error) => this.emit('error', err))
); );
try { try {
await node.value.connect(); await node.value.connect();
} catch (err) { } catch (err) {
this._clientsInUse.remove(node); this._self.#clientsInUse.remove(node);
throw err; throw err;
} }
this._returnClient(node); this._self.#returnClient(node);
} }
execute<T>(fn: PoolTask<M, F, S, RESP, TYPE_MAPPING, T>) { execute<T>(fn: PoolTask<M, F, S, RESP, TYPE_MAPPING, T>) {
return new Promise<Awaited<T>>((resolve, reject) => { return new Promise<Awaited<T>>((resolve, reject) => {
const client = this._idleClients.shift(), const client = this._self.#idleClients.shift(),
{ tail } = this._tasksQueue; { tail } = this._self.#tasksQueue;
if (!client) { if (!client) {
let timeout; let timeout;
if (this._options.acquireTimeout > 0) { if (this._self.#options.acquireTimeout > 0) {
timeout = setTimeout( timeout = setTimeout(
() => { () => {
this._tasksQueue.remove(task, tail); this._self.#tasksQueue.remove(task, tail);
reject(new TimeoutError('Timeout waiting for a client')); // TODO: message reject(new TimeoutError('Timeout waiting for a client')); // TODO: message
}, },
this._options.acquireTimeout this._self.#options.acquireTimeout
); );
} }
const task = this._tasksQueue.push({ const task = this._self.#tasksQueue.push({
timeout, timeout,
// @ts-ignore // @ts-ignore
resolve, resolve,
@@ -353,20 +343,20 @@ export class RedisClientPool<
fn fn
}); });
if (this.totalClients < this._options.maximum) { if (this.totalClients < this._self.#options.maximum) {
this._create(); this._self.#create();
} }
return; return;
} }
const node = this._clientsInUse.push(client); const node = this._self.#clientsInUse.push(client);
// @ts-ignore // @ts-ignore
this._executeTask(node, resolve, reject, fn); this._self.#executeTask(node, resolve, reject, fn);
}); });
} }
private _executeTask( #executeTask(
node: DoublyLinkedNode<RedisClientType<M, F, S, RESP, TYPE_MAPPING>>, node: DoublyLinkedNode<RedisClientType<M, F, S, RESP, TYPE_MAPPING>>,
resolve: <T>(value: T | PromiseLike<T>) => void, resolve: <T>(value: T | PromiseLike<T>) => void,
reject: (reason?: unknown) => void, reject: (reason?: unknown) => void,
@@ -375,40 +365,40 @@ export class RedisClientPool<
const result = fn(node.value); const result = fn(node.value);
if (result instanceof Promise) { if (result instanceof Promise) {
result.then(resolve, reject); result.then(resolve, reject);
result.finally(() => this._returnClient(node)) result.finally(() => this.#returnClient(node))
} else { } else {
resolve(result); resolve(result);
this._returnClient(node); this.#returnClient(node);
} }
} }
private _returnClient(node: DoublyLinkedNode<RedisClientType<M, F, S, RESP, TYPE_MAPPING>>) { #returnClient(node: DoublyLinkedNode<RedisClientType<M, F, S, RESP, TYPE_MAPPING>>) {
const task = this._tasksQueue.shift(); const task = this.#tasksQueue.shift();
if (task) { if (task) {
this._executeTask(node, task.resolve, task.reject, task.fn); this.#executeTask(node, task.resolve, task.reject, task.fn);
return; return;
} }
this._clientsInUse.remove(node); this.#clientsInUse.remove(node);
this._idleClients.push(node.value); this.#idleClients.push(node.value);
this._scheduleCleanup(); this.#scheduleCleanup();
} }
cleanupTimeout?: NodeJS.Timeout; cleanupTimeout?: NodeJS.Timeout;
private _scheduleCleanup() { #scheduleCleanup() {
if (this.totalClients <= this._options.minimum) return; if (this.totalClients <= this.#options.minimum) return;
clearTimeout(this.cleanupTimeout); clearTimeout(this.cleanupTimeout);
this.cleanupTimeout = setTimeout(() => this._cleanup(), this._options.cleanupDelay); this.cleanupTimeout = setTimeout(() => this.#cleanup(), this.#options.cleanupDelay);
} }
private _cleanup() { #cleanup() {
const toDestroy = Math.min(this._idleClients.length, this.totalClients - this._options.minimum); const toDestroy = Math.min(this.#idleClients.length, this.totalClients - this.#options.minimum);
for (let i = 0; i < toDestroy; i++) { for (let i = 0; i < toDestroy; i++) {
// TODO: shift vs pop // TODO: shift vs pop
this._idleClients.shift()!.destroy(); this.#idleClients.shift()!.destroy();
} }
} }
@@ -438,44 +428,44 @@ export class RedisClientPool<
multi = this.MULTI; multi = this.MULTI;
async close() { async close() {
if (this._isClosing) return; // TODO: throw err? if (this._self.#isClosing) return; // TODO: throw err?
if (!this._isOpen) return; // TODO: throw err? if (!this._self.#isOpen) return; // TODO: throw err?
this._isClosing = true; this._self.#isClosing = true;
try { try {
const promises = []; const promises = [];
for (const client of this._idleClients) { for (const client of this._self.#idleClients) {
promises.push(client.close()); promises.push(client.close());
} }
for (const client of this._clientsInUse) { for (const client of this._self.#clientsInUse) {
promises.push(client.close()); promises.push(client.close());
} }
await Promise.all(promises); await Promise.all(promises);
this._idleClients.reset(); this._self.#idleClients.reset();
this._clientsInUse.reset(); this._self.#clientsInUse.reset();
} catch (err) { } catch (err) {
} finally { } finally {
this._isClosing = false; this._self.#isClosing = false;
} }
} }
destroy() { destroy() {
for (const client of this._idleClients) { for (const client of this._self.#idleClients) {
client.destroy(); client.destroy();
} }
this._idleClients.reset(); this._self.#idleClients.reset();
for (const client of this._clientsInUse) { for (const client of this._self.#clientsInUse) {
client.destroy(); client.destroy();
} }
this._clientsInUse.reset(); this._self.#clientsInUse.reset();
this._isOpen = false; this._self.#isOpen = false;
} }
} }

View File

@@ -46,7 +46,7 @@ interface CreateSocketReturn<T> {
export type RedisSocketInitiator = () => Promise<void>; export type RedisSocketInitiator = () => Promise<void>;
export default class RedisSocket extends EventEmitter { export default class RedisSocket extends EventEmitter {
private static _initiateOptions(options?: RedisSocketOptions): RedisSocketOptions { static #initiateOptions(options?: RedisSocketOptions): RedisSocketOptions {
options ??= {}; options ??= {};
if (!(options as net.IpcSocketConnectOpts).path) { if (!(options as net.IpcSocketConnectOpts).path) {
(options as net.TcpSocketConnectOpts).port ??= 6379; (options as net.TcpSocketConnectOpts).port ??= 6379;
@@ -60,45 +60,45 @@ export default class RedisSocket extends EventEmitter {
return options; return options;
} }
private static _isTlsSocket(options: RedisSocketOptions): options is RedisTlsSocketOptions { static #isTlsSocket(options: RedisSocketOptions): options is RedisTlsSocketOptions {
return (options as RedisTlsSocketOptions).tls === true; return (options as RedisTlsSocketOptions).tls === true;
} }
private readonly _initiator: RedisSocketInitiator; readonly #initiator: RedisSocketInitiator;
private readonly _options: RedisSocketOptions; readonly #options: RedisSocketOptions;
private _socket?: net.Socket | tls.TLSSocket; #socket?: net.Socket | tls.TLSSocket;
private _isOpen = false; #isOpen = false;
get isOpen(): boolean { get isOpen(): boolean {
return this._isOpen; return this.#isOpen;
} }
private _isReady = false; #isReady = false;
get isReady(): boolean { get isReady(): boolean {
return this._isReady; return this.#isReady;
} }
private _isSocketUnrefed = false; #isSocketUnrefed = false;
constructor(initiator: RedisSocketInitiator, options?: RedisSocketOptions) { constructor(initiator: RedisSocketInitiator, options?: RedisSocketOptions) {
super(); super();
this._initiator = initiator; this.#initiator = initiator;
this._options = RedisSocket._initiateOptions(options); this.#options = RedisSocket.#initiateOptions(options);
} }
private _reconnectStrategy(retries: number, cause: Error) { #reconnectStrategy(retries: number, cause: Error) {
if (this._options.reconnectStrategy === false) { if (this.#options.reconnectStrategy === false) {
return false; return false;
} else if (typeof this._options.reconnectStrategy === 'number') { } else if (typeof this.#options.reconnectStrategy === 'number') {
return this._options.reconnectStrategy; return this.#options.reconnectStrategy;
} else if (this._options.reconnectStrategy) { } else if (this.#options.reconnectStrategy) {
try { try {
const retryIn = this._options.reconnectStrategy(retries, cause); const retryIn = this.#options.reconnectStrategy(retries, cause);
if (retryIn !== false && !(retryIn instanceof Error) && typeof retryIn !== 'number') { if (retryIn !== false && !(retryIn instanceof Error) && typeof retryIn !== 'number') {
throw new TypeError(`Reconnect strategy should return \`false | Error | number\`, got ${retryIn} instead`); throw new TypeError(`Reconnect strategy should return \`false | Error | number\`, got ${retryIn} instead`);
} }
@@ -112,14 +112,14 @@ export default class RedisSocket extends EventEmitter {
return Math.min(retries * 50, 500); return Math.min(retries * 50, 500);
} }
private _shouldReconnect(retries: number, cause: Error) { #shouldReconnect(retries: number, cause: Error) {
const retryIn = this._reconnectStrategy(retries, cause); const retryIn = this.#reconnectStrategy(retries, cause);
if (retryIn === false) { if (retryIn === false) {
this._isOpen = false; this.#isOpen = false;
this.emit('error', cause); this.emit('error', cause);
return cause; return cause;
} else if (retryIn instanceof Error) { } else if (retryIn instanceof Error) {
this._isOpen = false; this.#isOpen = false;
this.emit('error', cause); this.emit('error', cause);
return new ReconnectStrategyError(retryIn, cause); return new ReconnectStrategyError(retryIn, cause);
} }
@@ -128,32 +128,32 @@ export default class RedisSocket extends EventEmitter {
} }
async connect(): Promise<void> { async connect(): Promise<void> {
if (this._isOpen) { if (this.#isOpen) {
throw new Error('Socket already opened'); throw new Error('Socket already opened');
} }
this._isOpen = true; this.#isOpen = true;
return this._connect(); return this.#connect();
} }
private async _connect(): Promise<void> { async #connect(): Promise<void> {
let retries = 0; let retries = 0;
do { do {
try { try {
this._socket = await this._createSocket(); this.#socket = await this.#createSocket();
this.emit('connect'); this.emit('connect');
try { try {
await this._initiator(); await this.#initiator();
} catch (err) { } catch (err) {
this._socket.destroy(); this.#socket.destroy();
this._socket = undefined; this.#socket = undefined;
throw err; throw err;
} }
this._isReady = true; this.#isReady = true;
this.emit('ready'); this.emit('ready');
} catch (err) { } catch (err) {
const retryIn = this._shouldReconnect(retries++, err as Error); const retryIn = this.#shouldReconnect(retries++, err as Error);
if (typeof retryIn !== 'number') { if (typeof retryIn !== 'number') {
throw retryIn; throw retryIn;
} }
@@ -162,36 +162,36 @@ export default class RedisSocket extends EventEmitter {
await setTimeout(retryIn); await setTimeout(retryIn);
this.emit('reconnecting'); this.emit('reconnecting');
} }
} while (this._isOpen && !this._isReady); } while (this.#isOpen && !this.#isReady);
} }
private _createSocket(): Promise<net.Socket | tls.TLSSocket> { #createSocket(): Promise<net.Socket | tls.TLSSocket> {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const { connectEvent, socket } = RedisSocket._isTlsSocket(this._options) ? const { connectEvent, socket } = RedisSocket.#isTlsSocket(this.#options) ?
this._createTlsSocket() : this.#createTlsSocket() :
this._createNetSocket(); this.#createNetSocket();
if (this._options.connectTimeout) { if (this.#options.connectTimeout) {
socket.setTimeout(this._options.connectTimeout, () => socket.destroy(new ConnectionTimeoutError())); socket.setTimeout(this.#options.connectTimeout, () => socket.destroy(new ConnectionTimeoutError()));
} }
if (this._isSocketUnrefed) { if (this.#isSocketUnrefed) {
socket.unref(); socket.unref();
} }
socket socket
.setNoDelay(this._options.noDelay) .setNoDelay(this.#options.noDelay)
.once('error', reject) .once('error', reject)
.once(connectEvent, () => { .once(connectEvent, () => {
socket socket
.setTimeout(0) .setTimeout(0)
// https://github.com/nodejs/node/issues/31663 // https://github.com/nodejs/node/issues/31663
.setKeepAlive(this._options.keepAlive !== false, this._options.keepAlive || 0) .setKeepAlive(this.#options.keepAlive !== false, this.#options.keepAlive || 0)
.off('error', reject) .off('error', reject)
.once('error', (err: Error) => this._onSocketError(err)) .once('error', (err: Error) => this.#onSocketError(err))
.once('close', hadError => { .once('close', hadError => {
if (!hadError && this._isOpen && this._socket === socket) { if (!hadError && this.#isOpen && this.#socket === socket) {
this._onSocketError(new SocketClosedUnexpectedlyError()); this.#onSocketError(new SocketClosedUnexpectedlyError());
} }
}) })
.on('drain', () => this.emit('drain')) .on('drain', () => this.emit('drain'))
@@ -202,93 +202,93 @@ export default class RedisSocket extends EventEmitter {
}); });
} }
private _createNetSocket(): CreateSocketReturn<net.Socket> { #createNetSocket(): CreateSocketReturn<net.Socket> {
return { return {
connectEvent: 'connect', connectEvent: 'connect',
socket: net.connect(this._options as net.NetConnectOpts) // TODO socket: net.connect(this.#options as net.NetConnectOpts) // TODO
}; };
} }
private _createTlsSocket(): CreateSocketReturn<tls.TLSSocket> { #createTlsSocket(): CreateSocketReturn<tls.TLSSocket> {
return { return {
connectEvent: 'secureConnect', connectEvent: 'secureConnect',
socket: tls.connect(this._options as tls.ConnectionOptions) // TODO socket: tls.connect(this.#options as tls.ConnectionOptions) // TODO
}; };
} }
private _onSocketError(err: Error): void { #onSocketError(err: Error): void {
const wasReady = this._isReady; const wasReady = this.#isReady;
this._isReady = false; this.#isReady = false;
this.emit('error', err); this.emit('error', err);
if (!wasReady || !this._isOpen || typeof this._shouldReconnect(0, err) !== 'number') return; if (!wasReady || !this.#isOpen || typeof this.#shouldReconnect(0, err) !== 'number') return;
this.emit('reconnecting'); this.emit('reconnecting');
this._connect().catch(() => { this.#connect().catch(() => {
// the error was already emitted, silently ignore it // the error was already emitted, silently ignore it
}); });
} }
write(iterator: IterableIterator<Array<RedisArgument>>): void { write(iterator: IterableIterator<Array<RedisArgument>>): void {
if (!this._socket) return; if (!this.#socket) return;
this._socket.cork(); this.#socket.cork();
for (const args of iterator) { for (const args of iterator) {
for (const toWrite of args) { for (const toWrite of args) {
this._socket.write(toWrite); this.#socket.write(toWrite);
} }
if (this._socket.writableNeedDrain) break; if (this.#socket.writableNeedDrain) break;
} }
this._socket.uncork(); this.#socket.uncork();
} }
async quit<T>(fn: () => Promise<T>): Promise<T> { async quit<T>(fn: () => Promise<T>): Promise<T> {
if (!this._isOpen) { if (!this.#isOpen) {
throw new ClientClosedError(); throw new ClientClosedError();
} }
this._isOpen = false; this.#isOpen = false;
const reply = await fn(); const reply = await fn();
this.destroySocket(); this.destroySocket();
return reply; return reply;
} }
close() { close() {
if (!this._isOpen) { if (!this.#isOpen) {
throw new ClientClosedError(); throw new ClientClosedError();
} }
this._isOpen = false; this.#isOpen = false;
} }
destroy() { destroy() {
if (!this._isOpen) { if (!this.#isOpen) {
throw new ClientClosedError(); throw new ClientClosedError();
} }
this._isOpen = false; this.#isOpen = false;
this.destroySocket(); this.destroySocket();
} }
destroySocket() { destroySocket() {
this._isReady = false; this.#isReady = false;
if (this._socket) { if (this.#socket) {
this._socket.destroy(); this.#socket.destroy();
this._socket = undefined; this.#socket = undefined;
} }
this.emit('end'); this.emit('end');
} }
ref(): void { ref(): void {
this._isSocketUnrefed = false; this.#isSocketUnrefed = false;
this._socket?.ref(); this.#socket?.ref();
} }
unref(): void { unref(): void {
this._isSocketUnrefed = true; this.#isSocketUnrefed = true;
this._socket?.unref(); this.#socket?.unref();
} }
} }

View File

@@ -100,83 +100,83 @@ export default class RedisClusterSlots<
RESP extends RespVersions, RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping TYPE_MAPPING extends TypeMapping
> { > {
private static _SLOTS = 16384; static #SLOTS = 16384;
private readonly _options: RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING>; readonly #options: RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING>;
private readonly _clientFactory: ReturnType<typeof RedisClient.factory<M, F, S, RESP>>; readonly #clientFactory: ReturnType<typeof RedisClient.factory<M, F, S, RESP>>;
private readonly _emit: EventEmitter['emit']; readonly #emit: EventEmitter['emit'];
slots = new Array<Shard<M, F, S, RESP, TYPE_MAPPING>>(RedisClusterSlots._SLOTS); slots = new Array<Shard<M, F, S, RESP, TYPE_MAPPING>>(RedisClusterSlots.#SLOTS);
masters = new Array<MasterNode<M, F, S, RESP, TYPE_MAPPING>>(); masters = new Array<MasterNode<M, F, S, RESP, TYPE_MAPPING>>();
replicas = new Array<ShardNode<M, F, S, RESP, TYPE_MAPPING>>(); replicas = new Array<ShardNode<M, F, S, RESP, TYPE_MAPPING>>();
readonly nodeByAddress = new Map<string, MasterNode<M, F, S, RESP, TYPE_MAPPING> | ShardNode<M, F, S, RESP, TYPE_MAPPING>>(); readonly nodeByAddress = new Map<string, MasterNode<M, F, S, RESP, TYPE_MAPPING> | ShardNode<M, F, S, RESP, TYPE_MAPPING>>();
pubSubNode?: PubSubNode<M, F, S, RESP, TYPE_MAPPING>; pubSubNode?: PubSubNode<M, F, S, RESP, TYPE_MAPPING>;
private _isOpen = false; #isOpen = false;
get isOpen() { get isOpen() {
return this._isOpen; return this.#isOpen;
} }
constructor( constructor(
options: RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING>, options: RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING>,
emit: EventEmitter['emit'] emit: EventEmitter['emit']
) { ) {
this._options = options; this.#options = options;
this._clientFactory = RedisClient.factory(options); this.#clientFactory = RedisClient.factory(options);
this._emit = emit; this.#emit = emit;
} }
async connect() { async connect() {
if (this._isOpen) { if (this.#isOpen) {
throw new Error('Cluster already open'); throw new Error('Cluster already open');
} }
this._isOpen = true; this.#isOpen = true;
try { try {
await this._discoverWithRootNodes(); await this.#discoverWithRootNodes();
} catch (err) { } catch (err) {
this._isOpen = false; this.#isOpen = false;
throw err; throw err;
} }
} }
private async _discoverWithRootNodes() { async #discoverWithRootNodes() {
let start = Math.floor(Math.random() * this._options.rootNodes.length); let start = Math.floor(Math.random() * this.#options.rootNodes.length);
for (let i = start; i < this._options.rootNodes.length; i++) { for (let i = start; i < this.#options.rootNodes.length; i++) {
if (!this._isOpen) throw new Error('Cluster closed'); if (!this.#isOpen) throw new Error('Cluster closed');
if (await this._discover(this._options.rootNodes[i])) return; if (await this.#discover(this.#options.rootNodes[i])) return;
} }
for (let i = 0; i < start; i++) { for (let i = 0; i < start; i++) {
if (!this._isOpen) throw new Error('Cluster closed'); if (!this.#isOpen) throw new Error('Cluster closed');
if (await this._discover(this._options.rootNodes[i])) return; if (await this.#discover(this.#options.rootNodes[i])) return;
} }
throw new RootNodesUnavailableError(); throw new RootNodesUnavailableError();
} }
private _resetSlots() { #resetSlots() {
this.slots = new Array(RedisClusterSlots._SLOTS); this.slots = new Array(RedisClusterSlots.#SLOTS);
this.masters = []; this.masters = [];
this.replicas = []; this.replicas = [];
this._randomNodeIterator = undefined; this._randomNodeIterator = undefined;
} }
private async _discover(rootNode: RedisClusterClientOptions) { async #discover(rootNode: RedisClusterClientOptions) {
this._resetSlots(); this.#resetSlots();
try { try {
const addressesInUse = new Set<string>(), const addressesInUse = new Set<string>(),
promises: Array<Promise<unknown>> = [], promises: Array<Promise<unknown>> = [],
eagerConnect = this._options.minimizeConnections !== true; eagerConnect = this.#options.minimizeConnections !== true;
for (const { from, to, master, replicas } of await this._getShards(rootNode)) { for (const { from, to, master, replicas } of await this.#getShards(rootNode)) {
const shard: Shard<M, F, S, RESP, TYPE_MAPPING> = { const shard: Shard<M, F, S, RESP, TYPE_MAPPING> = {
master: this._initiateSlotNode(master, false, eagerConnect, addressesInUse, promises) master: this.#initiateSlotNode(master, false, eagerConnect, addressesInUse, promises)
}; };
if (this._options.useReplicas) { if (this.#options.useReplicas) {
shard.replicas = replicas.map(replica => shard.replicas = replicas.map(replica =>
this._initiateSlotNode(replica, true, eagerConnect, addressesInUse, promises) this.#initiateSlotNode(replica, true, eagerConnect, addressesInUse, promises)
); );
} }
@@ -193,7 +193,7 @@ export default class RedisClusterSlots<
if (channelsListeners.size || patternsListeners.size) { if (channelsListeners.size || patternsListeners.size) {
promises.push( promises.push(
this._initiatePubSubClient({ this.#initiatePubSubClient({
[PubSubType.CHANNELS]: channelsListeners, [PubSubType.CHANNELS]: channelsListeners,
[PubSubType.PATTERNS]: patternsListeners [PubSubType.PATTERNS]: patternsListeners
}) })
@@ -220,21 +220,21 @@ export default class RedisClusterSlots<
return true; return true;
} catch (err) { } catch (err) {
this._emit('error', err); this.#emit('error', err);
return false; return false;
} }
} }
private async _getShards(rootNode: RedisClusterClientOptions) { async #getShards(rootNode: RedisClusterClientOptions) {
const options = this._clientOptionsDefaults(rootNode)!; const options = this.#clientOptionsDefaults(rootNode)!;
options.socket ??= {}; options.socket ??= {};
options.socket.reconnectStrategy = false; options.socket.reconnectStrategy = false;
options.RESP = this._options.RESP; options.RESP = this.#options.RESP;
options.commandOptions = undefined; options.commandOptions = undefined;
// TODO: find a way to avoid type casting // TODO: find a way to avoid type casting
const client = await this._clientFactory(options as RedisClientOptions<M, F, S, RESP, {}>) const client = await this.#clientFactory(options as RedisClientOptions<M, F, S, RESP, {}>)
.on('error', err => this._emit('error', err)) .on('error', err => this.#emit('error', err))
.connect(); .connect();
try { try {
@@ -245,37 +245,37 @@ export default class RedisClusterSlots<
} }
} }
private _getNodeAddress(address: string): NodeAddress | undefined { #getNodeAddress(address: string): NodeAddress | undefined {
switch (typeof this._options.nodeAddressMap) { switch (typeof this.#options.nodeAddressMap) {
case 'object': case 'object':
return this._options.nodeAddressMap[address]; return this.#options.nodeAddressMap[address];
case 'function': case 'function':
return this._options.nodeAddressMap(address); return this.#options.nodeAddressMap(address);
} }
} }
private _clientOptionsDefaults(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>) { #clientOptionsDefaults(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>) {
if (!this._options.defaults) return options; if (!this.#options.defaults) return options;
let socket; let socket;
if (this._options.defaults.socket) { if (this.#options.defaults.socket) {
socket = options?.socket ? { socket = options?.socket ? {
...this._options.defaults.socket, ...this.#options.defaults.socket,
...options.socket ...options.socket
} : this._options.defaults.socket; } : this.#options.defaults.socket;
} else { } else {
socket = options?.socket; socket = options?.socket;
} }
return { return {
...this._options.defaults, ...this.#options.defaults,
...options, ...options,
socket socket
}; };
} }
private _initiateSlotNode( #initiateSlotNode(
shard: NodeAddress & { id: string; }, shard: NodeAddress & { id: string; },
readonly: boolean, readonly: boolean,
eagerConnent: boolean, eagerConnent: boolean,
@@ -295,7 +295,7 @@ export default class RedisClusterSlots<
}; };
if (eagerConnent) { if (eagerConnent) {
promises.push(this._createNodeClient(node)); promises.push(this.#createNodeClient(node));
} }
this.nodeByAddress.set(address, node); this.nodeByAddress.set(address, node);
@@ -309,21 +309,21 @@ export default class RedisClusterSlots<
return node; return node;
} }
private _createClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>, readonly = node.readonly) { #createClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>, readonly = node.readonly) {
return this._clientFactory( return this.#clientFactory(
this._clientOptionsDefaults({ this.#clientOptionsDefaults({
socket: this._getNodeAddress(node.address) ?? { socket: this.#getNodeAddress(node.address) ?? {
host: node.host, host: node.host,
port: node.port port: node.port
}, },
readonly, readonly,
RESP: this._options.RESP RESP: this.#options.RESP
}) })
).on('error', err => console.error(err)); ).on('error', err => console.error(err));
} }
private _createNodeClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>, readonly?: boolean) { #createNodeClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>, readonly?: boolean) {
const client = node.client = this._createClient(node, readonly); const client = node.client = this.#createClient(node, readonly);
return node.connectPromise = client.connect() return node.connectPromise = client.connect()
.finally(() => node.connectPromise = undefined); .finally(() => node.connectPromise = undefined);
} }
@@ -332,46 +332,46 @@ export default class RedisClusterSlots<
return ( return (
node.connectPromise ?? // if the node is connecting node.connectPromise ?? // if the node is connecting
node.client ?? // if the node is connected node.client ?? // if the node is connected
this._createNodeClient(node) // if the not is disconnected this.#createNodeClient(node) // if the not is disconnected
); );
} }
private _runningRediscoverPromise?: Promise<void>; #runningRediscoverPromise?: Promise<void>;
async rediscover(startWith: RedisClientType<M, F, S, RESP>): Promise<void> { async rediscover(startWith: RedisClientType<M, F, S, RESP>): Promise<void> {
this._runningRediscoverPromise ??= this._rediscover(startWith) this.#runningRediscoverPromise ??= this.#rediscover(startWith)
.finally(() => this._runningRediscoverPromise = undefined); .finally(() => this.#runningRediscoverPromise = undefined);
return this._runningRediscoverPromise; return this.#runningRediscoverPromise;
} }
private async _rediscover(startWith: RedisClientType<M, F, S, RESP>): Promise<void> { async #rediscover(startWith: RedisClientType<M, F, S, RESP>): Promise<void> {
if (await this._discover(startWith.options!)) return; if (await this.#discover(startWith.options!)) return;
return this._discoverWithRootNodes(); return this.#discoverWithRootNodes();
} }
/** /**
* @deprecated Use `close` instead. * @deprecated Use `close` instead.
*/ */
quit(): Promise<void> { quit(): Promise<void> {
return this._destroy(client => client.quit()); return this.#destroy(client => client.quit());
} }
/** /**
* @deprecated Use `destroy` instead. * @deprecated Use `destroy` instead.
*/ */
disconnect(): Promise<void> { disconnect(): Promise<void> {
return this._destroy(client => client.disconnect()); return this.#destroy(client => client.disconnect());
} }
close() { close() {
return this._destroy(client => client.close()); return this.#destroy(client => client.close());
} }
destroy() { destroy() {
this._isOpen = false; this.#isOpen = false;
for (const client of this._clients()) { for (const client of this.#clients()) {
client.destroy(); client.destroy();
} }
@@ -380,11 +380,11 @@ export default class RedisClusterSlots<
this.pubSubNode = undefined; this.pubSubNode = undefined;
} }
this._resetSlots(); this.#resetSlots();
this.nodeByAddress.clear(); this.nodeByAddress.clear();
} }
private *_clients() { *#clients() {
for (const master of this.masters) { for (const master of this.masters) {
if (master.client) { if (master.client) {
yield master.client; yield master.client;
@@ -402,11 +402,11 @@ export default class RedisClusterSlots<
} }
} }
private async _destroy(fn: (client: RedisClientType<M, F, S, RESP>) => Promise<unknown>): Promise<void> { async #destroy(fn: (client: RedisClientType<M, F, S, RESP>) => Promise<unknown>): Promise<void> {
this._isOpen = false; this.#isOpen = false;
const promises = []; const promises = [];
for (const client of this._clients()) { for (const client of this.#clients()) {
promises.push(fn(client)); promises.push(fn(client));
} }
@@ -415,7 +415,7 @@ export default class RedisClusterSlots<
this.pubSubNode = undefined; this.pubSubNode = undefined;
} }
this._resetSlots(); this.#resetSlots();
this.nodeByAddress.clear(); this.nodeByAddress.clear();
await Promise.allSettled(promises); await Promise.allSettled(promises);
@@ -437,7 +437,7 @@ export default class RedisClusterSlots<
return this.nodeClient(this.getSlotRandomNode(slotNumber)); return this.nodeClient(this.getSlotRandomNode(slotNumber));
} }
private *_iterateAllNodes() { *#iterateAllNodes() {
let i = Math.floor(Math.random() * (this.masters.length + this.replicas.length)); let i = Math.floor(Math.random() * (this.masters.length + this.replicas.length));
if (i < this.masters.length) { if (i < this.masters.length) {
do { do {
@@ -468,11 +468,11 @@ export default class RedisClusterSlots<
_randomNodeIterator?: IterableIterator<ShardNode<M, F, S, RESP, TYPE_MAPPING>>; _randomNodeIterator?: IterableIterator<ShardNode<M, F, S, RESP, TYPE_MAPPING>>;
getRandomNode() { getRandomNode() {
this._randomNodeIterator ??= this._iterateAllNodes(); this._randomNodeIterator ??= this.#iterateAllNodes();
return this._randomNodeIterator.next().value as ShardNode<M, F, S, RESP, TYPE_MAPPING>; return this._randomNodeIterator.next().value as ShardNode<M, F, S, RESP, TYPE_MAPPING>;
} }
private *_slotNodesIterator(slot: ShardWithReplicas<M, F, S, RESP, TYPE_MAPPING>) { *#slotNodesIterator(slot: ShardWithReplicas<M, F, S, RESP, TYPE_MAPPING>) {
let i = Math.floor(Math.random() * (1 + slot.replicas.length)); let i = Math.floor(Math.random() * (1 + slot.replicas.length));
if (i < slot.replicas.length) { if (i < slot.replicas.length) {
do { do {
@@ -495,7 +495,7 @@ export default class RedisClusterSlots<
return slot.master; return slot.master;
} }
slot.nodesIterator ??= this._slotNodesIterator(slot as ShardWithReplicas<M, F, S, RESP, TYPE_MAPPING>); slot.nodesIterator ??= this.#slotNodesIterator(slot as ShardWithReplicas<M, F, S, RESP, TYPE_MAPPING>);
return slot.nodesIterator.next().value as ShardNode<M, F, S, RESP, TYPE_MAPPING>; return slot.nodesIterator.next().value as ShardNode<M, F, S, RESP, TYPE_MAPPING>;
} }
@@ -507,17 +507,17 @@ export default class RedisClusterSlots<
} }
getPubSubClient() { getPubSubClient() {
if (!this.pubSubNode) return this._initiatePubSubClient(); if (!this.pubSubNode) return this.#initiatePubSubClient();
return this.pubSubNode.connectPromise ?? this.pubSubNode.client; return this.pubSubNode.connectPromise ?? this.pubSubNode.client;
} }
private async _initiatePubSubClient(toResubscribe?: PubSubToResubscribe) { async #initiatePubSubClient(toResubscribe?: PubSubToResubscribe) {
const index = Math.floor(Math.random() * (this.masters.length + this.replicas.length)), const index = Math.floor(Math.random() * (this.masters.length + this.replicas.length)),
node = index < this.masters.length ? node = index < this.masters.length ?
this.masters[index] : this.masters[index] :
this.replicas[index - this.masters.length], this.replicas[index - this.masters.length],
client = this._createClient(node, true); client = this.#createClient(node, true);
this.pubSubNode = { this.pubSubNode = {
address: node.address, address: node.address,
@@ -557,12 +557,12 @@ export default class RedisClusterSlots<
getShardedPubSubClient(channel: string) { getShardedPubSubClient(channel: string) {
const { master } = this.slots[calculateSlot(channel)]; const { master } = this.slots[calculateSlot(channel)];
if (!master.pubSub) return this._initiateShardedPubSubClient(master); if (!master.pubSub) return this.#initiateShardedPubSubClient(master);
return master.pubSub.connectPromise ?? master.pubSub.client; return master.pubSub.connectPromise ?? master.pubSub.client;
} }
private async _initiateShardedPubSubClient(master: MasterNode<M, F, S, RESP, TYPE_MAPPING>) { async #initiateShardedPubSubClient(master: MasterNode<M, F, S, RESP, TYPE_MAPPING>) {
const client = this._createClient(master, true) const client = this.#createClient(master, true)
.on('server-sunsubscribe', async (channel, listeners) => { .on('server-sunsubscribe', async (channel, listeners) => {
try { try {
await this.rediscover(client); await this.rediscover(client);
@@ -573,7 +573,7 @@ export default class RedisClusterSlots<
listeners listeners
); );
} catch (err) { } catch (err) {
this._emit('sharded-shannel-moved-error', err, channel, listeners); this.#emit('sharded-shannel-moved-error', err, channel, listeners);
} }
}); });

View File

@@ -132,7 +132,7 @@ export interface ClusterCommandOptions<
type ProxyCluster = RedisCluster<any, any, any, any, any/*, any*/>; type ProxyCluster = RedisCluster<any, any, any, any, any/*, any*/>;
type NamespaceProxyCluster = { self: ProxyCluster }; type NamespaceProxyCluster = { _self: ProxyCluster };
export default class RedisCluster< export default class RedisCluster<
M extends RedisModules, M extends RedisModules,
@@ -166,7 +166,7 @@ export default class RedisCluster<
return key; return key;
} }
private static _createCommand(command: Command, resp: RespVersions) { static #createCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp); const transformReply = getTransformReply(command, resp);
return async function (this: ProxyCluster, ...args: Array<unknown>) { return async function (this: ProxyCluster, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args), const redisArgs = command.transformArguments(...args),
@@ -189,7 +189,7 @@ export default class RedisCluster<
}; };
} }
private static _createModuleCommand(command: Command, resp: RespVersions) { static #createModuleCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp); const transformReply = getTransformReply(command, resp);
return async function (this: NamespaceProxyCluster, ...args: Array<unknown>) { return async function (this: NamespaceProxyCluster, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args), const redisArgs = command.transformArguments(...args),
@@ -198,11 +198,11 @@ export default class RedisCluster<
args, args,
redisArgs redisArgs
), ),
reply = await this.self.sendCommand( reply = await this._self.sendCommand(
firstKey, firstKey,
command.IS_READ_ONLY, command.IS_READ_ONLY,
redisArgs, redisArgs,
this.self._commandOptions, this._self._commandOptions,
// command.POLICIES // command.POLICIES
); );
@@ -212,7 +212,7 @@ export default class RedisCluster<
}; };
} }
private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) { static #createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) {
const prefix = functionArgumentsPrefix(name, fn), const prefix = functionArgumentsPrefix(name, fn),
transformReply = getTransformReply(fn, resp); transformReply = getTransformReply(fn, resp);
return async function (this: NamespaceProxyCluster, ...args: Array<unknown>) { return async function (this: NamespaceProxyCluster, ...args: Array<unknown>) {
@@ -223,11 +223,11 @@ export default class RedisCluster<
fnArgs fnArgs
), ),
redisArgs = prefix.concat(fnArgs), redisArgs = prefix.concat(fnArgs),
reply = await this.self.sendCommand( reply = await this._self.sendCommand(
firstKey, firstKey,
fn.IS_READ_ONLY, fn.IS_READ_ONLY,
redisArgs, redisArgs,
this.self._commandOptions, this._self._commandOptions,
// fn.POLICIES // fn.POLICIES
); );
@@ -237,7 +237,7 @@ export default class RedisCluster<
}; };
} }
private static _createScriptCommand(script: RedisScript, resp: RespVersions) { static #createScriptCommand(script: RedisScript, resp: RespVersions) {
const prefix = scriptArgumentsPrefix(script), const prefix = scriptArgumentsPrefix(script),
transformReply = getTransformReply(script, resp); transformReply = getTransformReply(script, resp);
return async function (this: ProxyCluster, ...args: Array<unknown>) { return async function (this: ProxyCluster, ...args: Array<unknown>) {
@@ -274,17 +274,17 @@ export default class RedisCluster<
const Cluster = attachConfig({ const Cluster = attachConfig({
BaseClass: RedisCluster, BaseClass: RedisCluster,
commands: COMMANDS, commands: COMMANDS,
createCommand: RedisCluster._createCommand, createCommand: RedisCluster.#createCommand,
createModuleCommand: RedisCluster._createModuleCommand, createModuleCommand: RedisCluster.#createModuleCommand,
createFunctionCommand: RedisCluster._createFunctionCommand, createFunctionCommand: RedisCluster.#createFunctionCommand,
createScriptCommand: RedisCluster._createScriptCommand, createScriptCommand: RedisCluster.#createScriptCommand,
config config
}); });
Cluster.prototype.Multi = RedisClusterMultiCommand.extend(config); Cluster.prototype.Multi = RedisClusterMultiCommand.extend(config);
return (options?: Omit<RedisClusterOptions, keyof Exclude<typeof config, undefined>>) => { return (options?: Omit<RedisClusterOptions, keyof Exclude<typeof config, undefined>>) => {
// returning a "proxy" to prevent the namespaces.self to leak between "proxies" // returning a "proxy" to prevent the namespaces._self to leak between "proxies"
return Object.create(new Cluster(options)) as RedisClusterType<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/>; return Object.create(new Cluster(options)) as RedisClusterType<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/>;
}; };
} }
@@ -300,10 +300,11 @@ export default class RedisCluster<
return RedisCluster.factory(options)(options); return RedisCluster.factory(options)(options);
} }
private readonly _options: RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/>; readonly #options: RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/>;
private readonly _slots: RedisClusterSlots<M, F, S, RESP, TYPE_MAPPING>; readonly #slots: RedisClusterSlots<M, F, S, RESP, TYPE_MAPPING>;
private _self = this;
private _commandOptions?: ClusterCommandOptions<TYPE_MAPPING/*, POLICIES*/>; private _commandOptions?: ClusterCommandOptions<TYPE_MAPPING/*, POLICIES*/>;
/** /**
@@ -311,7 +312,7 @@ export default class RedisCluster<
* Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific node (master or replica). * Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific node (master or replica).
*/ */
get slots() { get slots() {
return this._slots.slots; return this._self.#slots.slots;
} }
/** /**
@@ -319,7 +320,7 @@ export default class RedisCluster<
* Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific master node. * Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific master node.
*/ */
get masters() { get masters() {
return this._slots.masters; return this._self.#slots.masters;
} }
/** /**
@@ -327,7 +328,7 @@ export default class RedisCluster<
* Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific replica node. * Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific replica node.
*/ */
get replicas() { get replicas() {
return this._slots.replicas; return this._self.#slots.replicas;
} }
/** /**
@@ -335,25 +336,25 @@ export default class RedisCluster<
* Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific node (master or replica). * Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific node (master or replica).
*/ */
get nodeByAddress() { get nodeByAddress() {
return this._slots.nodeByAddress; return this._self.#slots.nodeByAddress;
} }
/** /**
* The current pub/sub node. * The current pub/sub node.
*/ */
get pubSubNode() { get pubSubNode() {
return this._slots.pubSubNode; return this._self.#slots.pubSubNode;
} }
get isOpen() { get isOpen() {
return this._slots.isOpen; return this._self.#slots.isOpen;
} }
constructor(options: RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/>) { constructor(options: RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/>) {
super(); super();
this._options = options; this.#options = options;
this._slots = new RedisClusterSlots(options, this.emit.bind(this)); this.#slots = new RedisClusterSlots(options, this.emit.bind(this));
if (options?.commandOptions) { if (options?.commandOptions) {
this._commandOptions = options.commandOptions; this._commandOptions = options.commandOptions;
@@ -368,14 +369,14 @@ export default class RedisCluster<
_TYPE_MAPPING extends TypeMapping = TYPE_MAPPING _TYPE_MAPPING extends TypeMapping = TYPE_MAPPING
>(overrides?: Partial<RedisClusterOptions<_M, _F, _S, _RESP, _TYPE_MAPPING>>) { >(overrides?: Partial<RedisClusterOptions<_M, _F, _S, _RESP, _TYPE_MAPPING>>) {
return new (Object.getPrototypeOf(this).constructor)({ return new (Object.getPrototypeOf(this).constructor)({
...this._options, ...this._self.#options,
commandOptions: this._commandOptions, commandOptions: this._commandOptions,
...overrides ...overrides
}) as RedisClusterType<_M, _F, _S, _RESP, _TYPE_MAPPING>; }) as RedisClusterType<_M, _F, _S, _RESP, _TYPE_MAPPING>;
} }
connect() { connect() {
return this._slots.connect(); return this._self.#slots.connect();
} }
withCommandOptions< withCommandOptions<
@@ -430,13 +431,13 @@ export default class RedisCluster<
// return this._commandOptionsProxy('policies', policies); // return this._commandOptionsProxy('policies', policies);
// } // }
private async _execute<T>( async #execute<T>(
firstKey: RedisArgument | undefined, firstKey: RedisArgument | undefined,
isReadonly: boolean | undefined, isReadonly: boolean | undefined,
fn: (client: RedisClientType<M, F, S, RESP, TYPE_MAPPING>) => Promise<T> fn: (client: RedisClientType<M, F, S, RESP, TYPE_MAPPING>) => Promise<T>
): Promise<T> { ): Promise<T> {
const maxCommandRedirections = this._options.maxCommandRedirections ?? 16; const maxCommandRedirections = this.#options.maxCommandRedirections ?? 16;
let client = await this._slots.getClient(firstKey, isReadonly), let client = await this.#slots.getClient(firstKey, isReadonly),
i = 0; i = 0;
while (true) { while (true) {
try { try {
@@ -449,10 +450,10 @@ export default class RedisCluster<
if (err.message.startsWith('ASK')) { if (err.message.startsWith('ASK')) {
const address = err.message.substring(err.message.lastIndexOf(' ') + 1); const address = err.message.substring(err.message.lastIndexOf(' ') + 1);
let redirectTo = await this._slots.getMasterByAddress(address); let redirectTo = await this.#slots.getMasterByAddress(address);
if (!redirectTo) { if (!redirectTo) {
await this._slots.rediscover(client); await this.#slots.rediscover(client);
redirectTo = await this._slots.getMasterByAddress(address); redirectTo = await this.#slots.getMasterByAddress(address);
} }
if (!redirectTo) { if (!redirectTo) {
@@ -465,8 +466,8 @@ export default class RedisCluster<
} }
if (err.message.startsWith('MOVED')) { if (err.message.startsWith('MOVED')) {
await this._slots.rediscover(client); await this.#slots.rediscover(client);
client = await this._slots.getClient(firstKey, isReadonly); client = await this.#slots.getClient(firstKey, isReadonly);
continue; continue;
} }
@@ -482,7 +483,7 @@ export default class RedisCluster<
options?: ClusterCommandOptions, options?: ClusterCommandOptions,
// defaultPolicies?: CommandPolicies // defaultPolicies?: CommandPolicies
): Promise<T> { ): Promise<T> {
return this._execute( return this._self.#execute(
firstKey, firstKey,
isReadonly, isReadonly,
client => client.sendCommand(args, options) client => client.sendCommand(args, options)
@@ -496,7 +497,7 @@ export default class RedisCluster<
args: Array<RedisArgument>, args: Array<RedisArgument>,
options?: CommandOptions options?: CommandOptions
) { ) {
return this._execute( return this._self.#execute(
firstKey, firstKey,
isReadonly, isReadonly,
client => client.executeScript(script, args, options) client => client.executeScript(script, args, options)
@@ -507,11 +508,11 @@ export default class RedisCluster<
type Multi = new (...args: ConstructorParameters<typeof RedisClusterMultiCommand>) => RedisClusterMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING>; type Multi = new (...args: ConstructorParameters<typeof RedisClusterMultiCommand>) => RedisClusterMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING>;
return new ((this as any).Multi as Multi)( return new ((this as any).Multi as Multi)(
async (firstKey, isReadonly, commands) => { async (firstKey, isReadonly, commands) => {
const client = await this._slots.getClient(firstKey, isReadonly); const client = await this._self.#slots.getClient(firstKey, isReadonly);
return client._executeMulti(commands); return client._executeMulti(commands);
}, },
async (firstKey, isReadonly, commands) => { async (firstKey, isReadonly, commands) => {
const client = await this._slots.getClient(firstKey, isReadonly); const client = await this._self.#slots.getClient(firstKey, isReadonly);
return client._executePipeline(commands); return client._executePipeline(commands);
}, },
routing routing
@@ -525,7 +526,7 @@ export default class RedisCluster<
listener: PubSubListener<T>, listener: PubSubListener<T>,
bufferMode?: T bufferMode?: T
) { ) {
return (await this._slots.getPubSubClient()) return (await this._self.#slots.getPubSubClient())
.SUBSCRIBE(channels, listener, bufferMode); .SUBSCRIBE(channels, listener, bufferMode);
} }
@@ -536,7 +537,7 @@ export default class RedisCluster<
listener?: PubSubListener<boolean>, listener?: PubSubListener<boolean>,
bufferMode?: T bufferMode?: T
) { ) {
return this._slots.executeUnsubscribeCommand(client => return this._self.#slots.executeUnsubscribeCommand(client =>
client.UNSUBSCRIBE(channels, listener, bufferMode) client.UNSUBSCRIBE(channels, listener, bufferMode)
); );
} }
@@ -548,7 +549,7 @@ export default class RedisCluster<
listener: PubSubListener<T>, listener: PubSubListener<T>,
bufferMode?: T bufferMode?: T
) { ) {
return (await this._slots.getPubSubClient()) return (await this._self.#slots.getPubSubClient())
.PSUBSCRIBE(patterns, listener, bufferMode); .PSUBSCRIBE(patterns, listener, bufferMode);
} }
@@ -559,7 +560,7 @@ export default class RedisCluster<
listener?: PubSubListener<T>, listener?: PubSubListener<T>,
bufferMode?: T bufferMode?: T
) { ) {
return this._slots.executeUnsubscribeCommand(client => return this._self.#slots.executeUnsubscribeCommand(client =>
client.PUNSUBSCRIBE(patterns, listener, bufferMode) client.PUNSUBSCRIBE(patterns, listener, bufferMode)
); );
} }
@@ -571,9 +572,9 @@ export default class RedisCluster<
listener: PubSubListener<T>, listener: PubSubListener<T>,
bufferMode?: T bufferMode?: T
) { ) {
const maxCommandRedirections = this._options.maxCommandRedirections ?? 16, const maxCommandRedirections = this._self.#options.maxCommandRedirections ?? 16,
firstChannel = Array.isArray(channels) ? channels[0] : channels; firstChannel = Array.isArray(channels) ? channels[0] : channels;
let client = await this._slots.getShardedPubSubClient(firstChannel); let client = await this._self.#slots.getShardedPubSubClient(firstChannel);
for (let i = 0; ; i++) { for (let i = 0; ; i++) {
try { try {
return await client.SSUBSCRIBE(channels, listener, bufferMode); return await client.SSUBSCRIBE(channels, listener, bufferMode);
@@ -583,8 +584,8 @@ export default class RedisCluster<
} }
if (err.message.startsWith('MOVED')) { if (err.message.startsWith('MOVED')) {
await this._slots.rediscover(client); await this._self.#slots.rediscover(client);
client = await this._slots.getShardedPubSubClient(firstChannel); client = await this._self.#slots.getShardedPubSubClient(firstChannel);
continue; continue;
} }
@@ -600,7 +601,7 @@ export default class RedisCluster<
listener: PubSubListener<T>, listener: PubSubListener<T>,
bufferMode?: T bufferMode?: T
) { ) {
return this._slots.executeShardedUnsubscribeCommand( return this._self.#slots.executeShardedUnsubscribeCommand(
Array.isArray(channels) ? channels[0] : channels, Array.isArray(channels) ? channels[0] : channels,
client => client.SUNSUBSCRIBE(channels, listener, bufferMode) client => client.SUNSUBSCRIBE(channels, listener, bufferMode)
); );
@@ -612,26 +613,26 @@ export default class RedisCluster<
* @deprecated Use `close` instead. * @deprecated Use `close` instead.
*/ */
quit() { quit() {
return this._slots.quit(); return this._self.#slots.quit();
} }
/** /**
* @deprecated Use `destroy` instead. * @deprecated Use `destroy` instead.
*/ */
disconnect() { disconnect() {
return this._slots.disconnect(); return this._self.#slots.disconnect();
} }
close() { close() {
return this._slots.close(); return this._self.#slots.close();
} }
destroy() { destroy() {
return this._slots.destroy(); return this._self.#slots.destroy();
} }
nodeClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>) { nodeClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>) {
return this._slots.nodeClient(node); return this._self.#slots.nodeClient(node);
} }
/** /**
@@ -639,7 +640,7 @@ export default class RedisCluster<
* Userful for running "forward" commands (like PUBLISH) on a random node. * Userful for running "forward" commands (like PUBLISH) on a random node.
*/ */
getRandomNode() { getRandomNode() {
return this._slots.getRandomNode(); return this._self.#slots.getRandomNode();
} }
/** /**
@@ -647,7 +648,7 @@ export default class RedisCluster<
* Useful for running readonly commands on a slot. * Useful for running readonly commands on a slot.
*/ */
getSlotRandomNode(slot: number) { getSlotRandomNode(slot: number) {
return this._slots.getSlotRandomNode(slot); return this._self.#slots.getSlotRandomNode(slot);
} }
/** /**

View File

@@ -91,7 +91,7 @@ export type ClusterMultiExecute = (
) => Promise<Array<unknown>>; ) => Promise<Array<unknown>>;
export default class RedisClusterMultiCommand<REPLIES = []> { export default class RedisClusterMultiCommand<REPLIES = []> {
private static _createCommand(command: Command, resp: RespVersions) { static #createCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp); const transformReply = getTransformReply(command, resp);
return function (this: RedisClusterMultiCommand, ...args: Array<unknown>) { return function (this: RedisClusterMultiCommand, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args), const redisArgs = command.transformArguments(...args),
@@ -109,16 +109,16 @@ export default class RedisClusterMultiCommand<REPLIES = []> {
}; };
} }
private static _createModuleCommand(command: Command, resp: RespVersions) { static #createModuleCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp); const transformReply = getTransformReply(command, resp);
return function (this: { self: RedisClusterMultiCommand }, ...args: Array<unknown>) { return function (this: { _self: RedisClusterMultiCommand }, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args), const redisArgs = command.transformArguments(...args),
firstKey = RedisCluster.extractFirstKey( firstKey = RedisCluster.extractFirstKey(
command, command,
args, args,
redisArgs redisArgs
); );
return this.self.addCommand( return this._self.addCommand(
firstKey, firstKey,
command.IS_READ_ONLY, command.IS_READ_ONLY,
redisArgs, redisArgs,
@@ -127,10 +127,10 @@ export default class RedisClusterMultiCommand<REPLIES = []> {
}; };
} }
private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) { static #createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) {
const prefix = functionArgumentsPrefix(name, fn), const prefix = functionArgumentsPrefix(name, fn),
transformReply = getTransformReply(fn, resp); transformReply = getTransformReply(fn, resp);
return function (this: { self: RedisClusterMultiCommand }, ...args: Array<unknown>) { return function (this: { _self: RedisClusterMultiCommand }, ...args: Array<unknown>) {
const fnArgs = fn.transformArguments(...args), const fnArgs = fn.transformArguments(...args),
redisArgs: CommandArguments = prefix.concat(fnArgs), redisArgs: CommandArguments = prefix.concat(fnArgs),
firstKey = RedisCluster.extractFirstKey( firstKey = RedisCluster.extractFirstKey(
@@ -139,7 +139,7 @@ export default class RedisClusterMultiCommand<REPLIES = []> {
fnArgs fnArgs
); );
redisArgs.preserve = fnArgs.preserve; redisArgs.preserve = fnArgs.preserve;
return this.self.addCommand( return this._self.addCommand(
firstKey, firstKey,
fn.IS_READ_ONLY, fn.IS_READ_ONLY,
redisArgs, redisArgs,
@@ -148,11 +148,11 @@ export default class RedisClusterMultiCommand<REPLIES = []> {
}; };
} }
private static _createScriptCommand(script: RedisScript, resp: RespVersions) { static #createScriptCommand(script: RedisScript, resp: RespVersions) {
const transformReply = getTransformReply(script, resp); const transformReply = getTransformReply(script, resp);
return function (this: RedisClusterMultiCommand, ...args: Array<unknown>) { return function (this: RedisClusterMultiCommand, ...args: Array<unknown>) {
const scriptArgs = script.transformArguments(...args); const scriptArgs = script.transformArguments(...args);
this._setState( this.#setState(
RedisCluster.extractFirstKey( RedisCluster.extractFirstKey(
script, script,
args, args,
@@ -160,7 +160,7 @@ export default class RedisClusterMultiCommand<REPLIES = []> {
), ),
script.IS_READ_ONLY script.IS_READ_ONLY
); );
this._multi.addScript( this.#multi.addScript(
script, script,
scriptArgs, scriptArgs,
transformReply transformReply
@@ -178,36 +178,36 @@ export default class RedisClusterMultiCommand<REPLIES = []> {
return attachConfig({ return attachConfig({
BaseClass: RedisClusterMultiCommand, BaseClass: RedisClusterMultiCommand,
commands: COMMANDS, commands: COMMANDS,
createCommand: RedisClusterMultiCommand._createCommand, createCommand: RedisClusterMultiCommand.#createCommand,
createModuleCommand: RedisClusterMultiCommand._createModuleCommand, createModuleCommand: RedisClusterMultiCommand.#createModuleCommand,
createFunctionCommand: RedisClusterMultiCommand._createFunctionCommand, createFunctionCommand: RedisClusterMultiCommand.#createFunctionCommand,
createScriptCommand: RedisClusterMultiCommand._createScriptCommand, createScriptCommand: RedisClusterMultiCommand.#createScriptCommand,
config config
}); });
} }
private readonly _multi = new RedisMultiCommand(); readonly #multi = new RedisMultiCommand();
private readonly _executeMulti: ClusterMultiExecute; readonly #executeMulti: ClusterMultiExecute;
private readonly _executePipeline: ClusterMultiExecute; readonly #executePipeline: ClusterMultiExecute;
private _firstKey: RedisArgument | undefined; #firstKey: RedisArgument | undefined;
private _isReadonly: boolean | undefined = true; #isReadonly: boolean | undefined = true;
constructor( constructor(
executeMulti: ClusterMultiExecute, executeMulti: ClusterMultiExecute,
executePipeline: ClusterMultiExecute, executePipeline: ClusterMultiExecute,
routing: RedisArgument | undefined routing: RedisArgument | undefined
) { ) {
this._executeMulti = executeMulti; this.#executeMulti = executeMulti;
this._executePipeline = executePipeline; this.#executePipeline = executePipeline;
this._firstKey = routing; this.#firstKey = routing;
} }
private _setState( #setState(
firstKey: RedisArgument | undefined, firstKey: RedisArgument | undefined,
isReadonly: boolean | undefined, isReadonly: boolean | undefined,
) { ) {
this._firstKey ??= firstKey; this.#firstKey ??= firstKey;
this._isReadonly &&= isReadonly; this.#isReadonly &&= isReadonly;
} }
addCommand( addCommand(
@@ -216,19 +216,19 @@ export default class RedisClusterMultiCommand<REPLIES = []> {
args: CommandArguments, args: CommandArguments,
transformReply?: TransformReply transformReply?: TransformReply
) { ) {
this._setState(firstKey, isReadonly); this.#setState(firstKey, isReadonly);
this._multi.addCommand(args, transformReply); this.#multi.addCommand(args, transformReply);
return this; return this;
} }
async exec<T extends MultiReply = MULTI_REPLY['GENERIC']>(execAsPipeline = false) { async exec<T extends MultiReply = MULTI_REPLY['GENERIC']>(execAsPipeline = false) {
if (execAsPipeline) return this.execAsPipeline<T>(); if (execAsPipeline) return this.execAsPipeline<T>();
return this._multi.transformReplies( return this.#multi.transformReplies(
await this._executeMulti( await this.#executeMulti(
this._firstKey, this.#firstKey,
this._isReadonly, this.#isReadonly,
this._multi.queue this.#multi.queue
) )
) as MultiReplyType<T, REPLIES>; ) as MultiReplyType<T, REPLIES>;
} }
@@ -240,13 +240,13 @@ export default class RedisClusterMultiCommand<REPLIES = []> {
} }
async execAsPipeline<T extends MultiReply = MULTI_REPLY['GENERIC']>() { async execAsPipeline<T extends MultiReply = MULTI_REPLY['GENERIC']>() {
if (this._multi.queue.length === 0) return [] as MultiReplyType<T, REPLIES>; if (this.#multi.queue.length === 0) return [] as MultiReplyType<T, REPLIES>;
return this._multi.transformReplies( return this.#multi.transformReplies(
await this._executePipeline( await this.#executePipeline(
this._firstKey, this.#firstKey,
this._isReadonly, this.#isReadonly,
this._multi.queue this.#multi.queue
) )
) as MultiReplyType<T, REPLIES>; ) as MultiReplyType<T, REPLIES>;
} }

View File

@@ -71,7 +71,7 @@ function attachNamespace(prototype: any, name: PropertyKey, fns: any) {
Object.defineProperty(prototype, name, { Object.defineProperty(prototype, name, {
get() { get() {
const value = Object.create(fns); const value = Object.create(fns);
value.self = this; value._self = this;
Object.defineProperty(this, name, { value }); Object.defineProperty(this, name, { value });
return value; return value;
} }