diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 22ea1aba96..6261b5dd19 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -44,18 +44,18 @@ const RESP2_PUSH_TYPE_MAPPING = { }; export default class RedisCommandsQueue { - private readonly _maxLength: number | null | undefined; - private readonly _toWrite = new DoublyLinkedList(); - private readonly _waitingForReply = new SinglyLinkedList(); - private readonly _onShardedChannelMoved: OnShardedChannelMoved; + readonly #maxLength: number | null | undefined; + readonly #toWrite = new DoublyLinkedList(); + readonly #waitingForReply = new SinglyLinkedList(); + readonly #onShardedChannelMoved: OnShardedChannelMoved; - private readonly _pubSub = new PubSub(); + readonly #pubSub = new PubSub(); get isPubSubActive() { - return this._pubSub.isActive; + return this.#pubSub.isActive; } - private _chainInExecution: symbol | undefined; + #chainInExecution: symbol | undefined; decoder: Decoder; @@ -64,92 +64,92 @@ export default class RedisCommandsQueue { maxLength: number | null | undefined, onShardedChannelMoved: EventEmitter['emit'] ) { - this.decoder = this._initiateDecoder(respVersion); - this._maxLength = maxLength; - this._onShardedChannelMoved = onShardedChannelMoved; + this.decoder = this.#initiateDecoder(respVersion); + this.#maxLength = maxLength; + this.#onShardedChannelMoved = onShardedChannelMoved; } - private _initiateDecoder(respVersion: RespVersions | null | undefined) { + #initiateDecoder(respVersion: RespVersions | null | undefined) { return respVersion === 3 ? - this._initiateResp3Decoder() : - this._initiateResp2Decoder(); + this.#initiateResp3Decoder() : + this.#initiateResp2Decoder(); } - private _onReply(reply: ReplyUnion) { - this._waitingForReply.shift()!.resolve(reply); + #onReply(reply: ReplyUnion) { + this.#waitingForReply.shift()!.resolve(reply); } - private _onErrorReply(err: ErrorReply) { - this._waitingForReply.shift()!.reject(err); + #onErrorReply(err: ErrorReply) { + this.#waitingForReply.shift()!.reject(err); } - private _onPush(push: Array) { + #onPush(push: Array) { // TODO: type - if (this._pubSub.handleMessageReply(push)) return true; + if (this.#pubSub.handleMessageReply(push)) return true; const isShardedUnsubscribe = PubSub.isShardedUnsubscribe(push); - if (isShardedUnsubscribe && !this._waitingForReply.length) { + if (isShardedUnsubscribe && !this.#waitingForReply.length) { const channel = push[1].toString(); - this._onShardedChannelMoved( + this.#onShardedChannelMoved( channel, - this._pubSub.removeShardedListeners(channel) + this.#pubSub.removeShardedListeners(channel) ); return true; } else if (isShardedUnsubscribe || PubSub.isStatusReply(push)) { - const head = this._waitingForReply.head!.value; + const head = this.#waitingForReply.head!.value; if ( (Number.isNaN(head.channelsCounter!) && push[2] === 0) || --head.channelsCounter! === 0 ) { - this._waitingForReply.shift()!.resolve(); + this.#waitingForReply.shift()!.resolve(); } return true; } } - private _getTypeMapping() { - return this._waitingForReply.head!.value.typeMapping ?? {}; + #getTypeMapping() { + return this.#waitingForReply.head!.value.typeMapping ?? {}; } - private _initiateResp3Decoder() { + #initiateResp3Decoder() { return new Decoder({ - onReply: reply => this._onReply(reply), - onErrorReply: err => this._onErrorReply(err), + onReply: reply => this.#onReply(reply), + onErrorReply: err => this.#onErrorReply(err), onPush: push => { - if (!this._onPush(push)) { + if (!this.#onPush(push)) { } }, - getTypeMapping: () => this._getTypeMapping() + getTypeMapping: () => this.#getTypeMapping() }); } - private _initiateResp2Decoder() { + #initiateResp2Decoder() { return new Decoder({ onReply: reply => { - if (this._pubSub.isActive && Array.isArray(reply)) { - if (this._onPush(reply)) return; + if (this.#pubSub.isActive && Array.isArray(reply)) { + if (this.#onPush(reply)) return; if (PONG.equals(reply[0] as Buffer)) { - const { resolve, typeMapping } = this._waitingForReply.shift()!, + const { resolve, typeMapping } = this.#waitingForReply.shift()!, buffer = ((reply[1] as Buffer).length === 0 ? reply[0] : reply[1]) as Buffer; resolve(typeMapping?.[RESP_TYPES.SIMPLE_STRING] === Buffer ? buffer : buffer.toString()); return; } } - this._onReply(reply); + this.#onReply(reply); }, - onErrorReply: err => this._onErrorReply(err), + onErrorReply: err => this.#onErrorReply(err), // PUSH type does not exist in RESP2 // PubSub is handled in onReply // @ts-expect-error onPush: undefined, getTypeMapping: () => { // PubSub push is an Array in RESP2 - return this._pubSub.isActive ? + return this.#pubSub.isActive ? RESP2_PUSH_TYPE_MAPPING : - this._getTypeMapping(); + this.#getTypeMapping(); } }); } @@ -180,7 +180,7 @@ export default class RedisCommandsQueue { options?: CommandOptions, resolveOnWrite?: boolean ): Promise { - if (this._maxLength && this._toWrite.length + this._waitingForReply.length >= this._maxLength) { + if (this.#maxLength && this.#toWrite.length + this.#waitingForReply.length >= this.#maxLength) { return Promise.reject(new Error('The queue is full')); } else if (options?.abortSignal?.aborted) { return Promise.reject(new AbortError()); @@ -204,14 +204,14 @@ export default class RedisCommandsQueue { value.abort = { signal, listener: () => { - this._toWrite.remove(node); + this.#toWrite.remove(node); value.reject(new AbortError()); } }; signal.addEventListener('abort', value.abort.listener, { once: true }); } - node = this._toWrite.add(value, options?.asap); + node = this.#toWrite.add(value, options?.asap); }); } @@ -221,8 +221,8 @@ export default class RedisCommandsQueue { listener: PubSubListener, returnBuffers?: T ) { - return this._addPubSubCommand( - this._pubSub.subscribe(type, channels, listener, returnBuffers) + return this.#addPubSubCommand( + this.#pubSub.subscribe(type, channels, listener, returnBuffers) ); } @@ -232,17 +232,17 @@ export default class RedisCommandsQueue { listener?: PubSubListener, returnBuffers?: T ) { - return this._addPubSubCommand( - this._pubSub.unsubscribe(type, channels, listener, returnBuffers) + return this.#addPubSubCommand( + this.#pubSub.unsubscribe(type, channels, listener, returnBuffers) ); } resubscribe(): Promise | undefined { - const commands = this._pubSub.resubscribe(); + const commands = this.#pubSub.resubscribe(); if (!commands.length) return; return Promise.all( - commands.map(command => this._addPubSubCommand(command, true)) + commands.map(command => this.#addPubSubCommand(command, true)) ); } @@ -251,26 +251,26 @@ export default class RedisCommandsQueue { channel: string, listeners: ChannelListeners ) { - return this._addPubSubCommand( - this._pubSub.extendChannelListeners(type, channel, listeners) + return this.#addPubSubCommand( + this.#pubSub.extendChannelListeners(type, channel, listeners) ); } extendPubSubListeners(type: PubSubType, listeners: PubSubTypeListeners) { - return this._addPubSubCommand( - this._pubSub.extendTypeListeners(type, listeners) + return this.#addPubSubCommand( + this.#pubSub.extendTypeListeners(type, listeners) ); } getPubSubListeners(type: PubSubType) { - return this._pubSub.getTypeListeners(type); + return this.#pubSub.getTypeListeners(type); } - private _addPubSubCommand(command: PubSubCommand, asap = false) { + #addPubSubCommand(command: PubSubCommand, asap = false) { if (command === undefined) return; return new Promise((resolve, reject) => { - this._toWrite.add({ + this.#toWrite.add({ args: command.args, chainId: undefined, abort: undefined, @@ -290,23 +290,23 @@ export default class RedisCommandsQueue { } isWaitingToWrite() { - return this._toWrite.length > 0; + return this.#toWrite.length > 0; } *commandsToWrite() { - let toSend = this._toWrite.shift(); + let toSend = this.#toWrite.shift(); while (toSend) { let encoded: CommandArguments; try { encoded = encodeCommand(toSend.args); } catch (err) { toSend.reject(err); - toSend = this._toWrite.shift(); + toSend = this.#toWrite.shift(); continue; } if (toSend.abort) { - RedisCommandsQueue._removeAbortListener(toSend); + RedisCommandsQueue.#removeAbortListener(toSend); toSend.abort = undefined; } @@ -316,31 +316,31 @@ export default class RedisCommandsQueue { // TODO reuse `toSend` or create new object? (toSend as any).args = undefined; - this._chainInExecution = toSend.chainId; + this.#chainInExecution = toSend.chainId; toSend.chainId = undefined; - this._waitingForReply.push(toSend); + this.#waitingForReply.push(toSend); } yield encoded; - toSend = this._toWrite.shift(); + toSend = this.#toWrite.shift(); } } - private _flushWaitingForReply(err: Error): void { - for (const node of this._waitingForReply) { + #flushWaitingForReply(err: Error): void { + for (const node of this.#waitingForReply) { node.reject(err); } - this._waitingForReply.reset(); + this.#waitingForReply.reset(); } - private static _removeAbortListener(command: CommandToWrite) { + static #removeAbortListener(command: CommandToWrite) { command.abort!.signal.removeEventListener('abort', command.abort!.listener); } - private static _flushToWrite(toBeSent: CommandToWrite, err: Error) { + static #flushToWrite(toBeSent: CommandToWrite, err: Error) { if (toBeSent.abort) { - RedisCommandsQueue._removeAbortListener(toBeSent); + RedisCommandsQueue.#removeAbortListener(toBeSent); } toBeSent.reject(err); @@ -348,36 +348,36 @@ export default class RedisCommandsQueue { flushWaitingForReply(err: Error): void { this.decoder.reset(); - this._pubSub.reset(); + this.#pubSub.reset(); - this._flushWaitingForReply(err); + this.#flushWaitingForReply(err); - if (!this._chainInExecution) return; + if (!this.#chainInExecution) return; - while (this._toWrite.head?.value.chainId === this._chainInExecution) { - RedisCommandsQueue._flushToWrite( - this._toWrite.shift()!, + while (this.#toWrite.head?.value.chainId === this.#chainInExecution) { + RedisCommandsQueue.#flushToWrite( + this.#toWrite.shift()!, err ); } - this._chainInExecution = undefined; + this.#chainInExecution = undefined; } flushAll(err: Error): void { this.decoder.reset(); - this._pubSub.reset(); - this._flushWaitingForReply(err); - for (const node of this._toWrite) { - RedisCommandsQueue._flushToWrite(node, err); + this.#pubSub.reset(); + this.#flushWaitingForReply(err); + for (const node of this.#toWrite) { + RedisCommandsQueue.#flushToWrite(node, err); } - this._toWrite.reset(); + this.#toWrite.reset(); } isEmpty() { return ( - this._toWrite.length === 0 && - this._waitingForReply.length === 0 + this.#toWrite.length === 0 && + this.#waitingForReply.length === 0 ); } } diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 1f33ca4483..51a4a478d5 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -132,7 +132,7 @@ export type RedisClientType< type ProxyClient = RedisClient; -type NamespaceProxyClient = { self: ProxyClient }; +type NamespaceProxyClient = { _self: ProxyClient }; interface ScanIteratorOptions { cursor?: RedisArgument; @@ -147,7 +147,7 @@ export default class RedisClient< RESP extends RespVersions, TYPE_MAPPING extends TypeMapping > extends EventEmitter { - private static _createCommand(command: Command, resp: RespVersions) { + static #createCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); return async function (this: ProxyClient, ...args: Array) { const redisArgs = command.transformArguments(...args), @@ -158,25 +158,25 @@ export default class RedisClient< }; } - private static _createModuleCommand(command: Command, resp: RespVersions) { + static #createModuleCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); return async function (this: NamespaceProxyClient, ...args: Array) { const redisArgs = command.transformArguments(...args), - reply = await this.self.sendCommand(redisArgs, this.self._commandOptions); + reply = await this._self.sendCommand(redisArgs, this._self._commandOptions); return transformReply ? transformReply(reply, redisArgs.preserve) : reply; }; } - private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) { + static #createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) { const prefix = functionArgumentsPrefix(name, fn), transformReply = getTransformReply(fn, resp); return async function (this: NamespaceProxyClient, ...args: Array) { const fnArgs = fn.transformArguments(...args), - reply = await this.self.sendCommand( + reply = await this._self.sendCommand( prefix.concat(fnArgs), - this.self._commandOptions + this._self._commandOptions ); return transformReply ? transformReply(reply, fnArgs.preserve) : @@ -184,7 +184,7 @@ export default class RedisClient< }; } - private static _createScriptCommand(script: RedisScript, resp: RespVersions) { + static #createScriptCommand(script: RedisScript, resp: RespVersions) { const prefix = scriptArgumentsPrefix(script), transformReply = getTransformReply(script, resp); return async function (this: ProxyClient, ...args: Array) { @@ -206,10 +206,10 @@ export default class RedisClient< const Client = attachConfig({ BaseClass: RedisClient, commands: COMMANDS, - createCommand: RedisClient._createCommand, - createModuleCommand: RedisClient._createModuleCommand, - createFunctionCommand: RedisClient._createFunctionCommand, - createScriptCommand: RedisClient._createScriptCommand, + createCommand: RedisClient.#createCommand, + createModuleCommand: RedisClient.#createModuleCommand, + createFunctionCommand: RedisClient.#createFunctionCommand, + createScriptCommand: RedisClient.#createScriptCommand, config }); @@ -218,7 +218,7 @@ export default class RedisClient< return ( options?: Omit, keyof Exclude> ) => { - // returning a "proxy" to prevent the namespaces.self to leak between "proxies" + // returning a "proxy" to prevent the namespaces._self to leak between "proxies" return Object.create(new Client(options)) as RedisClientType; }; } @@ -272,39 +272,38 @@ export default class RedisClient< return parsed; } - self = this; - - private readonly _options?: RedisClientOptions; - private readonly _socket: RedisSocket; - private readonly _queue: RedisCommandsQueue; - private _selectedDB = 0; - private _monitorCallback?: MonitorCallback; + readonly #options?: RedisClientOptions; + readonly #socket: RedisSocket; + readonly #queue: RedisCommandsQueue; + #selectedDB = 0; + #monitorCallback?: MonitorCallback; + private _self = this; private _commandOptions?: CommandOptions; get options(): RedisClientOptions | undefined { - return this._options; + return this._self.#options; } get isOpen(): boolean { - return this._socket.isOpen; + return this._self.#socket.isOpen; } get isReady(): boolean { - return this._socket.isReady; + return this._self.#socket.isReady; } get isPubSubActive() { - return this._queue.isPubSubActive; + return this._self.#queue.isPubSubActive; } constructor(options?: RedisClientOptions) { super(); - this._options = this._initiateOptions(options); - this._queue = this._initiateQueue(); - this._socket = this._initiateSocket(); + this.#options = this.#initiateOptions(options); + this.#queue = this.#initiateQueue(); + this.#socket = this.#initiateSocket(); } - private _initiateOptions(options?: RedisClientOptions): RedisClientOptions | undefined { + #initiateOptions(options?: RedisClientOptions): RedisClientOptions | undefined { if (options?.url) { const parsed = RedisClient.parseURL(options.url); if (options.socket) { @@ -315,7 +314,7 @@ export default class RedisClient< } if (options?.database) { - this.self._selectedDB = options.database; + this._self.#selectedDB = options.database; } if (options?.commandOptions) { @@ -325,82 +324,82 @@ export default class RedisClient< return options; } - private _initiateQueue(): RedisCommandsQueue { + #initiateQueue(): RedisCommandsQueue { return new RedisCommandsQueue( - this._options?.RESP, - this._options?.commandsQueueMaxLength, + this.#options?.RESP, + this.#options?.commandsQueueMaxLength, (channel, listeners) => this.emit('sharded-channel-moved', channel, listeners) ); } - private _initiateSocket(): RedisSocket { + #initiateSocket(): RedisSocket { const socketInitiator = async (): Promise => { - const promises = [this._queue.resubscribe()]; + const promises = [this.#queue.resubscribe()]; - if (this._monitorCallback) { + if (this.#monitorCallback) { promises.push( - this._queue.monitor( - this._monitorCallback, + this.#queue.monitor( + this.#monitorCallback, this._commandOptions?.typeMapping, true ) ); } - if (this._selectedDB !== 0) { + if (this.#selectedDB !== 0) { promises.push( - this._queue.addCommand( - ['SELECT', this._selectedDB.toString()], + this.#queue.addCommand( + ['SELECT', this.#selectedDB.toString()], { asap: true } ) ); } - if (this._options?.readonly) { + if (this.#options?.readonly) { promises.push( - this._queue.addCommand( + this.#queue.addCommand( COMMANDS.READONLY.transformArguments(), { asap: true } ) ); } - if (this._options?.RESP) { + if (this.#options?.RESP) { const hello: HelloOptions = {}; - if (this._options.password) { + if (this.#options.password) { hello.AUTH = { - username: this._options.username ?? 'default', - password: this._options.password + username: this.#options.username ?? 'default', + password: this.#options.password }; } - if (this._options.name) { - hello.SETNAME = this._options.name; + if (this.#options.name) { + hello.SETNAME = this.#options.name; } promises.push( - this._queue.addCommand( - HELLO.transformArguments(this._options.RESP, hello), + this.#queue.addCommand( + HELLO.transformArguments(this.#options.RESP, hello), { asap: true } ) ); } else { - if (this._options?.name) { + if (this.#options?.name) { promises.push( - this._queue.addCommand( - COMMANDS.CLIENT_SETNAME.transformArguments(this._options.name), + this.#queue.addCommand( + COMMANDS.CLIENT_SETNAME.transformArguments(this.#options.name), { asap: true } ) ); } - if (this._options?.username || this._options?.password) { + if (this.#options?.username || this.#options?.password) { promises.push( - this._queue.addCommand( + this.#queue.addCommand( COMMANDS.AUTH.transformArguments({ - username: this._options.username, - password: this._options.password ?? '' + username: this.#options.username, + password: this.#options.password ?? '' }), { asap: true } ) @@ -409,60 +408,60 @@ export default class RedisClient< } if (promises.length) { - this._write(); + this.#write(); await Promise.all(promises); } }; - return new RedisSocket(socketInitiator, this._options?.socket) + return new RedisSocket(socketInitiator, this.#options?.socket) .on('data', chunk => { try { - this._queue.decoder.write(chunk); + this.#queue.decoder.write(chunk); } catch (err) { - this._queue.decoder.reset(); + this.#queue.decoder.reset(); this.emit('error', err); } }) .on('error', err => { this.emit('error', err); - if (this._socket.isOpen && !this._options?.disableOfflineQueue) { - this._queue.flushWaitingForReply(err); + if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) { + this.#queue.flushWaitingForReply(err); } else { - this._queue.flushAll(err); + this.#queue.flushAll(err); } }) .on('connect', () => this.emit('connect')) .on('ready', () => { this.emit('ready'); - this._setPingTimer(); - this._maybeScheduleWrite(); + this.#setPingTimer(); + this.#maybeScheduleWrite(); }) .on('reconnecting', () => this.emit('reconnecting')) - .on('drain', () => this._maybeScheduleWrite()) + .on('drain', () => this.#maybeScheduleWrite()) .on('end', () => this.emit('end')); } - private _pingTimer?: NodeJS.Timeout; + #pingTimer?: NodeJS.Timeout; - private _setPingTimer(): void { - if (!this._options?.pingInterval || !this._socket.isReady) return; - clearTimeout(this._pingTimer); + #setPingTimer(): void { + if (!this.#options?.pingInterval || !this.#socket.isReady) return; + clearTimeout(this.#pingTimer); - this._pingTimer = setTimeout(() => { - if (!this._socket.isReady) return; + this.#pingTimer = setTimeout(() => { + if (!this.#socket.isReady) return; this.sendCommand(['PING']) .then(reply => this.emit('ping-interval', reply)) .catch(err => this.emit('error', err)) - .finally(() => this._setPingTimer()); - }, this._options.pingInterval); + .finally(() => this.#setPingTimer()); + }, this.#options.pingInterval); } withCommandOptions< OPTIONS extends CommandOptions, TYPE_MAPPING extends TypeMapping >(options: OPTIONS) { - const proxy = Object.create(this.self); + const proxy = Object.create(this._self); proxy._commandOptions = options; return proxy as RedisClientType< M, @@ -480,7 +479,7 @@ export default class RedisClient< key: K, value: V ) { - const proxy = Object.create(this.self); + const proxy = Object.create(this._self); proxy._commandOptions = Object.create(this._commandOptions ?? null); proxy._commandOptions[key] = value; return proxy as RedisClientType< @@ -527,7 +526,7 @@ export default class RedisClient< */ createPool(options?: Partial) { return RedisClientPool.create( - this._options, + this._self.#options, options ); } @@ -540,14 +539,14 @@ export default class RedisClient< _TYPE_MAPPING extends TypeMapping = TYPE_MAPPING >(overrides?: Partial>) { return new (Object.getPrototypeOf(this).constructor)({ - ...this._options, + ...this._self.#options, commandOptions: this._commandOptions, ...overrides }) as RedisClientType<_M, _F, _S, _RESP, _TYPE_MAPPING>; } async connect() { - await this._socket.connect(); + await this._self.#socket.connect(); return this as unknown as RedisClientType; } @@ -555,14 +554,14 @@ export default class RedisClient< args: Array, options?: CommandOptions ): Promise { - if (!this._socket.isOpen) { + if (!this._self.#socket.isOpen) { return Promise.reject(new ClientClosedError()); - } else if (!this._socket.isReady && this._options?.disableOfflineQueue) { + } else if (!this._self.#socket.isReady && this._self.#options?.disableOfflineQueue) { return Promise.reject(new ClientOfflineError()); } - const promise = this._queue.addCommand(args, options); - this._scheduleWrite(); + const promise = this._self.#queue.addCommand(args, options); + this._self.#scheduleWrite(); return promise; } @@ -584,15 +583,15 @@ export default class RedisClient< async SELECT(db: number): Promise { await this.sendCommand(['SELECT', db.toString()]); - this.self._selectedDB = db; + this._self.#selectedDB = db; } select = this.SELECT; - private _pubSubCommand(promise: Promise | undefined) { + #pubSubCommand(promise: Promise | undefined) { if (promise === undefined) return Promise.resolve(); - this._scheduleWrite(); + this.#scheduleWrite(); return promise; } @@ -601,8 +600,8 @@ export default class RedisClient< listener: PubSubListener, bufferMode?: T ): Promise { - return this._pubSubCommand( - this._queue.subscribe( + return this._self.#pubSubCommand( + this._self.#queue.subscribe( PubSubType.CHANNELS, channels, listener, @@ -618,8 +617,8 @@ export default class RedisClient< listener?: PubSubListener, bufferMode?: T ): Promise { - return this._pubSubCommand( - this._queue.unsubscribe( + return this._self.#pubSubCommand( + this._self.#queue.unsubscribe( PubSubType.CHANNELS, channels, listener, @@ -635,8 +634,8 @@ export default class RedisClient< listener: PubSubListener, bufferMode?: T ): Promise { - return this._pubSubCommand( - this._queue.subscribe( + return this._self.#pubSubCommand( + this._self.#queue.subscribe( PubSubType.PATTERNS, patterns, listener, @@ -652,8 +651,8 @@ export default class RedisClient< listener?: PubSubListener, bufferMode?: T ): Promise { - return this._pubSubCommand( - this._queue.unsubscribe( + return this._self.#pubSubCommand( + this._self.#queue.unsubscribe( PubSubType.PATTERNS, patterns, listener, @@ -669,8 +668,8 @@ export default class RedisClient< listener: PubSubListener, bufferMode?: T ): Promise { - return this._pubSubCommand( - this._queue.subscribe( + return this._self.#pubSubCommand( + this._self.#queue.subscribe( PubSubType.SHARDED, channels, listener, @@ -686,8 +685,8 @@ export default class RedisClient< listener?: PubSubListener, bufferMode?: T ): Promise { - return this._pubSubCommand( - this._queue.unsubscribe( + return this._self.#pubSubCommand( + this._self.#queue.unsubscribe( PubSubType.SHARDED, channels, listener, @@ -699,7 +698,7 @@ export default class RedisClient< sUnsubscribe = this.SUNSUBSCRIBE; getPubSubListeners(type: PubSubType) { - return this._queue.getPubSubListeners(type); + return this._self.#queue.getPubSubListeners(type); } extendPubSubChannelListeners( @@ -707,36 +706,36 @@ export default class RedisClient< channel: string, listeners: ChannelListeners ) { - return this._pubSubCommand( - this._queue.extendPubSubChannelListeners(type, channel, listeners) + return this._self.#pubSubCommand( + this._self.#queue.extendPubSubChannelListeners(type, channel, listeners) ); } extendPubSubListeners(type: PubSubType, listeners: PubSubTypeListeners) { - return this._pubSubCommand( - this._queue.extendPubSubListeners(type, listeners) + return this._self.#pubSubCommand( + this._self.#queue.extendPubSubListeners(type, listeners) ); } - private _write() { - this._socket.write(this._queue.commandsToWrite()); + #write() { + this.#socket.write(this.#queue.commandsToWrite()); } - private _scheduledWrite?: NodeJS.Immediate; + #scheduledWrite?: NodeJS.Immediate; - private _scheduleWrite() { - if (!this.isReady || this._scheduledWrite) return; + #scheduleWrite() { + if (!this.#socket.isReady || this.#scheduledWrite) return; - this._scheduledWrite = setImmediate(() => { - this._write(); - this._scheduledWrite = undefined; + this.#scheduledWrite = setImmediate(() => { + this.#write(); + this.#scheduledWrite = undefined; }); } - private _maybeScheduleWrite() { - if (!this._queue.isWaitingToWrite()) return; + #maybeScheduleWrite() { + if (!this.#queue.isWaitingToWrite()) return; - this._scheduleWrite(); + this.#scheduleWrite(); } /** @@ -746,20 +745,20 @@ export default class RedisClient< commands: Array, selectedDB?: number ) { - if (!this._socket.isOpen) { + if (!this._self.#socket.isOpen) { return Promise.reject(new ClientClosedError()); } const promise = Promise.all( - commands.map(({ args }) => this._queue.addCommand(args, { + commands.map(({ args }) => this._self.#queue.addCommand(args, { typeMapping: this._commandOptions?.typeMapping })) ); - this._scheduleWrite(); + this._self.#scheduleWrite(); const result = await promise; if (selectedDB !== undefined) { - this.self._selectedDB = selectedDB; + this._self.#selectedDB = selectedDB; } return result; @@ -772,19 +771,19 @@ export default class RedisClient< commands: Array, selectedDB?: number ) { - if (!this._socket.isOpen) { + if (!this._self.#socket.isOpen) { throw new ClientClosedError(); } const typeMapping = this._commandOptions?.typeMapping, chainId = Symbol('MULTI Chain'), promises = [ - this._queue.addCommand(['MULTI'], { chainId }), + this._self.#queue.addCommand(['MULTI'], { chainId }), ]; for (const { args } of commands) { promises.push( - this._queue.addCommand(args, { + this._self.#queue.addCommand(args, { chainId, typeMapping }) @@ -792,10 +791,10 @@ export default class RedisClient< } promises.push( - this._queue.addCommand(['EXEC'], { chainId }) + this._self.#queue.addCommand(['EXEC'], { chainId }) ); - this._scheduleWrite(); + this._self.#scheduleWrite(); const results = await Promise.all(promises), execResult = results[results.length - 1]; @@ -805,7 +804,7 @@ export default class RedisClient< } if (selectedDB !== undefined) { - this.self._selectedDB = selectedDB; + this._self.#selectedDB = selectedDB; } return execResult as Array; @@ -873,16 +872,16 @@ export default class RedisClient< } async MONITOR(callback: MonitorCallback) { - const promise = this._queue.monitor(callback, this._commandOptions?.typeMapping); - this._scheduleWrite(); + const promise = this._self.#queue.monitor(callback, this._commandOptions?.typeMapping); + this._self.#scheduleWrite(); const off = await promise; - this._monitorCallback = callback; + this._self.#monitorCallback = callback; return async () => { const promise = off(); - this._scheduleWrite(); + this._self.#scheduleWrite(); await promise; - this._monitorCallback = undefined; + this._self.#monitorCallback = undefined; }; } @@ -892,10 +891,10 @@ export default class RedisClient< * @deprecated use .close instead */ QUIT(): Promise { - return this._socket.quit(async () => { - clearTimeout(this._pingTimer); - const quitPromise = this._queue.addCommand(['QUIT']); - this._scheduleWrite(); + return this._self.#socket.quit(async () => { + clearTimeout(this._self.#pingTimer); + const quitPromise = this._self.#queue.addCommand(['QUIT']); + this._self.#scheduleWrite(); return quitPromise; }); } @@ -914,22 +913,22 @@ export default class RedisClient< */ close() { return new Promise(resolve => { - clearTimeout(this._pingTimer); - this._socket.close(); + clearTimeout(this._self.#pingTimer); + this._self.#socket.close(); - if (this._queue.isEmpty()) { - this._socket.destroySocket(); + if (this._self.#queue.isEmpty()) { + this._self.#socket.destroySocket(); return resolve(); } const maybeClose = () => { - if (!this._queue.isEmpty()) return; + if (!this._self.#queue.isEmpty()) return; - this._socket.off('data', maybeClose); - this._socket.destroySocket(); + this._self.#socket.off('data', maybeClose); + this._self.#socket.destroySocket(); resolve(); }; - this._socket.on('data', maybeClose); + this._self.#socket.on('data', maybeClose); }); } @@ -937,16 +936,16 @@ export default class RedisClient< * Destroy the client. Rejects all commands immediately. */ destroy() { - clearTimeout(this._pingTimer); - this._queue.flushAll(new DisconnectsClientError()); - this._socket.destroy(); + clearTimeout(this._self.#pingTimer); + this._self.#queue.flushAll(new DisconnectsClientError()); + this._self.#socket.destroy(); } ref() { - this._socket.ref(); + this._self.#socket.ref(); } unref() { - this._socket.unref(); + this._self.#socket.unref(); } } diff --git a/packages/client/lib/client/legacy-mode.ts b/packages/client/lib/client/legacy-mode.ts index 555c4eaae5..b30ebc68bc 100644 --- a/packages/client/lib/client/legacy-mode.ts +++ b/packages/client/lib/client/legacy-mode.ts @@ -23,7 +23,7 @@ type WithCommands = { export type RedisLegacyClientType = RedisLegacyClient & WithCommands; export class RedisLegacyClient { - private static _transformArguments(redisArgs: CommandArguments, args: LegacyCommandArguments) { + static #transformArguments(redisArgs: CommandArguments, args: LegacyCommandArguments) { let callback: LegacyCallback | undefined; if (typeof args[args.length - 1] === 'function') { callback = args.pop() as LegacyCallback; @@ -55,15 +55,15 @@ export class RedisLegacyClient { undefined; } - private static _createCommand(name: string, command: Command, resp: RespVersions) { + static #createCommand(name: string, command: Command, resp: RespVersions) { const transformReply = RedisLegacyClient.getTransformReply(command, resp); return async function (this: RedisLegacyClient, ...args: LegacyCommandArguments) { const redisArgs = [name], - callback = RedisLegacyClient._transformArguments(redisArgs, args), - promise = this._client.sendCommand(redisArgs); + callback = RedisLegacyClient.#transformArguments(redisArgs, args), + promise = this.#client.sendCommand(redisArgs); if (!callback) { - promise.catch(err => this._client.emit('error', err)); + promise.catch(err => this.#client.emit('error', err)); return; } @@ -73,31 +73,34 @@ export class RedisLegacyClient { }; } - private _Multi: ReturnType; + #client: RedisClientType; + #Multi: ReturnType; constructor( - private _client: RedisClientType + client: RedisClientType ) { - const RESP = _client.options?.RESP ?? 2; + this.#client = client; + + const RESP = client.options?.RESP ?? 2; for (const [name, command] of Object.entries(COMMANDS)) { // TODO: as any? - (this as any)[name] = RedisLegacyClient._createCommand( + (this as any)[name] = RedisLegacyClient.#createCommand( name, command, RESP ); } - this._Multi = LegacyMultiCommand.factory(RESP); + this.#Multi = LegacyMultiCommand.factory(RESP); } sendCommand(...args: LegacyCommandArguments) { const redisArgs: CommandArguments = [], - callback = RedisLegacyClient._transformArguments(redisArgs, args), - promise = this._client.sendCommand(redisArgs); + callback = RedisLegacyClient.#transformArguments(redisArgs, args), + promise = this.#client.sendCommand(redisArgs); if (!callback) { - promise.catch(err => this._client.emit('error', err)); + promise.catch(err => this.#client.emit('error', err)); return; } @@ -107,7 +110,7 @@ export class RedisLegacyClient { } multi() { - return this._Multi(this._client); + return this.#Multi(this.#client); } } @@ -118,12 +121,12 @@ type MultiWithCommands = { export type RedisLegacyMultiType = LegacyMultiCommand & MultiWithCommands; class LegacyMultiCommand { - private static _createCommand(name: string, command: Command, resp: RespVersions) { + static #createCommand(name: string, command: Command, resp: RespVersions) { const transformReply = RedisLegacyClient.getTransformReply(command, resp); return function (this: LegacyMultiCommand, ...args: LegacyArguments) { const redisArgs = [name]; RedisLegacyClient.pushArguments(redisArgs, args); - this._multi.addCommand(redisArgs, transformReply); + this.#multi.addCommand(redisArgs, transformReply); return this; }; } @@ -133,7 +136,7 @@ class LegacyMultiCommand { for (const [name, command] of Object.entries(COMMANDS)) { // TODO: as any? - (Multi as any).prototype[name] = LegacyMultiCommand._createCommand( + (Multi as any).prototype[name] = LegacyMultiCommand.#createCommand( name, command, resp @@ -145,30 +148,30 @@ class LegacyMultiCommand { }; } - private readonly _multi = new RedisMultiCommand(); - private readonly _client: RedisClientType; + readonly #multi = new RedisMultiCommand(); + readonly #client: RedisClientType; constructor(client: RedisClientType) { - this._client = client; + this.#client = client; } sendCommand(...args: LegacyArguments) { const redisArgs: CommandArguments = []; RedisLegacyClient.pushArguments(redisArgs, args); - this._multi.addCommand(redisArgs); + this.#multi.addCommand(redisArgs); return this; } exec(cb?: (err: ErrorReply | null, replies?: Array) => unknown) { - const promise = this._client._executeMulti(this._multi.queue); + const promise = this.#client._executeMulti(this.#multi.queue); if (!cb) { - promise.catch(err => this._client.emit('error', err)); + promise.catch(err => this.#client.emit('error', err)); return; } promise - .then(results => cb(null, this._multi.transformReplies(results))) + .then(results => cb(null, this.#multi.transformReplies(results))) .catch(err => cb?.(err)); } } diff --git a/packages/client/lib/client/linked-list.ts b/packages/client/lib/client/linked-list.ts index 7aed43cfd1..ac1d021be9 100644 --- a/packages/client/lib/client/linked-list.ts +++ b/packages/client/lib/client/linked-list.ts @@ -5,56 +5,56 @@ export interface DoublyLinkedNode { } export class DoublyLinkedList { - private _length = 0; + #length = 0; get length() { - return this._length; + return this.#length; } - private _head?: DoublyLinkedNode; + #head?: DoublyLinkedNode; get head() { - return this._head; + return this.#head; } - private _tail?: DoublyLinkedNode; + #tail?: DoublyLinkedNode; get tail() { - return this._tail; + return this.#tail; } push(value: T) { - ++this._length; + ++this.#length; - if (this._tail === undefined) { - return this._tail = this._head = { - previous: this._head, + if (this.#tail === undefined) { + return this.#tail = this.#head = { + previous: this.#head, next: undefined, value }; } - return this._tail = this._tail.next = { - previous: this._tail, + return this.#tail = this.#tail.next = { + previous: this.#tail, next: undefined, value }; } unshift(value: T) { - ++this._length; + ++this.#length; - if (this._head === undefined) { - return this._head = this._tail = { + if (this.#head === undefined) { + return this.#head = this.#tail = { previous: undefined, next: undefined, value }; } - return this._head = this._head.previous = { + return this.#head = this.#head.previous = { previous: undefined, - next: this._head, + next: this.#head, value }; } @@ -66,29 +66,29 @@ export class DoublyLinkedList { } shift() { - if (this._head === undefined) return undefined; + if (this.#head === undefined) return undefined; - --this._length; - const node = this._head; + --this.#length; + const node = this.#head; if (node.next) { node.next.previous = node.previous; - this._head = node.next; + this.#head = node.next; node.next = undefined; } else { - this._head = this._tail = undefined; + this.#head = this.#tail = undefined; } return node.value; } remove(node: DoublyLinkedNode) { - --this._length; + --this.#length; - if (this._tail === node) { - this._tail = node.previous; + if (this.#tail === node) { + this.#tail = node.previous; } - if (this._head === node) { - this._head = node.next; + if (this.#head === node) { + this.#head = node.next; } else { node.previous!.next = node.next; node.previous = undefined; @@ -98,12 +98,12 @@ export class DoublyLinkedList { } reset() { - this._length = 0; - this._head = this._tail = undefined; + this.#length = 0; + this.#head = this.#tail = undefined; } *[Symbol.iterator]() { - let node = this._head; + let node = this.#head; while (node !== undefined) { yield node.value; node = node.next; @@ -117,50 +117,50 @@ export interface SinglyLinkedNode { } export class SinglyLinkedList { - private _length = 0; + #length = 0; get length() { - return this._length; + return this.#length; } - private _head?: SinglyLinkedNode; + #head?: SinglyLinkedNode; get head() { - return this._head; + return this.#head; } - private _tail?: SinglyLinkedNode; + #tail?: SinglyLinkedNode; get tail() { - return this._tail; + return this.#tail; } push(value: T) { - ++this._length; + ++this.#length; const node = { value, next: undefined }; - if (this._head === undefined) { - return this._head = this._tail = node; + if (this.#head === undefined) { + return this.#head = this.#tail = node; } - return this._tail!.next = this._tail = node; + return this.#tail!.next = this.#tail = node; } remove(node: SinglyLinkedNode, parent: SinglyLinkedNode | undefined) { - --this._length; + --this.#length; - if (this._head === node) { - if (this._tail === node) { - this._head = this._tail = undefined; + if (this.#head === node) { + if (this.#tail === node) { + this.#head = this.#tail = undefined; } else { - this._head = node.next; + this.#head = node.next; } - } else if (this._tail === node) { - this._tail = parent; + } else if (this.#tail === node) { + this.#tail = parent; parent!.next = undefined; } else { parent!.next = node.next; @@ -168,25 +168,25 @@ export class SinglyLinkedList { } shift() { - if (this._head === undefined) return undefined; + if (this.#head === undefined) return undefined; - const node = this._head; - if (--this._length === 0) { - this._head = this._tail = undefined; + const node = this.#head; + if (--this.#length === 0) { + this.#head = this.#tail = undefined; } else { - this._head = node.next; + this.#head = node.next; } return node.value; } reset() { - this._length = 0; - this._head = this._tail = undefined; + this.#length = 0; + this.#head = this.#tail = undefined; } *[Symbol.iterator]() { - let node = this._head; + let node = this.#head; while (node !== undefined) { yield node.value; node = node.next; diff --git a/packages/client/lib/client/multi-command.ts b/packages/client/lib/client/multi-command.ts index 7daf4b5d80..ef65144d56 100644 --- a/packages/client/lib/client/multi-command.ts +++ b/packages/client/lib/client/multi-command.ts @@ -86,7 +86,7 @@ export type RedisClientMultiCommandType< type ExecuteMulti = (commands: Array, selectedDB?: number) => Promise>; export default class RedisClientMultiCommand { - private static _createCommand(command: Command, resp: RespVersions) { + static #createCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); return function (this: RedisClientMultiCommand, ...args: Array) { return this.addCommand( @@ -96,34 +96,34 @@ export default class RedisClientMultiCommand { }; } - private static _createModuleCommand(command: Command, resp: RespVersions) { + static #createModuleCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); - return function (this: { self: RedisClientMultiCommand }, ...args: Array) { - return this.self.addCommand( + return function (this: { _self: RedisClientMultiCommand }, ...args: Array) { + return this._self.addCommand( command.transformArguments(...args), transformReply ); }; } - private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) { + static #createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) { const prefix = functionArgumentsPrefix(name, fn), transformReply = getTransformReply(fn, resp); - return function (this: { self: RedisClientMultiCommand }, ...args: Array) { + return function (this: { _self: RedisClientMultiCommand }, ...args: Array) { const fnArgs = fn.transformArguments(...args), redisArgs: CommandArguments = prefix.concat(fnArgs); redisArgs.preserve = fnArgs.preserve; - return this.self.addCommand( + return this._self.addCommand( redisArgs, transformReply ); }; } - private static _createScriptCommand(script: RedisScript, resp: RespVersions) { + static #createScriptCommand(script: RedisScript, resp: RespVersions) { const transformReply = getTransformReply(script, resp); return function (this: RedisClientMultiCommand, ...args: Array) { - this._multi.addScript( + this.#multi.addScript( script, script.transformArguments(...args), transformReply @@ -141,42 +141,42 @@ export default class RedisClientMultiCommand { return attachConfig({ BaseClass: RedisClientMultiCommand, commands: COMMANDS, - createCommand: RedisClientMultiCommand._createCommand, - createModuleCommand: RedisClientMultiCommand._createModuleCommand, - createFunctionCommand: RedisClientMultiCommand._createFunctionCommand, - createScriptCommand: RedisClientMultiCommand._createScriptCommand, + createCommand: RedisClientMultiCommand.#createCommand, + createModuleCommand: RedisClientMultiCommand.#createModuleCommand, + createFunctionCommand: RedisClientMultiCommand.#createFunctionCommand, + createScriptCommand: RedisClientMultiCommand.#createScriptCommand, config }); } - private readonly _multi = new RedisMultiCommand(); - private readonly _executeMulti: ExecuteMulti; - private readonly _executePipeline: ExecuteMulti; - private _selectedDB?: number; + readonly #multi = new RedisMultiCommand(); + readonly #executeMulti: ExecuteMulti; + readonly #executePipeline: ExecuteMulti; + #selectedDB?: number; constructor(executeMulti: ExecuteMulti, executePipeline: ExecuteMulti) { - this._executeMulti = executeMulti; - this._executePipeline = executePipeline; + this.#executeMulti = executeMulti; + this.#executePipeline = executePipeline; } SELECT(db: number, transformReply?: TransformReply): this { - this._selectedDB = db; - this._multi.addCommand(['SELECT', db.toString()], transformReply); + this.#selectedDB = db; + this.#multi.addCommand(['SELECT', db.toString()], transformReply); return this; } select = this.SELECT; addCommand(args: CommandArguments, transformReply?: TransformReply) { - this._multi.addCommand(args, transformReply); + this.#multi.addCommand(args, transformReply); return this; } async exec(execAsPipeline = false): Promise> { if (execAsPipeline) return this.execAsPipeline(); - return this._multi.transformReplies( - await this._executeMulti(this._multi.queue, this._selectedDB) + return this.#multi.transformReplies( + await this.#executeMulti(this.#multi.queue, this.#selectedDB) ) as MultiReplyType; } @@ -187,10 +187,10 @@ export default class RedisClientMultiCommand { } async execAsPipeline(): Promise> { - if (this._multi.queue.length === 0) return [] as MultiReplyType; + if (this.#multi.queue.length === 0) return [] as MultiReplyType; - return this._multi.transformReplies( - await this._executePipeline(this._multi.queue, this._selectedDB) + return this.#multi.transformReplies( + await this.#executePipeline(this.#multi.queue, this.#selectedDB) ) as MultiReplyType; } diff --git a/packages/client/lib/client/pool.ts b/packages/client/lib/client/pool.ts index 5ea76e3e30..12d8f5fee5 100644 --- a/packages/client/lib/client/pool.ts +++ b/packages/client/lib/client/pool.ts @@ -49,7 +49,7 @@ export type RedisClientPoolType< type ProxyPool = RedisClientPoolType; -type NamespaceProxyPool = { self: ProxyPool }; +type NamespaceProxyPool = { _self: ProxyPool }; export class RedisClientPool< M extends RedisModules = {}, @@ -58,7 +58,7 @@ export class RedisClientPool< RESP extends RespVersions = 2, TYPE_MAPPING extends TypeMapping = {} > extends EventEmitter { - private static _createCommand(command: Command, resp: RespVersions) { + static #createCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); return async function (this: ProxyPool, ...args: Array) { const redisArgs = command.transformArguments(...args), @@ -69,25 +69,25 @@ export class RedisClientPool< }; } - private static _createModuleCommand(command: Command, resp: RespVersions) { + static #createModuleCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); return async function (this: NamespaceProxyPool, ...args: Array) { const redisArgs = command.transformArguments(...args), - reply = await this.self.sendCommand(redisArgs, this.self._commandOptions); + reply = await this._self.sendCommand(redisArgs, this._self._commandOptions); return transformReply ? transformReply(reply, redisArgs.preserve) : reply; }; } - private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) { + static #createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) { const prefix = functionArgumentsPrefix(name, fn), transformReply = getTransformReply(fn, resp); return async function (this: NamespaceProxyPool, ...args: Array) { const fnArgs = fn.transformArguments(...args), - reply = await this.self.sendCommand( + reply = await this._self.sendCommand( prefix.concat(fnArgs), - this.self._commandOptions + this._self._commandOptions ); return transformReply ? transformReply(reply, fnArgs.preserve) : @@ -95,7 +95,7 @@ export class RedisClientPool< }; } - private static _createScriptCommand(script: RedisScript, resp: RespVersions) { + static #createScriptCommand(script: RedisScript, resp: RespVersions) { const prefix = scriptArgumentsPrefix(script), transformReply = getTransformReply(script, resp); return async function (this: ProxyPool, ...args: Array) { @@ -115,23 +115,22 @@ export class RedisClientPool< RESP extends RespVersions, TYPE_MAPPING extends TypeMapping = {} >( - // clientFactory: () => RedisClientType, clientOptions?: RedisClientOptions, options?: Partial ) { const Pool = attachConfig({ BaseClass: RedisClientPool, commands: COMMANDS, - createCommand: RedisClientPool._createCommand, - createModuleCommand: RedisClientPool._createModuleCommand, - createFunctionCommand: RedisClientPool._createFunctionCommand, - createScriptCommand: RedisClientPool._createScriptCommand, + createCommand: RedisClientPool.#createCommand, + createModuleCommand: RedisClientPool.#createModuleCommand, + createFunctionCommand: RedisClientPool.#createFunctionCommand, + createScriptCommand: RedisClientPool.#createScriptCommand, config: clientOptions }); Pool.prototype.Multi = RedisClientMultiCommand.extend(clientOptions); - // returning a "proxy" to prevent the namespaces.self to leak between "proxies" + // returning a "proxy" to prevent the namespaces._self to leak between "proxies" return Object.create( new Pool( RedisClient.factory(clientOptions).bind(undefined, clientOptions), @@ -141,51 +140,42 @@ export class RedisClientPool< } // TODO: defaults - private static _DEFAULTS = { + static #DEFAULTS = { minimum: 1, maximum: 100, acquireTimeout: 3000, cleanupDelay: 3000 } satisfies RedisPoolOptions; - private readonly _clientFactory: () => RedisClientType; - private readonly _options: RedisPoolOptions; + readonly #clientFactory: () => RedisClientType; + readonly #options: RedisPoolOptions; - private readonly _idleClients = new SinglyLinkedList>(); + readonly #idleClients = new SinglyLinkedList>(); /** * The number of idle clients. */ get idleClients() { - return this._idleClients.length; + return this._self.#idleClients.length; } - private readonly _clientsInUse = new DoublyLinkedList>(); + readonly #clientsInUse = new DoublyLinkedList>(); /** * The number of clients in use. */ get clientsInUse() { - return this._clientsInUse.length; - } - - private readonly _connectingClients = 0; - - /** - * The number of clients that are currently connecting. - */ - get connectingClients() { - return this._connectingClients; + return this._self.#clientsInUse.length; } /** * The total number of clients in the pool (including connecting, idle, and in use). */ get totalClients() { - return this._idleClients.length + this._clientsInUse.length; + return this._self.#idleClients.length + this._self.#clientsInUse.length; } - private readonly _tasksQueue = new SinglyLinkedList<{ + readonly #tasksQueue = new SinglyLinkedList<{ timeout: NodeJS.Timeout | undefined; resolve: (value: unknown) => unknown; reject: (reason?: unknown) => unknown; @@ -196,25 +186,25 @@ export class RedisClientPool< * The number of tasks waiting for a client to become available. */ get tasksQueueLength() { - return this._tasksQueue.length; + return this._self.#tasksQueue.length; } - private _isOpen = false; + #isOpen = false; /** * Whether the pool is open (either connecting or connected). */ get isOpen() { - return this._isOpen; + return this._self.#isOpen; } - private _isClosing = false; + #isClosing = false; /** * Whether the pool is closing (*not* closed). */ get isClosing() { - return this._isClosing; + return this._self.#isClosing; } /** @@ -228,9 +218,9 @@ export class RedisClientPool< ) { super(); - this._clientFactory = clientFactory; - this._options = { - ...RedisClientPool._DEFAULTS, + this.#clientFactory = clientFactory; + this.#options = { + ...RedisClientPool.#DEFAULTS, ...options }; } @@ -253,7 +243,7 @@ export class RedisClientPool< >; } - private _commandOptionsProxy< + #commandOptionsProxy< K extends keyof CommandOptions, V extends CommandOptions[K] >( @@ -276,14 +266,14 @@ export class RedisClientPool< * Override the `typeMapping` command option */ withTypeMapping(typeMapping: TYPE_MAPPING) { - return this._commandOptionsProxy('typeMapping', typeMapping); + return this._self.#commandOptionsProxy('typeMapping', typeMapping); } /** * Override the `abortSignal` command option */ withAbortSignal(abortSignal: AbortSignal) { - return this._commandOptionsProxy('abortSignal', abortSignal); + return this._self.#commandOptionsProxy('abortSignal', abortSignal); } /** @@ -291,17 +281,17 @@ export class RedisClientPool< * TODO: remove? */ asap() { - return this._commandOptionsProxy('asap', true); + return this._self.#commandOptionsProxy('asap', true); } async connect() { - if (this._isOpen) return; // TODO: throw error? + if (this._self.#isOpen) return; // TODO: throw error? - this._isOpen = true; + this._self.#isOpen = true; const promises = []; - while (promises.length < this._options.minimum) { - promises.push(this._create()); + while (promises.length < this._self.#options.minimum) { + promises.push(this._self.#create()); } try { @@ -313,39 +303,39 @@ export class RedisClientPool< } } - private async _create() { - const node = this._clientsInUse.push( - this._clientFactory() + async #create() { + const node = this._self.#clientsInUse.push( + this._self.#clientFactory() .on('error', (err: Error) => this.emit('error', err)) ); try { await node.value.connect(); } catch (err) { - this._clientsInUse.remove(node); + this._self.#clientsInUse.remove(node); throw err; } - this._returnClient(node); + this._self.#returnClient(node); } execute(fn: PoolTask) { return new Promise>((resolve, reject) => { - const client = this._idleClients.shift(), - { tail } = this._tasksQueue; + const client = this._self.#idleClients.shift(), + { tail } = this._self.#tasksQueue; if (!client) { let timeout; - if (this._options.acquireTimeout > 0) { + if (this._self.#options.acquireTimeout > 0) { timeout = setTimeout( () => { - this._tasksQueue.remove(task, tail); + this._self.#tasksQueue.remove(task, tail); reject(new TimeoutError('Timeout waiting for a client')); // TODO: message }, - this._options.acquireTimeout + this._self.#options.acquireTimeout ); } - const task = this._tasksQueue.push({ + const task = this._self.#tasksQueue.push({ timeout, // @ts-ignore resolve, @@ -353,20 +343,20 @@ export class RedisClientPool< fn }); - if (this.totalClients < this._options.maximum) { - this._create(); + if (this.totalClients < this._self.#options.maximum) { + this._self.#create(); } return; } - const node = this._clientsInUse.push(client); + const node = this._self.#clientsInUse.push(client); // @ts-ignore - this._executeTask(node, resolve, reject, fn); + this._self.#executeTask(node, resolve, reject, fn); }); } - private _executeTask( + #executeTask( node: DoublyLinkedNode>, resolve: (value: T | PromiseLike) => void, reject: (reason?: unknown) => void, @@ -375,40 +365,40 @@ export class RedisClientPool< const result = fn(node.value); if (result instanceof Promise) { result.then(resolve, reject); - result.finally(() => this._returnClient(node)) + result.finally(() => this.#returnClient(node)) } else { resolve(result); - this._returnClient(node); + this.#returnClient(node); } } - private _returnClient(node: DoublyLinkedNode>) { - const task = this._tasksQueue.shift(); + #returnClient(node: DoublyLinkedNode>) { + const task = this.#tasksQueue.shift(); if (task) { - this._executeTask(node, task.resolve, task.reject, task.fn); + this.#executeTask(node, task.resolve, task.reject, task.fn); return; } - this._clientsInUse.remove(node); - this._idleClients.push(node.value); + this.#clientsInUse.remove(node); + this.#idleClients.push(node.value); - this._scheduleCleanup(); + this.#scheduleCleanup(); } cleanupTimeout?: NodeJS.Timeout; - private _scheduleCleanup() { - if (this.totalClients <= this._options.minimum) return; + #scheduleCleanup() { + if (this.totalClients <= this.#options.minimum) return; clearTimeout(this.cleanupTimeout); - this.cleanupTimeout = setTimeout(() => this._cleanup(), this._options.cleanupDelay); + this.cleanupTimeout = setTimeout(() => this.#cleanup(), this.#options.cleanupDelay); } - private _cleanup() { - const toDestroy = Math.min(this._idleClients.length, this.totalClients - this._options.minimum); + #cleanup() { + const toDestroy = Math.min(this.#idleClients.length, this.totalClients - this.#options.minimum); for (let i = 0; i < toDestroy; i++) { // TODO: shift vs pop - this._idleClients.shift()!.destroy(); + this.#idleClients.shift()!.destroy(); } } @@ -438,44 +428,44 @@ export class RedisClientPool< multi = this.MULTI; async close() { - if (this._isClosing) return; // TODO: throw err? - if (!this._isOpen) return; // TODO: throw err? + if (this._self.#isClosing) return; // TODO: throw err? + if (!this._self.#isOpen) return; // TODO: throw err? - this._isClosing = true; + this._self.#isClosing = true; try { const promises = []; - for (const client of this._idleClients) { + for (const client of this._self.#idleClients) { promises.push(client.close()); } - for (const client of this._clientsInUse) { + for (const client of this._self.#clientsInUse) { promises.push(client.close()); } await Promise.all(promises); - this._idleClients.reset(); - this._clientsInUse.reset(); + this._self.#idleClients.reset(); + this._self.#clientsInUse.reset(); } catch (err) { } finally { - this._isClosing = false; + this._self.#isClosing = false; } } destroy() { - for (const client of this._idleClients) { + for (const client of this._self.#idleClients) { client.destroy(); } - this._idleClients.reset(); + this._self.#idleClients.reset(); - for (const client of this._clientsInUse) { + for (const client of this._self.#clientsInUse) { client.destroy(); } - this._clientsInUse.reset(); + this._self.#clientsInUse.reset(); - this._isOpen = false; + this._self.#isOpen = false; } } diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index 01968c1303..129ace9039 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -46,7 +46,7 @@ interface CreateSocketReturn { export type RedisSocketInitiator = () => Promise; export default class RedisSocket extends EventEmitter { - private static _initiateOptions(options?: RedisSocketOptions): RedisSocketOptions { + static #initiateOptions(options?: RedisSocketOptions): RedisSocketOptions { options ??= {}; if (!(options as net.IpcSocketConnectOpts).path) { (options as net.TcpSocketConnectOpts).port ??= 6379; @@ -60,45 +60,45 @@ export default class RedisSocket extends EventEmitter { return options; } - private static _isTlsSocket(options: RedisSocketOptions): options is RedisTlsSocketOptions { + static #isTlsSocket(options: RedisSocketOptions): options is RedisTlsSocketOptions { return (options as RedisTlsSocketOptions).tls === true; } - private readonly _initiator: RedisSocketInitiator; + readonly #initiator: RedisSocketInitiator; - private readonly _options: RedisSocketOptions; + readonly #options: RedisSocketOptions; - private _socket?: net.Socket | tls.TLSSocket; + #socket?: net.Socket | tls.TLSSocket; - private _isOpen = false; + #isOpen = false; get isOpen(): boolean { - return this._isOpen; + return this.#isOpen; } - private _isReady = false; + #isReady = false; get isReady(): boolean { - return this._isReady; + return this.#isReady; } - private _isSocketUnrefed = false; + #isSocketUnrefed = false; constructor(initiator: RedisSocketInitiator, options?: RedisSocketOptions) { super(); - this._initiator = initiator; - this._options = RedisSocket._initiateOptions(options); + this.#initiator = initiator; + this.#options = RedisSocket.#initiateOptions(options); } - private _reconnectStrategy(retries: number, cause: Error) { - if (this._options.reconnectStrategy === false) { + #reconnectStrategy(retries: number, cause: Error) { + if (this.#options.reconnectStrategy === false) { return false; - } else if (typeof this._options.reconnectStrategy === 'number') { - return this._options.reconnectStrategy; - } else if (this._options.reconnectStrategy) { + } else if (typeof this.#options.reconnectStrategy === 'number') { + return this.#options.reconnectStrategy; + } else if (this.#options.reconnectStrategy) { try { - const retryIn = this._options.reconnectStrategy(retries, cause); + const retryIn = this.#options.reconnectStrategy(retries, cause); if (retryIn !== false && !(retryIn instanceof Error) && typeof retryIn !== 'number') { throw new TypeError(`Reconnect strategy should return \`false | Error | number\`, got ${retryIn} instead`); } @@ -112,14 +112,14 @@ export default class RedisSocket extends EventEmitter { return Math.min(retries * 50, 500); } - private _shouldReconnect(retries: number, cause: Error) { - const retryIn = this._reconnectStrategy(retries, cause); + #shouldReconnect(retries: number, cause: Error) { + const retryIn = this.#reconnectStrategy(retries, cause); if (retryIn === false) { - this._isOpen = false; + this.#isOpen = false; this.emit('error', cause); return cause; } else if (retryIn instanceof Error) { - this._isOpen = false; + this.#isOpen = false; this.emit('error', cause); return new ReconnectStrategyError(retryIn, cause); } @@ -128,32 +128,32 @@ export default class RedisSocket extends EventEmitter { } async connect(): Promise { - if (this._isOpen) { + if (this.#isOpen) { throw new Error('Socket already opened'); } - this._isOpen = true; - return this._connect(); + this.#isOpen = true; + return this.#connect(); } - private async _connect(): Promise { + async #connect(): Promise { let retries = 0; do { try { - this._socket = await this._createSocket(); + this.#socket = await this.#createSocket(); this.emit('connect'); try { - await this._initiator(); + await this.#initiator(); } catch (err) { - this._socket.destroy(); - this._socket = undefined; + this.#socket.destroy(); + this.#socket = undefined; throw err; } - this._isReady = true; + this.#isReady = true; this.emit('ready'); } catch (err) { - const retryIn = this._shouldReconnect(retries++, err as Error); + const retryIn = this.#shouldReconnect(retries++, err as Error); if (typeof retryIn !== 'number') { throw retryIn; } @@ -162,36 +162,36 @@ export default class RedisSocket extends EventEmitter { await setTimeout(retryIn); this.emit('reconnecting'); } - } while (this._isOpen && !this._isReady); + } while (this.#isOpen && !this.#isReady); } - private _createSocket(): Promise { + #createSocket(): Promise { return new Promise((resolve, reject) => { - const { connectEvent, socket } = RedisSocket._isTlsSocket(this._options) ? - this._createTlsSocket() : - this._createNetSocket(); + const { connectEvent, socket } = RedisSocket.#isTlsSocket(this.#options) ? + this.#createTlsSocket() : + this.#createNetSocket(); - if (this._options.connectTimeout) { - socket.setTimeout(this._options.connectTimeout, () => socket.destroy(new ConnectionTimeoutError())); + if (this.#options.connectTimeout) { + socket.setTimeout(this.#options.connectTimeout, () => socket.destroy(new ConnectionTimeoutError())); } - if (this._isSocketUnrefed) { + if (this.#isSocketUnrefed) { socket.unref(); } socket - .setNoDelay(this._options.noDelay) + .setNoDelay(this.#options.noDelay) .once('error', reject) .once(connectEvent, () => { socket .setTimeout(0) // https://github.com/nodejs/node/issues/31663 - .setKeepAlive(this._options.keepAlive !== false, this._options.keepAlive || 0) + .setKeepAlive(this.#options.keepAlive !== false, this.#options.keepAlive || 0) .off('error', reject) - .once('error', (err: Error) => this._onSocketError(err)) + .once('error', (err: Error) => this.#onSocketError(err)) .once('close', hadError => { - if (!hadError && this._isOpen && this._socket === socket) { - this._onSocketError(new SocketClosedUnexpectedlyError()); + if (!hadError && this.#isOpen && this.#socket === socket) { + this.#onSocketError(new SocketClosedUnexpectedlyError()); } }) .on('drain', () => this.emit('drain')) @@ -202,93 +202,93 @@ export default class RedisSocket extends EventEmitter { }); } - private _createNetSocket(): CreateSocketReturn { + #createNetSocket(): CreateSocketReturn { return { connectEvent: 'connect', - socket: net.connect(this._options as net.NetConnectOpts) // TODO + socket: net.connect(this.#options as net.NetConnectOpts) // TODO }; } - private _createTlsSocket(): CreateSocketReturn { + #createTlsSocket(): CreateSocketReturn { return { connectEvent: 'secureConnect', - socket: tls.connect(this._options as tls.ConnectionOptions) // TODO + socket: tls.connect(this.#options as tls.ConnectionOptions) // TODO }; } - private _onSocketError(err: Error): void { - const wasReady = this._isReady; - this._isReady = false; + #onSocketError(err: Error): void { + const wasReady = this.#isReady; + this.#isReady = false; this.emit('error', err); - if (!wasReady || !this._isOpen || typeof this._shouldReconnect(0, err) !== 'number') return; + if (!wasReady || !this.#isOpen || typeof this.#shouldReconnect(0, err) !== 'number') return; this.emit('reconnecting'); - this._connect().catch(() => { + this.#connect().catch(() => { // the error was already emitted, silently ignore it }); } write(iterator: IterableIterator>): void { - if (!this._socket) return; + if (!this.#socket) return; - this._socket.cork(); + this.#socket.cork(); for (const args of iterator) { for (const toWrite of args) { - this._socket.write(toWrite); + this.#socket.write(toWrite); } - if (this._socket.writableNeedDrain) break; + if (this.#socket.writableNeedDrain) break; } - this._socket.uncork(); + this.#socket.uncork(); } async quit(fn: () => Promise): Promise { - if (!this._isOpen) { + if (!this.#isOpen) { throw new ClientClosedError(); } - this._isOpen = false; + this.#isOpen = false; const reply = await fn(); this.destroySocket(); return reply; } close() { - if (!this._isOpen) { + if (!this.#isOpen) { throw new ClientClosedError(); } - this._isOpen = false; + this.#isOpen = false; } destroy() { - if (!this._isOpen) { + if (!this.#isOpen) { throw new ClientClosedError(); } - this._isOpen = false; + this.#isOpen = false; this.destroySocket(); } destroySocket() { - this._isReady = false; + this.#isReady = false; - if (this._socket) { - this._socket.destroy(); - this._socket = undefined; + if (this.#socket) { + this.#socket.destroy(); + this.#socket = undefined; } this.emit('end'); } ref(): void { - this._isSocketUnrefed = false; - this._socket?.ref(); + this.#isSocketUnrefed = false; + this.#socket?.ref(); } unref(): void { - this._isSocketUnrefed = true; - this._socket?.unref(); + this.#isSocketUnrefed = true; + this.#socket?.unref(); } } diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index 514e780585..dee133176a 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -100,83 +100,83 @@ export default class RedisClusterSlots< RESP extends RespVersions, TYPE_MAPPING extends TypeMapping > { - private static _SLOTS = 16384; + static #SLOTS = 16384; - private readonly _options: RedisClusterOptions; - private readonly _clientFactory: ReturnType>; - private readonly _emit: EventEmitter['emit']; - slots = new Array>(RedisClusterSlots._SLOTS); + readonly #options: RedisClusterOptions; + readonly #clientFactory: ReturnType>; + readonly #emit: EventEmitter['emit']; + slots = new Array>(RedisClusterSlots.#SLOTS); masters = new Array>(); replicas = new Array>(); readonly nodeByAddress = new Map | ShardNode>(); pubSubNode?: PubSubNode; - private _isOpen = false; + #isOpen = false; get isOpen() { - return this._isOpen; + return this.#isOpen; } constructor( options: RedisClusterOptions, emit: EventEmitter['emit'] ) { - this._options = options; - this._clientFactory = RedisClient.factory(options); - this._emit = emit; + this.#options = options; + this.#clientFactory = RedisClient.factory(options); + this.#emit = emit; } async connect() { - if (this._isOpen) { + if (this.#isOpen) { throw new Error('Cluster already open'); } - this._isOpen = true; + this.#isOpen = true; try { - await this._discoverWithRootNodes(); + await this.#discoverWithRootNodes(); } catch (err) { - this._isOpen = false; + this.#isOpen = false; throw err; } } - private async _discoverWithRootNodes() { - let start = Math.floor(Math.random() * this._options.rootNodes.length); - for (let i = start; i < this._options.rootNodes.length; i++) { - if (!this._isOpen) throw new Error('Cluster closed'); - if (await this._discover(this._options.rootNodes[i])) return; + async #discoverWithRootNodes() { + let start = Math.floor(Math.random() * this.#options.rootNodes.length); + for (let i = start; i < this.#options.rootNodes.length; i++) { + if (!this.#isOpen) throw new Error('Cluster closed'); + if (await this.#discover(this.#options.rootNodes[i])) return; } for (let i = 0; i < start; i++) { - if (!this._isOpen) throw new Error('Cluster closed'); - if (await this._discover(this._options.rootNodes[i])) return; + if (!this.#isOpen) throw new Error('Cluster closed'); + if (await this.#discover(this.#options.rootNodes[i])) return; } throw new RootNodesUnavailableError(); } - private _resetSlots() { - this.slots = new Array(RedisClusterSlots._SLOTS); + #resetSlots() { + this.slots = new Array(RedisClusterSlots.#SLOTS); this.masters = []; this.replicas = []; this._randomNodeIterator = undefined; } - private async _discover(rootNode: RedisClusterClientOptions) { - this._resetSlots(); + async #discover(rootNode: RedisClusterClientOptions) { + this.#resetSlots(); try { const addressesInUse = new Set(), promises: Array> = [], - eagerConnect = this._options.minimizeConnections !== true; + eagerConnect = this.#options.minimizeConnections !== true; - for (const { from, to, master, replicas } of await this._getShards(rootNode)) { + for (const { from, to, master, replicas } of await this.#getShards(rootNode)) { const shard: Shard = { - master: this._initiateSlotNode(master, false, eagerConnect, addressesInUse, promises) + master: this.#initiateSlotNode(master, false, eagerConnect, addressesInUse, promises) }; - if (this._options.useReplicas) { + if (this.#options.useReplicas) { shard.replicas = replicas.map(replica => - this._initiateSlotNode(replica, true, eagerConnect, addressesInUse, promises) + this.#initiateSlotNode(replica, true, eagerConnect, addressesInUse, promises) ); } @@ -193,7 +193,7 @@ export default class RedisClusterSlots< if (channelsListeners.size || patternsListeners.size) { promises.push( - this._initiatePubSubClient({ + this.#initiatePubSubClient({ [PubSubType.CHANNELS]: channelsListeners, [PubSubType.PATTERNS]: patternsListeners }) @@ -220,21 +220,21 @@ export default class RedisClusterSlots< return true; } catch (err) { - this._emit('error', err); + this.#emit('error', err); return false; } } - private async _getShards(rootNode: RedisClusterClientOptions) { - const options = this._clientOptionsDefaults(rootNode)!; + async #getShards(rootNode: RedisClusterClientOptions) { + const options = this.#clientOptionsDefaults(rootNode)!; options.socket ??= {}; options.socket.reconnectStrategy = false; - options.RESP = this._options.RESP; + options.RESP = this.#options.RESP; options.commandOptions = undefined; // TODO: find a way to avoid type casting - const client = await this._clientFactory(options as RedisClientOptions) - .on('error', err => this._emit('error', err)) + const client = await this.#clientFactory(options as RedisClientOptions) + .on('error', err => this.#emit('error', err)) .connect(); try { @@ -245,37 +245,37 @@ export default class RedisClusterSlots< } } - private _getNodeAddress(address: string): NodeAddress | undefined { - switch (typeof this._options.nodeAddressMap) { + #getNodeAddress(address: string): NodeAddress | undefined { + switch (typeof this.#options.nodeAddressMap) { case 'object': - return this._options.nodeAddressMap[address]; + return this.#options.nodeAddressMap[address]; case 'function': - return this._options.nodeAddressMap(address); + return this.#options.nodeAddressMap(address); } } - private _clientOptionsDefaults(options?: RedisClientOptions) { - if (!this._options.defaults) return options; + #clientOptionsDefaults(options?: RedisClientOptions) { + if (!this.#options.defaults) return options; let socket; - if (this._options.defaults.socket) { + if (this.#options.defaults.socket) { socket = options?.socket ? { - ...this._options.defaults.socket, + ...this.#options.defaults.socket, ...options.socket - } : this._options.defaults.socket; + } : this.#options.defaults.socket; } else { socket = options?.socket; } return { - ...this._options.defaults, + ...this.#options.defaults, ...options, socket }; } - private _initiateSlotNode( + #initiateSlotNode( shard: NodeAddress & { id: string; }, readonly: boolean, eagerConnent: boolean, @@ -295,7 +295,7 @@ export default class RedisClusterSlots< }; if (eagerConnent) { - promises.push(this._createNodeClient(node)); + promises.push(this.#createNodeClient(node)); } this.nodeByAddress.set(address, node); @@ -309,21 +309,21 @@ export default class RedisClusterSlots< return node; } - private _createClient(node: ShardNode, readonly = node.readonly) { - return this._clientFactory( - this._clientOptionsDefaults({ - socket: this._getNodeAddress(node.address) ?? { + #createClient(node: ShardNode, readonly = node.readonly) { + return this.#clientFactory( + this.#clientOptionsDefaults({ + socket: this.#getNodeAddress(node.address) ?? { host: node.host, port: node.port }, readonly, - RESP: this._options.RESP + RESP: this.#options.RESP }) ).on('error', err => console.error(err)); } - private _createNodeClient(node: ShardNode, readonly?: boolean) { - const client = node.client = this._createClient(node, readonly); + #createNodeClient(node: ShardNode, readonly?: boolean) { + const client = node.client = this.#createClient(node, readonly); return node.connectPromise = client.connect() .finally(() => node.connectPromise = undefined); } @@ -332,46 +332,46 @@ export default class RedisClusterSlots< return ( node.connectPromise ?? // if the node is connecting node.client ?? // if the node is connected - this._createNodeClient(node) // if the not is disconnected + this.#createNodeClient(node) // if the not is disconnected ); } - private _runningRediscoverPromise?: Promise; + #runningRediscoverPromise?: Promise; async rediscover(startWith: RedisClientType): Promise { - this._runningRediscoverPromise ??= this._rediscover(startWith) - .finally(() => this._runningRediscoverPromise = undefined); - return this._runningRediscoverPromise; + this.#runningRediscoverPromise ??= this.#rediscover(startWith) + .finally(() => this.#runningRediscoverPromise = undefined); + return this.#runningRediscoverPromise; } - private async _rediscover(startWith: RedisClientType): Promise { - if (await this._discover(startWith.options!)) return; + async #rediscover(startWith: RedisClientType): Promise { + if (await this.#discover(startWith.options!)) return; - return this._discoverWithRootNodes(); + return this.#discoverWithRootNodes(); } /** * @deprecated Use `close` instead. */ quit(): Promise { - return this._destroy(client => client.quit()); + return this.#destroy(client => client.quit()); } /** * @deprecated Use `destroy` instead. */ disconnect(): Promise { - return this._destroy(client => client.disconnect()); + return this.#destroy(client => client.disconnect()); } close() { - return this._destroy(client => client.close()); + return this.#destroy(client => client.close()); } destroy() { - this._isOpen = false; + this.#isOpen = false; - for (const client of this._clients()) { + for (const client of this.#clients()) { client.destroy(); } @@ -380,11 +380,11 @@ export default class RedisClusterSlots< this.pubSubNode = undefined; } - this._resetSlots(); + this.#resetSlots(); this.nodeByAddress.clear(); } - private *_clients() { + *#clients() { for (const master of this.masters) { if (master.client) { yield master.client; @@ -402,11 +402,11 @@ export default class RedisClusterSlots< } } - private async _destroy(fn: (client: RedisClientType) => Promise): Promise { - this._isOpen = false; + async #destroy(fn: (client: RedisClientType) => Promise): Promise { + this.#isOpen = false; const promises = []; - for (const client of this._clients()) { + for (const client of this.#clients()) { promises.push(fn(client)); } @@ -415,7 +415,7 @@ export default class RedisClusterSlots< this.pubSubNode = undefined; } - this._resetSlots(); + this.#resetSlots(); this.nodeByAddress.clear(); await Promise.allSettled(promises); @@ -437,7 +437,7 @@ export default class RedisClusterSlots< return this.nodeClient(this.getSlotRandomNode(slotNumber)); } - private *_iterateAllNodes() { + *#iterateAllNodes() { let i = Math.floor(Math.random() * (this.masters.length + this.replicas.length)); if (i < this.masters.length) { do { @@ -468,11 +468,11 @@ export default class RedisClusterSlots< _randomNodeIterator?: IterableIterator>; getRandomNode() { - this._randomNodeIterator ??= this._iterateAllNodes(); + this._randomNodeIterator ??= this.#iterateAllNodes(); return this._randomNodeIterator.next().value as ShardNode; } - private *_slotNodesIterator(slot: ShardWithReplicas) { + *#slotNodesIterator(slot: ShardWithReplicas) { let i = Math.floor(Math.random() * (1 + slot.replicas.length)); if (i < slot.replicas.length) { do { @@ -495,7 +495,7 @@ export default class RedisClusterSlots< return slot.master; } - slot.nodesIterator ??= this._slotNodesIterator(slot as ShardWithReplicas); + slot.nodesIterator ??= this.#slotNodesIterator(slot as ShardWithReplicas); return slot.nodesIterator.next().value as ShardNode; } @@ -507,17 +507,17 @@ export default class RedisClusterSlots< } getPubSubClient() { - if (!this.pubSubNode) return this._initiatePubSubClient(); + if (!this.pubSubNode) return this.#initiatePubSubClient(); return this.pubSubNode.connectPromise ?? this.pubSubNode.client; } - private async _initiatePubSubClient(toResubscribe?: PubSubToResubscribe) { + async #initiatePubSubClient(toResubscribe?: PubSubToResubscribe) { const index = Math.floor(Math.random() * (this.masters.length + this.replicas.length)), node = index < this.masters.length ? this.masters[index] : this.replicas[index - this.masters.length], - client = this._createClient(node, true); + client = this.#createClient(node, true); this.pubSubNode = { address: node.address, @@ -557,12 +557,12 @@ export default class RedisClusterSlots< getShardedPubSubClient(channel: string) { const { master } = this.slots[calculateSlot(channel)]; - if (!master.pubSub) return this._initiateShardedPubSubClient(master); + if (!master.pubSub) return this.#initiateShardedPubSubClient(master); return master.pubSub.connectPromise ?? master.pubSub.client; } - private async _initiateShardedPubSubClient(master: MasterNode) { - const client = this._createClient(master, true) + async #initiateShardedPubSubClient(master: MasterNode) { + const client = this.#createClient(master, true) .on('server-sunsubscribe', async (channel, listeners) => { try { await this.rediscover(client); @@ -573,7 +573,7 @@ export default class RedisClusterSlots< listeners ); } catch (err) { - this._emit('sharded-shannel-moved-error', err, channel, listeners); + this.#emit('sharded-shannel-moved-error', err, channel, listeners); } }); diff --git a/packages/client/lib/cluster/index.ts b/packages/client/lib/cluster/index.ts index a350eb37f5..c2987a6915 100644 --- a/packages/client/lib/cluster/index.ts +++ b/packages/client/lib/cluster/index.ts @@ -132,7 +132,7 @@ export interface ClusterCommandOptions< type ProxyCluster = RedisCluster; -type NamespaceProxyCluster = { self: ProxyCluster }; +type NamespaceProxyCluster = { _self: ProxyCluster }; export default class RedisCluster< M extends RedisModules, @@ -166,7 +166,7 @@ export default class RedisCluster< return key; } - private static _createCommand(command: Command, resp: RespVersions) { + static #createCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); return async function (this: ProxyCluster, ...args: Array) { const redisArgs = command.transformArguments(...args), @@ -189,7 +189,7 @@ export default class RedisCluster< }; } - private static _createModuleCommand(command: Command, resp: RespVersions) { + static #createModuleCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); return async function (this: NamespaceProxyCluster, ...args: Array) { const redisArgs = command.transformArguments(...args), @@ -198,11 +198,11 @@ export default class RedisCluster< args, redisArgs ), - reply = await this.self.sendCommand( + reply = await this._self.sendCommand( firstKey, command.IS_READ_ONLY, redisArgs, - this.self._commandOptions, + this._self._commandOptions, // command.POLICIES ); @@ -212,7 +212,7 @@ export default class RedisCluster< }; } - private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) { + static #createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) { const prefix = functionArgumentsPrefix(name, fn), transformReply = getTransformReply(fn, resp); return async function (this: NamespaceProxyCluster, ...args: Array) { @@ -223,11 +223,11 @@ export default class RedisCluster< fnArgs ), redisArgs = prefix.concat(fnArgs), - reply = await this.self.sendCommand( + reply = await this._self.sendCommand( firstKey, fn.IS_READ_ONLY, redisArgs, - this.self._commandOptions, + this._self._commandOptions, // fn.POLICIES ); @@ -237,7 +237,7 @@ export default class RedisCluster< }; } - private static _createScriptCommand(script: RedisScript, resp: RespVersions) { + static #createScriptCommand(script: RedisScript, resp: RespVersions) { const prefix = scriptArgumentsPrefix(script), transformReply = getTransformReply(script, resp); return async function (this: ProxyCluster, ...args: Array) { @@ -274,17 +274,17 @@ export default class RedisCluster< const Cluster = attachConfig({ BaseClass: RedisCluster, commands: COMMANDS, - createCommand: RedisCluster._createCommand, - createModuleCommand: RedisCluster._createModuleCommand, - createFunctionCommand: RedisCluster._createFunctionCommand, - createScriptCommand: RedisCluster._createScriptCommand, + createCommand: RedisCluster.#createCommand, + createModuleCommand: RedisCluster.#createModuleCommand, + createFunctionCommand: RedisCluster.#createFunctionCommand, + createScriptCommand: RedisCluster.#createScriptCommand, config }); Cluster.prototype.Multi = RedisClusterMultiCommand.extend(config); return (options?: Omit>) => { - // returning a "proxy" to prevent the namespaces.self to leak between "proxies" + // returning a "proxy" to prevent the namespaces._self to leak between "proxies" return Object.create(new Cluster(options)) as RedisClusterType; }; } @@ -300,10 +300,11 @@ export default class RedisCluster< return RedisCluster.factory(options)(options); } - private readonly _options: RedisClusterOptions; + readonly #options: RedisClusterOptions; - private readonly _slots: RedisClusterSlots; + readonly #slots: RedisClusterSlots; + private _self = this; private _commandOptions?: ClusterCommandOptions; /** @@ -311,7 +312,7 @@ export default class RedisCluster< * Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific node (master or replica). */ get slots() { - return this._slots.slots; + return this._self.#slots.slots; } /** @@ -319,7 +320,7 @@ export default class RedisCluster< * Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific master node. */ get masters() { - return this._slots.masters; + return this._self.#slots.masters; } /** @@ -327,7 +328,7 @@ export default class RedisCluster< * Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific replica node. */ get replicas() { - return this._slots.replicas; + return this._self.#slots.replicas; } /** @@ -335,25 +336,25 @@ export default class RedisCluster< * Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific node (master or replica). */ get nodeByAddress() { - return this._slots.nodeByAddress; + return this._self.#slots.nodeByAddress; } /** * The current pub/sub node. */ get pubSubNode() { - return this._slots.pubSubNode; + return this._self.#slots.pubSubNode; } get isOpen() { - return this._slots.isOpen; + return this._self.#slots.isOpen; } constructor(options: RedisClusterOptions) { super(); - this._options = options; - this._slots = new RedisClusterSlots(options, this.emit.bind(this)); + this.#options = options; + this.#slots = new RedisClusterSlots(options, this.emit.bind(this)); if (options?.commandOptions) { this._commandOptions = options.commandOptions; @@ -368,14 +369,14 @@ export default class RedisCluster< _TYPE_MAPPING extends TypeMapping = TYPE_MAPPING >(overrides?: Partial>) { return new (Object.getPrototypeOf(this).constructor)({ - ...this._options, + ...this._self.#options, commandOptions: this._commandOptions, ...overrides }) as RedisClusterType<_M, _F, _S, _RESP, _TYPE_MAPPING>; } connect() { - return this._slots.connect(); + return this._self.#slots.connect(); } withCommandOptions< @@ -430,13 +431,13 @@ export default class RedisCluster< // return this._commandOptionsProxy('policies', policies); // } - private async _execute( + async #execute( firstKey: RedisArgument | undefined, isReadonly: boolean | undefined, fn: (client: RedisClientType) => Promise ): Promise { - const maxCommandRedirections = this._options.maxCommandRedirections ?? 16; - let client = await this._slots.getClient(firstKey, isReadonly), + const maxCommandRedirections = this.#options.maxCommandRedirections ?? 16; + let client = await this.#slots.getClient(firstKey, isReadonly), i = 0; while (true) { try { @@ -449,10 +450,10 @@ export default class RedisCluster< if (err.message.startsWith('ASK')) { const address = err.message.substring(err.message.lastIndexOf(' ') + 1); - let redirectTo = await this._slots.getMasterByAddress(address); + let redirectTo = await this.#slots.getMasterByAddress(address); if (!redirectTo) { - await this._slots.rediscover(client); - redirectTo = await this._slots.getMasterByAddress(address); + await this.#slots.rediscover(client); + redirectTo = await this.#slots.getMasterByAddress(address); } if (!redirectTo) { @@ -465,8 +466,8 @@ export default class RedisCluster< } if (err.message.startsWith('MOVED')) { - await this._slots.rediscover(client); - client = await this._slots.getClient(firstKey, isReadonly); + await this.#slots.rediscover(client); + client = await this.#slots.getClient(firstKey, isReadonly); continue; } @@ -482,7 +483,7 @@ export default class RedisCluster< options?: ClusterCommandOptions, // defaultPolicies?: CommandPolicies ): Promise { - return this._execute( + return this._self.#execute( firstKey, isReadonly, client => client.sendCommand(args, options) @@ -496,7 +497,7 @@ export default class RedisCluster< args: Array, options?: CommandOptions ) { - return this._execute( + return this._self.#execute( firstKey, isReadonly, client => client.executeScript(script, args, options) @@ -507,11 +508,11 @@ export default class RedisCluster< type Multi = new (...args: ConstructorParameters) => RedisClusterMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING>; return new ((this as any).Multi as Multi)( async (firstKey, isReadonly, commands) => { - const client = await this._slots.getClient(firstKey, isReadonly); + const client = await this._self.#slots.getClient(firstKey, isReadonly); return client._executeMulti(commands); }, async (firstKey, isReadonly, commands) => { - const client = await this._slots.getClient(firstKey, isReadonly); + const client = await this._self.#slots.getClient(firstKey, isReadonly); return client._executePipeline(commands); }, routing @@ -525,7 +526,7 @@ export default class RedisCluster< listener: PubSubListener, bufferMode?: T ) { - return (await this._slots.getPubSubClient()) + return (await this._self.#slots.getPubSubClient()) .SUBSCRIBE(channels, listener, bufferMode); } @@ -536,7 +537,7 @@ export default class RedisCluster< listener?: PubSubListener, bufferMode?: T ) { - return this._slots.executeUnsubscribeCommand(client => + return this._self.#slots.executeUnsubscribeCommand(client => client.UNSUBSCRIBE(channels, listener, bufferMode) ); } @@ -548,7 +549,7 @@ export default class RedisCluster< listener: PubSubListener, bufferMode?: T ) { - return (await this._slots.getPubSubClient()) + return (await this._self.#slots.getPubSubClient()) .PSUBSCRIBE(patterns, listener, bufferMode); } @@ -559,7 +560,7 @@ export default class RedisCluster< listener?: PubSubListener, bufferMode?: T ) { - return this._slots.executeUnsubscribeCommand(client => + return this._self.#slots.executeUnsubscribeCommand(client => client.PUNSUBSCRIBE(patterns, listener, bufferMode) ); } @@ -571,9 +572,9 @@ export default class RedisCluster< listener: PubSubListener, bufferMode?: T ) { - const maxCommandRedirections = this._options.maxCommandRedirections ?? 16, + const maxCommandRedirections = this._self.#options.maxCommandRedirections ?? 16, firstChannel = Array.isArray(channels) ? channels[0] : channels; - let client = await this._slots.getShardedPubSubClient(firstChannel); + let client = await this._self.#slots.getShardedPubSubClient(firstChannel); for (let i = 0; ; i++) { try { return await client.SSUBSCRIBE(channels, listener, bufferMode); @@ -583,8 +584,8 @@ export default class RedisCluster< } if (err.message.startsWith('MOVED')) { - await this._slots.rediscover(client); - client = await this._slots.getShardedPubSubClient(firstChannel); + await this._self.#slots.rediscover(client); + client = await this._self.#slots.getShardedPubSubClient(firstChannel); continue; } @@ -600,7 +601,7 @@ export default class RedisCluster< listener: PubSubListener, bufferMode?: T ) { - return this._slots.executeShardedUnsubscribeCommand( + return this._self.#slots.executeShardedUnsubscribeCommand( Array.isArray(channels) ? channels[0] : channels, client => client.SUNSUBSCRIBE(channels, listener, bufferMode) ); @@ -612,26 +613,26 @@ export default class RedisCluster< * @deprecated Use `close` instead. */ quit() { - return this._slots.quit(); + return this._self.#slots.quit(); } /** * @deprecated Use `destroy` instead. */ disconnect() { - return this._slots.disconnect(); + return this._self.#slots.disconnect(); } close() { - return this._slots.close(); + return this._self.#slots.close(); } destroy() { - return this._slots.destroy(); + return this._self.#slots.destroy(); } nodeClient(node: ShardNode) { - return this._slots.nodeClient(node); + return this._self.#slots.nodeClient(node); } /** @@ -639,7 +640,7 @@ export default class RedisCluster< * Userful for running "forward" commands (like PUBLISH) on a random node. */ getRandomNode() { - return this._slots.getRandomNode(); + return this._self.#slots.getRandomNode(); } /** @@ -647,7 +648,7 @@ export default class RedisCluster< * Useful for running readonly commands on a slot. */ getSlotRandomNode(slot: number) { - return this._slots.getSlotRandomNode(slot); + return this._self.#slots.getSlotRandomNode(slot); } /** diff --git a/packages/client/lib/cluster/multi-command.ts b/packages/client/lib/cluster/multi-command.ts index 531c00f1d5..225d162465 100644 --- a/packages/client/lib/cluster/multi-command.ts +++ b/packages/client/lib/cluster/multi-command.ts @@ -91,7 +91,7 @@ export type ClusterMultiExecute = ( ) => Promise>; export default class RedisClusterMultiCommand { - private static _createCommand(command: Command, resp: RespVersions) { + static #createCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); return function (this: RedisClusterMultiCommand, ...args: Array) { const redisArgs = command.transformArguments(...args), @@ -109,16 +109,16 @@ export default class RedisClusterMultiCommand { }; } - private static _createModuleCommand(command: Command, resp: RespVersions) { + static #createModuleCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); - return function (this: { self: RedisClusterMultiCommand }, ...args: Array) { + return function (this: { _self: RedisClusterMultiCommand }, ...args: Array) { const redisArgs = command.transformArguments(...args), firstKey = RedisCluster.extractFirstKey( command, args, redisArgs ); - return this.self.addCommand( + return this._self.addCommand( firstKey, command.IS_READ_ONLY, redisArgs, @@ -127,10 +127,10 @@ export default class RedisClusterMultiCommand { }; } - private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) { + static #createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) { const prefix = functionArgumentsPrefix(name, fn), transformReply = getTransformReply(fn, resp); - return function (this: { self: RedisClusterMultiCommand }, ...args: Array) { + return function (this: { _self: RedisClusterMultiCommand }, ...args: Array) { const fnArgs = fn.transformArguments(...args), redisArgs: CommandArguments = prefix.concat(fnArgs), firstKey = RedisCluster.extractFirstKey( @@ -139,7 +139,7 @@ export default class RedisClusterMultiCommand { fnArgs ); redisArgs.preserve = fnArgs.preserve; - return this.self.addCommand( + return this._self.addCommand( firstKey, fn.IS_READ_ONLY, redisArgs, @@ -148,11 +148,11 @@ export default class RedisClusterMultiCommand { }; } - private static _createScriptCommand(script: RedisScript, resp: RespVersions) { + static #createScriptCommand(script: RedisScript, resp: RespVersions) { const transformReply = getTransformReply(script, resp); return function (this: RedisClusterMultiCommand, ...args: Array) { const scriptArgs = script.transformArguments(...args); - this._setState( + this.#setState( RedisCluster.extractFirstKey( script, args, @@ -160,7 +160,7 @@ export default class RedisClusterMultiCommand { ), script.IS_READ_ONLY ); - this._multi.addScript( + this.#multi.addScript( script, scriptArgs, transformReply @@ -178,36 +178,36 @@ export default class RedisClusterMultiCommand { return attachConfig({ BaseClass: RedisClusterMultiCommand, commands: COMMANDS, - createCommand: RedisClusterMultiCommand._createCommand, - createModuleCommand: RedisClusterMultiCommand._createModuleCommand, - createFunctionCommand: RedisClusterMultiCommand._createFunctionCommand, - createScriptCommand: RedisClusterMultiCommand._createScriptCommand, + createCommand: RedisClusterMultiCommand.#createCommand, + createModuleCommand: RedisClusterMultiCommand.#createModuleCommand, + createFunctionCommand: RedisClusterMultiCommand.#createFunctionCommand, + createScriptCommand: RedisClusterMultiCommand.#createScriptCommand, config }); } - private readonly _multi = new RedisMultiCommand(); - private readonly _executeMulti: ClusterMultiExecute; - private readonly _executePipeline: ClusterMultiExecute; - private _firstKey: RedisArgument | undefined; - private _isReadonly: boolean | undefined = true; + readonly #multi = new RedisMultiCommand(); + readonly #executeMulti: ClusterMultiExecute; + readonly #executePipeline: ClusterMultiExecute; + #firstKey: RedisArgument | undefined; + #isReadonly: boolean | undefined = true; constructor( executeMulti: ClusterMultiExecute, executePipeline: ClusterMultiExecute, routing: RedisArgument | undefined ) { - this._executeMulti = executeMulti; - this._executePipeline = executePipeline; - this._firstKey = routing; + this.#executeMulti = executeMulti; + this.#executePipeline = executePipeline; + this.#firstKey = routing; } - private _setState( + #setState( firstKey: RedisArgument | undefined, isReadonly: boolean | undefined, ) { - this._firstKey ??= firstKey; - this._isReadonly &&= isReadonly; + this.#firstKey ??= firstKey; + this.#isReadonly &&= isReadonly; } addCommand( @@ -216,19 +216,19 @@ export default class RedisClusterMultiCommand { args: CommandArguments, transformReply?: TransformReply ) { - this._setState(firstKey, isReadonly); - this._multi.addCommand(args, transformReply); + this.#setState(firstKey, isReadonly); + this.#multi.addCommand(args, transformReply); return this; } async exec(execAsPipeline = false) { if (execAsPipeline) return this.execAsPipeline(); - return this._multi.transformReplies( - await this._executeMulti( - this._firstKey, - this._isReadonly, - this._multi.queue + return this.#multi.transformReplies( + await this.#executeMulti( + this.#firstKey, + this.#isReadonly, + this.#multi.queue ) ) as MultiReplyType; } @@ -240,13 +240,13 @@ export default class RedisClusterMultiCommand { } async execAsPipeline() { - if (this._multi.queue.length === 0) return [] as MultiReplyType; + if (this.#multi.queue.length === 0) return [] as MultiReplyType; - return this._multi.transformReplies( - await this._executePipeline( - this._firstKey, - this._isReadonly, - this._multi.queue + return this.#multi.transformReplies( + await this.#executePipeline( + this.#firstKey, + this.#isReadonly, + this.#multi.queue ) ) as MultiReplyType; } diff --git a/packages/client/lib/commander.ts b/packages/client/lib/commander.ts index b1db7e701f..d96aaa7128 100644 --- a/packages/client/lib/commander.ts +++ b/packages/client/lib/commander.ts @@ -71,7 +71,7 @@ function attachNamespace(prototype: any, name: PropertyKey, fns: any) { Object.defineProperty(prototype, name, { get() { const value = Object.create(fns); - value.self = this; + value._self = this; Object.defineProperty(this, name, { value }); return value; }