From dc094eaa9a6b66df66423796e42b68edb4b05336 Mon Sep 17 00:00:00 2001 From: Leibale Date: Wed, 24 May 2023 13:59:53 +0300 Subject: [PATCH] WIP --- packages/client/lib/RESP/decoder.ts | 1 + packages/client/lib/RESP/encoder.ts | 2 +- packages/client/lib/client/commands-queue.ts | 67 +++++--- packages/client/lib/client/index.ts | 36 ++-- .../client/lib/client/linked-list.spec.ts | 138 +++++++++++++++ packages/client/lib/client/linked-list.ts | 162 ++++++++++++++++++ packages/client/lib/client/pub-sub.ts | 2 +- packages/client/lib/client/queue.spec.ts | 77 --------- packages/client/lib/client/queue.ts | 101 ----------- packages/client/lib/client/socket.ts | 156 ++++++++--------- packages/client/lib/commands/CLUSTER_MYID.ts | 2 +- 11 files changed, 441 insertions(+), 303 deletions(-) create mode 100644 packages/client/lib/client/linked-list.spec.ts create mode 100644 packages/client/lib/client/linked-list.ts delete mode 100644 packages/client/lib/client/queue.spec.ts delete mode 100644 packages/client/lib/client/queue.ts diff --git a/packages/client/lib/RESP/decoder.ts b/packages/client/lib/RESP/decoder.ts index 2a0a30b474..98cd7bcdc8 100644 --- a/packages/client/lib/RESP/decoder.ts +++ b/packages/client/lib/RESP/decoder.ts @@ -555,6 +555,7 @@ export class Decoder { this, length - slice.length, chunks, + skip, flag ); } diff --git a/packages/client/lib/RESP/encoder.ts b/packages/client/lib/RESP/encoder.ts index b1db1c7023..af857711dc 100644 --- a/packages/client/lib/RESP/encoder.ts +++ b/packages/client/lib/RESP/encoder.ts @@ -1,4 +1,4 @@ -import { RedisArgument } from "./types"; +import { RedisArgument } from './types'; const CRLF = '\r\n'; diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 5ec181d78b..55efe68080 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -1,4 +1,4 @@ -import Queue, { QueueNode } from './queue'; +import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list'; import encodeCommand from '../RESP/encoder'; import { Decoder, PUSH_FLAGS, RESP_TYPES } from '../RESP/decoder'; import { CommandArguments, Flags, ReplyUnion, RespVersions } from '../RESP/types'; @@ -7,16 +7,19 @@ import { AbortError, ErrorReply } from '../errors'; import { EventEmitter } from 'stream'; export interface QueueCommandOptions { - asap?: boolean; chainId?: symbol; - signal?: AbortSignal; + asap?: boolean; + abortSignal?: AbortSignal; flags?: Flags; } export interface CommandWaitingToBeSent extends CommandWaitingForReply { args: CommandArguments; chainId?: symbol; - removeAbortListener?(): void; + abort?: { + signal: AbortSignal; + listener: () => unknown; + }; } interface CommandWaitingForReply { @@ -37,8 +40,8 @@ const RESP2_PUSH_FLAGS = { export default class RedisCommandsQueue { private readonly _maxLength: number | null | undefined; - private readonly _waitingToBeSent = new Queue(); - private readonly _waitingForReply = new Queue(); + private readonly _waitingToBeSent = new DoublyLinkedList(); + private readonly _waitingForReply = new SinglyLinkedList(); private readonly _onShardedChannelMoved: OnShardedChannelMoved; private readonly _pubSub = new PubSub(); @@ -149,30 +152,31 @@ export default class RedisCommandsQueue { addCommand(args: CommandArguments, options?: QueueCommandOptions): Promise { if (this._maxLength && this._waitingToBeSent.length + this._waitingForReply.length >= this._maxLength) { return Promise.reject(new Error('The queue is full')); - } else if (options?.signal?.aborted) { + } else if (options?.abortSignal?.aborted) { return Promise.reject(new AbortError()); } return new Promise((resolve, reject) => { - let node: QueueNode; + let node: DoublyLinkedNode; const value: CommandWaitingToBeSent = { args, chainId: options?.chainId, flags: options?.flags, resolve, reject, - removeAbortListener: undefined + abort: undefined }; - const signal = options?.signal; + const signal = options?.abortSignal; if (signal) { - const listener = () => { - this._waitingToBeSent.remove(node); - value.reject(new AbortError()); + value.abort = { + signal, + listener: () => { + this._waitingToBeSent.remove(node); + value.reject(new AbortError()); + } }; - - value.removeAbortListener = () => signal.removeEventListener('abort', listener); - signal.addEventListener('abort', listener, { once: true }); + signal.addEventListener('abort', value.abort.listener, { once: true }); } node = options?.asap ? @@ -264,13 +268,15 @@ export default class RedisCommandsQueue { return; } - // TODO - // reuse `toSend` - (toSend.args as any) = undefined; - if (toSend.removeAbortListener) { - toSend.removeAbortListener(); - (toSend.removeAbortListener as any) = undefined; + if (toSend.abort) { + RedisCommandsQueue._removeAbortListener(toSend); + toSend.abort = undefined; } + + // TODO reuse `toSend` or create new object? + (toSend as any).args = undefined; + (toSend as any).chainId = undefined; + this._waitingForReply.push(toSend); this._chainInExecution = toSend.chainId; return encoded; @@ -282,9 +288,16 @@ export default class RedisCommandsQueue { } } - static #flushWaitingToBeSent(command: CommandWaitingToBeSent, err: Error) { - command.removeAbortListener?.(); - command.reject(err); + private static _removeAbortListener(command: CommandWaitingToBeSent) { + command.abort!.signal.removeEventListener('abort', command.abort!.listener); + } + + private static _flushWaitingToBeSent(toBeSent: CommandWaitingToBeSent, err: Error) { + if (toBeSent.abort) { + RedisCommandsQueue._removeAbortListener(toBeSent); + } + + toBeSent.reject(err); } flushWaitingForReply(err: Error): void { @@ -296,7 +309,7 @@ export default class RedisCommandsQueue { if (!this._chainInExecution) return; while (this._waitingToBeSent.head?.value.chainId === this._chainInExecution) { - RedisCommandsQueue.#flushWaitingToBeSent( + RedisCommandsQueue._flushWaitingToBeSent( this._waitingToBeSent.shift()!, err ); @@ -310,7 +323,7 @@ export default class RedisCommandsQueue { this._pubSub.reset(); this.#flushWaitingForReply(err); while (this._waitingToBeSent.head) { - RedisCommandsQueue.#flushWaitingToBeSent( + RedisCommandsQueue._flushWaitingToBeSent( this._waitingToBeSent.shift()!, err ); diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 6de3d53e52..c098eb7208 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -137,22 +137,22 @@ export default class RedisClient< > extends EventEmitter { private static _createCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); - return async function (this: ProxyClient) { - const args = command.transformArguments.apply(undefined, arguments as any), - reply = await this.sendCommand(args, this.commandOptions); + return async function (this: ProxyClient, ...args: Array) { + const redisArgs = command.transformArguments(...args), + reply = await this.sendCommand(redisArgs, this.commandOptions); return transformReply ? - transformReply(reply, args.preserve) : + transformReply(reply, redisArgs.preserve) : reply; }; } private static _createModuleCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); - return async function (this: NamespaceProxyClient) { - const args = command.transformArguments.apply(undefined, arguments as any), - reply = await this.self.sendCommand(args, this.self.commandOptions); + return async function (this: NamespaceProxyClient, ...args: Array) { + const redisArgs = command.transformArguments(...args), + reply = await this.self.sendCommand(redisArgs, this.self.commandOptions); return transformReply ? - transformReply(reply, args.preserve) : + transformReply(reply, redisArgs.preserve) : reply; }; } @@ -160,8 +160,8 @@ export default class RedisClient< private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) { const prefix = functionArgumentsPrefix(name, fn), transformReply = getTransformReply(fn, resp); - return async function (this: NamespaceProxyClient) { - const fnArgs = fn.transformArguments.apply(undefined, arguments as any), + return async function (this: NamespaceProxyClient, ...args: Array) { + const fnArgs = fn.transformArguments(...args), reply = await this.self.sendCommand( prefix.concat(fnArgs), this.self.commandOptions @@ -175,15 +175,15 @@ export default class RedisClient< private static _createScriptCommand(script: RedisScript, resp: RespVersions) { const prefix = scriptArgumentsPrefix(script), transformReply = getTransformReply(script, resp); - return async function (this: ProxyClient) { - const scriptArgs = script.transformArguments.apply(undefined, arguments as any), - args = prefix.concat(scriptArgs), - reply = await this.sendCommand(args, this.commandOptions).catch((err: unknown) => { + return async function (this: ProxyClient, ...args: Array) { + const scriptArgs = script.transformArguments(...args), + redisArgs = prefix.concat(scriptArgs), + reply = await this.sendCommand(redisArgs, this.commandOptions).catch((err: unknown) => { if (!(err as Error)?.message?.startsWith?.('NOSCRIPT')) throw err; args[0] = 'EVAL'; args[1] = script.SCRIPT; - return this.sendCommand(args, this.commandOptions); + return this.sendCommand(redisArgs, this.commandOptions); }); return transformReply ? transformReply(reply, scriptArgs.preserve) : @@ -470,6 +470,10 @@ export default class RedisClient< return this._commandOptionsProxy('flags', flags); } + withAbortSignal(abortSignal: AbortSignal) { + return this._commandOptionsProxy('abortSignal', abortSignal); + } + /** * Override the `asap` command option to `true` */ @@ -792,8 +796,6 @@ export default class RedisClient< return Promise.resolve(this.destroy()); } - private _resolveClose?: () => unknown; - /** * Close the client. Wait for pending replies. */ diff --git a/packages/client/lib/client/linked-list.spec.ts b/packages/client/lib/client/linked-list.spec.ts new file mode 100644 index 0000000000..c5ab379b82 --- /dev/null +++ b/packages/client/lib/client/linked-list.spec.ts @@ -0,0 +1,138 @@ +import { SinglyLinkedList, DoublyLinkedList } from './linked-list'; +import { equal, deepEqual } from 'assert/strict'; + +describe.only('DoublyLinkedList', () => { + const list = new DoublyLinkedList(); + + it('should start empty', () => { + equal(list.length, 0); + equal(list.head, undefined); + equal(list.tail, undefined); + deepEqual(Array.from(list), []); + }); + + it('shift empty', () => { + equal(list.shift(), undefined); + equal(list.length, 0); + deepEqual(Array.from(list), []); + }); + + it('push 1', () => { + list.push(1); + equal(list.length, 1); + deepEqual(Array.from(list), [1]); + }); + + it('push 2', () => { + list.push(2); + equal(list.length, 2); + deepEqual(Array.from(list), [1, 2]); + }); + + it('unshift 0', () => { + list.unshift(0); + equal(list.length, 3); + deepEqual(Array.from(list), [0, 1, 2]); + }); + + it('remove middle node', () => { + list.remove(list.head!.next!); + equal(list.length, 2); + deepEqual(Array.from(list), [0, 2]); + }); + + it('remove head', () => { + list.remove(list.head!); + equal(list.length, 1); + deepEqual(Array.from(list), [2]); + }); + + it('remove tail', () => { + list.remove(list.tail!); + equal(list.length, 0); + deepEqual(Array.from(list), []); + }); + + it('unshift empty queue', () => { + list.unshift(0); + equal(list.length, 1); + deepEqual(Array.from(list), [0]); + }); + + it('push 1', () => { + list.push(1); + equal(list.length, 2); + deepEqual(Array.from(list), [0, 1]); + }); + + it('shift', () => { + equal(list.shift(), 0); + equal(list.length, 1); + deepEqual(Array.from(list), [1]); + }); + + it('shift last element', () => { + equal(list.shift(), 1); + equal(list.length, 0); + deepEqual(Array.from(list), []); + }); +}); + +describe.only('SinglyLinkedList', () => { + const list = new SinglyLinkedList(); + + it('should start empty', () => { + equal(list.length, 0); + equal(list.head, undefined); + equal(list.tail, undefined); + deepEqual(Array.from(list), []); + }); + + it('shift empty', () => { + equal(list.shift(), undefined); + equal(list.length, 0); + deepEqual(Array.from(list), []); + }); + + it('push 1', () => { + list.push(1); + equal(list.length, 1); + deepEqual(Array.from(list), [1]); + }); + + it('push 2', () => { + list.push(2); + equal(list.length, 2); + deepEqual(Array.from(list), [1, 2]); + }); + + it('push 3', () => { + list.push(3); + equal(list.length, 3); + deepEqual(Array.from(list), [1, 2, 3]); + }); + + it('shift 1', () => { + equal(list.shift(), 1); + equal(list.length, 2); + deepEqual(Array.from(list), [2, 3]); + }); + + it('shift 2', () => { + equal(list.shift(), 2); + equal(list.length, 1); + deepEqual(Array.from(list), [3]); + }); + + it('shift 3', () => { + equal(list.shift(), 3); + equal(list.length, 0); + deepEqual(Array.from(list), []); + }); + + it('should be empty', () => { + equal(list.length, 0); + equal(list.head, undefined); + equal(list.tail, undefined); + }); +}); diff --git a/packages/client/lib/client/linked-list.ts b/packages/client/lib/client/linked-list.ts new file mode 100644 index 0000000000..ec59e67224 --- /dev/null +++ b/packages/client/lib/client/linked-list.ts @@ -0,0 +1,162 @@ +export interface DoublyLinkedNode { + value: T; + previous: DoublyLinkedNode | undefined; + next: DoublyLinkedNode | undefined; +} + +export class DoublyLinkedList { + private _length = 0; + + get length() { + return this._length; + } + + private _head?: DoublyLinkedNode; + + get head() { + return this._head; + } + + private _tail?: DoublyLinkedNode; + + get tail() { + return this._tail; + } + + push(value: T) { + ++this._length; + + if (this._tail === undefined) { + return this._tail = this._head = { + previous: this._head, + next: undefined, + value + }; + } + + return this._tail = this._tail.next = { + previous: this._tail, + next: undefined, + value + }; + } + + unshift(value: T) { + ++this._length; + + if (this._head === undefined) { + return this._head = this._tail = { + previous: undefined, + next: undefined, + value + }; + } + + return this._head = this._head.previous = { + previous: undefined, + next: this._head, + value + }; + } + + shift() { + if (this._head === undefined) return undefined; + + --this._length; + const node = this._head; + if (node.next) { + node.next.previous = node.previous; + this._head = node.next; + node.next = undefined; + } else { + this._head = this._tail = undefined; + } + return node.value; + } + + remove(node: DoublyLinkedNode) { + --this._length; + + if (this._tail === node) { + this._tail = node.previous; + } + + if (this._head === node) { + this._head = node.next; + } else { + node.previous!.next = node.next; + node.previous = undefined; + } + + node.next = undefined; + } + + *[Symbol.iterator]() { + let node = this._head; + while (node !== undefined) { + yield node.value; + node = node.next; + } + } +} + +export interface SinglyLinkedNode { + value: T; + next: SinglyLinkedNode | undefined; +} + +export class SinglyLinkedList { + private _length = 0; + + get length() { + return this._length; + } + + private _head?: SinglyLinkedNode; + + get head() { + return this._head; + } + + private _tail?: SinglyLinkedNode; + + get tail() { + return this._tail; + } + + push(value: T) { + ++this._length; + + const node = { + value, + next: undefined + }; + + if (this._head === undefined) { + this._head = this._tail = node; + } else { + this._tail!.next = this._tail = node; + } + } + + shift() { + if (this._head === undefined) return undefined; + + const node = this._head; + if (--this._length === 0) { + this._head = this._tail = undefined; + } else { + this._head = node.next; + } + + return node.value; + } + + *[Symbol.iterator]() { + let node = this._head; + while (node !== undefined) { + yield node.value; + node = node.next; + } + } +} diff --git a/packages/client/lib/client/pub-sub.ts b/packages/client/lib/client/pub-sub.ts index afa7972e83..91c771a57e 100644 --- a/packages/client/lib/client/pub-sub.ts +++ b/packages/client/lib/client/pub-sub.ts @@ -1,4 +1,4 @@ -import { RedisArgument } from "../RESP/types"; +import { RedisArgument } from '../RESP/types'; export enum PubSubType { CHANNELS = 'CHANNELS', diff --git a/packages/client/lib/client/queue.spec.ts b/packages/client/lib/client/queue.spec.ts deleted file mode 100644 index 39691c83d9..0000000000 --- a/packages/client/lib/client/queue.spec.ts +++ /dev/null @@ -1,77 +0,0 @@ -import Queue from './queue'; -import { equal, deepEqual } from 'assert/strict'; - -describe('Queue', () => { - const queue = new Queue(); - - it('should start empty', () => { - equal(queue.length, 0); - deepEqual(Array.from(queue), []); - }); - - it('shift empty', () => { - equal(queue.shift(), null); - equal(queue.length, 0); - deepEqual(Array.from(queue), []); - }); - - it('push 1', () => { - queue.push(1); - equal(queue.length, 1); - deepEqual(Array.from(queue), [1]); - }); - - it('push 2', () => { - queue.push(2); - equal(queue.length, 2); - deepEqual(Array.from(queue), [1, 2]); - }); - - it('unshift 0', () => { - queue.unshift(0); - equal(queue.length, 3); - deepEqual(Array.from(queue), [0, 1, 2]); - }); - - it('remove middle node', () => { - queue.remove(queue.head.next!); - equal(queue.length, 2); - deepEqual(Array.from(queue), [0, 2]); - }); - - it('remove head', () => { - queue.remove(queue.head); - equal(queue.length, 1); - deepEqual(Array.from(queue), [2]); - }); - - it('remove tail', () => { - queue.remove(queue.tail); - equal(queue.length, 0); - deepEqual(Array.from(queue), []); - }); - - it('unshift empty queue', () => { - queue.unshift(0); - equal(queue.length, 1); - deepEqual(Array.from(queue), [0]); - }); - - it('push 1', () => { - queue.push(1); - equal(queue.length, 2); - deepEqual(Array.from(queue), [0, 1]); - }); - - it('shift', () => { - equal(queue.shift(), 0); - equal(queue.length, 1); - deepEqual(Array.from(queue), [1]); - }); - - it('shift last element', () => { - equal(queue.shift(), 1); - equal(queue.length, 0); - deepEqual(Array.from(queue), []); - }); -}); diff --git a/packages/client/lib/client/queue.ts b/packages/client/lib/client/queue.ts deleted file mode 100644 index 5a213ce2bd..0000000000 --- a/packages/client/lib/client/queue.ts +++ /dev/null @@ -1,101 +0,0 @@ -export interface QueueNode { - value: T; - previous: QueueNode | null; - next: QueueNode | null; -} - -export default class Queue { - private _length = 0; - - get length() { - return this._length; - } - - private _head: QueueNode | null = null; - - get head() { - return this._head; - } - - _tail: QueueNode | null = null; - - get tail() { - return this._tail; - } - - push(value: T) { - ++this._length; - - if (!this._tail) { - return this._tail = this._head = { - previous: this._head, - next: null, - value - }; - } - - return this._tail = this._tail.next = { - previous: this._tail, - next: null, - value - }; - } - - unshift(value: T) { - ++this._length; - - if (!this._head) { - return this._head = this._tail = { - previous: null, - next: null, - value - }; - } - - return this._head = this._head.previous = { - previous: null, - next: this._head, - value - }; - } - - shift() { - if (!this._head) return null; - - --this._length; - const node = this._head; - if (node.next) { - node.next.previous = node.previous; - this._head = node.next; - node.next = null; - } else { - this._head = this._tail = null; - } - return node.value; - } - - remove(node: QueueNode) { - --this._length; - - if (this._tail === node) { - this._tail = node.previous; - } - - if (this._head === node) { - this._head = node.next; - } else { - node.previous!.next = node.next; - node.previous = null; - } - - node.next = null; - } - - *[Symbol.iterator]() { - let node = this._head; - while (node !== null) { - yield node.value; - node = node.next; - } - } -} diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index a2683cc85e..a7f3d77dd6 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -64,49 +64,49 @@ export default class RedisSocket extends EventEmitter { return (options as RedisTlsSocketOptions).tls === true; } - readonly #initiator: RedisSocketInitiator; + private readonly _initiator: RedisSocketInitiator; - readonly #options: RedisSocketOptions; + private readonly _options: RedisSocketOptions; - #socket?: net.Socket | tls.TLSSocket; + private _socket?: net.Socket | tls.TLSSocket; - #isOpen = false; + private _isOpen = false; get isOpen(): boolean { - return this.#isOpen; + return this._isOpen; } - #isReady = false; + private _isReady = false; get isReady(): boolean { - return this.#isReady; + return this._isReady; } // `writable.writableNeedDrain` was added in v15.2.0 and therefore can't be used // https://nodejs.org/api/stream.html#stream_writable_writableneeddrain - #writableNeedDrain = false; + private _writableNeedDrain = false; get writableNeedDrain(): boolean { - return this.#writableNeedDrain; + return this._writableNeedDrain; } - #isSocketUnrefed = false; + private _isSocketUnrefed = false; constructor(initiator: RedisSocketInitiator, options?: RedisSocketOptions) { super(); - this.#initiator = initiator; - this.#options = RedisSocket.#initiateOptions(options); + this._initiator = initiator; + this._options = RedisSocket.#initiateOptions(options); } - #reconnectStrategy(retries: number, cause: Error) { - if (this.#options.reconnectStrategy === false) { + private _reconnectStrategy(retries: number, cause: Error) { + if (this._options.reconnectStrategy === false) { return false; - } else if (typeof this.#options.reconnectStrategy === 'number') { - return this.#options.reconnectStrategy; - } else if (this.#options.reconnectStrategy) { + } else if (typeof this._options.reconnectStrategy === 'number') { + return this._options.reconnectStrategy; + } else if (this._options.reconnectStrategy) { try { - const retryIn = this.#options.reconnectStrategy(retries, cause); + const retryIn = this._options.reconnectStrategy(retries, cause); if (retryIn !== false && !(retryIn instanceof Error) && typeof retryIn !== 'number') { throw new TypeError(`Reconnect strategy should return \`false | Error | number\`, got ${retryIn} instead`); } @@ -120,14 +120,14 @@ export default class RedisSocket extends EventEmitter { return Math.min(retries * 50, 500); } - #shouldReconnect(retries: number, cause: Error) { - const retryIn = this.#reconnectStrategy(retries, cause); + private _shouldReconnect(retries: number, cause: Error) { + const retryIn = this._reconnectStrategy(retries, cause); if (retryIn === false) { - this.#isOpen = false; + this._isOpen = false; this.emit('error', cause); return cause; } else if (retryIn instanceof Error) { - this.#isOpen = false; + this._isOpen = false; this.emit('error', cause); return new ReconnectStrategyError(retryIn, cause); } @@ -136,33 +136,33 @@ export default class RedisSocket extends EventEmitter { } async connect(): Promise { - if (this.#isOpen) { + if (this._isOpen) { throw new Error('Socket already opened'); } - this.#isOpen = true; - return this.#connect(); + this._isOpen = true; + return this._connect(); } - async #connect(): Promise { + private async _connect(): Promise { let retries = 0; do { try { - this.#socket = await this.#createSocket(); - this.#writableNeedDrain = false; + this._socket = await this._createSocket(); + this._writableNeedDrain = false; this.emit('connect'); try { - await this.#initiator(); + await this._initiator(); } catch (err) { - this.#socket.destroy(); - this.#socket = undefined; + this._socket.destroy(); + this._socket = undefined; throw err; } - this.#isReady = true; + this._isReady = true; this.emit('ready'); } catch (err) { - const retryIn = this.#shouldReconnect(retries++, err as Error); + const retryIn = this._shouldReconnect(retries++, err as Error); if (typeof retryIn !== 'number') { throw retryIn; } @@ -171,40 +171,40 @@ export default class RedisSocket extends EventEmitter { await promiseTimeout(retryIn); this.emit('reconnecting'); } - } while (this.#isOpen && !this.#isReady); + } while (this._isOpen && !this._isReady); } - #createSocket(): Promise { + private _createSocket(): Promise { return new Promise((resolve, reject) => { - const { connectEvent, socket } = RedisSocket.#isTlsSocket(this.#options) ? - this.#createTlsSocket() : - this.#createNetSocket(); + const { connectEvent, socket } = RedisSocket.#isTlsSocket(this._options) ? + this._createTlsSocket() : + this._createNetSocket(); - if (this.#options.connectTimeout) { - socket.setTimeout(this.#options.connectTimeout, () => socket.destroy(new ConnectionTimeoutError())); + if (this._options.connectTimeout) { + socket.setTimeout(this._options.connectTimeout, () => socket.destroy(new ConnectionTimeoutError())); } - if (this.#isSocketUnrefed) { + if (this._isSocketUnrefed) { socket.unref(); } socket - .setNoDelay(this.#options.noDelay) + .setNoDelay(this._options.noDelay) .once('error', reject) .once(connectEvent, () => { socket .setTimeout(0) // https://github.com/nodejs/node/issues/31663 - .setKeepAlive(this.#options.keepAlive !== false, this.#options.keepAlive || 0) + .setKeepAlive(this._options.keepAlive !== false, this._options.keepAlive || 0) .off('error', reject) - .once('error', (err: Error) => this.#onSocketError(err)) + .once('error', (err: Error) => this._onSocketError(err)) .once('close', hadError => { - if (!hadError && this.#isOpen && this.#socket === socket) { - this.#onSocketError(new SocketClosedUnexpectedlyError()); + if (!hadError && this._isOpen && this._socket === socket) { + this._onSocketError(new SocketClosedUnexpectedlyError()); } }) .on('drain', () => { - this.#writableNeedDrain = false; + this._writableNeedDrain = false; this.emit('drain'); }) .on('data', data => this.emit('data', data)); @@ -214,104 +214,104 @@ export default class RedisSocket extends EventEmitter { }); } - #createNetSocket(): CreateSocketReturn { + private _createNetSocket(): CreateSocketReturn { return { connectEvent: 'connect', - socket: net.connect(this.#options as net.NetConnectOpts) // TODO + socket: net.connect(this._options as net.NetConnectOpts) // TODO }; } - #createTlsSocket(): CreateSocketReturn { + private _createTlsSocket(): CreateSocketReturn { return { connectEvent: 'secureConnect', - socket: tls.connect(this.#options as tls.ConnectionOptions) // TODO + socket: tls.connect(this._options as tls.ConnectionOptions) // TODO }; } - #onSocketError(err: Error): void { - this.#isReady = false; + private _onSocketError(err: Error): void { + this._isReady = false; this.emit('error', err); - if (!this.#isOpen || typeof this.#shouldReconnect(0, err) !== 'number') return; + if (!this._isOpen || typeof this._shouldReconnect(0, err) !== 'number') return; this.emit('reconnecting'); - this.#connect().catch(() => { + this._connect().catch(() => { // the error was already emitted, silently ignore it }); } writeCommand(args: Array): void { - if (!this.#socket) { + if (!this._socket) { throw new ClientClosedError(); } for (const toWrite of args) { - this.#writableNeedDrain = !this.#socket.write(toWrite); + this._writableNeedDrain = !this._socket.write(toWrite); } } async quit(fn: () => Promise): Promise { - if (!this.#isOpen) { + if (!this._isOpen) { throw new ClientClosedError(); } - this.#isOpen = false; + this._isOpen = false; const reply = await fn(); this.destroySocket(); return reply; } close() { - if (!this.#isOpen) { + if (!this._isOpen) { throw new ClientClosedError(); } - this.#isOpen = false; + this._isOpen = false; } destroy() { - if (!this.#isOpen) { + if (!this._isOpen) { throw new ClientClosedError(); } - this.#isOpen = false; + this._isOpen = false; this.destroySocket(); } destroySocket() { - this.#isReady = false; + this._isReady = false; - if (this.#socket) { - this.#socket.destroy(); - this.#socket = undefined; + if (this._socket) { + this._socket.destroy(); + this._socket = undefined; } this.emit('end'); } - #isCorked = false; + private _isCorked = false; cork(): void { - if (!this.#socket || this.#isCorked) { + if (!this._socket || this._isCorked) { return; } - this.#socket.cork(); - this.#isCorked = true; + this._socket.cork(); + this._isCorked = true; - queueMicrotask(() => { - this.#socket?.uncork(); - this.#isCorked = false; + setImmediate(() => { + this._socket?.uncork(); + this._isCorked = false; }); } ref(): void { - this.#isSocketUnrefed = false; - this.#socket?.ref(); + this._isSocketUnrefed = false; + this._socket?.ref(); } unref(): void { - this.#isSocketUnrefed = true; - this.#socket?.unref(); + this._isSocketUnrefed = true; + this._socket?.unref(); } } diff --git a/packages/client/lib/commands/CLUSTER_MYID.ts b/packages/client/lib/commands/CLUSTER_MYID.ts index a7a4184f9c..6c682ddccf 100644 --- a/packages/client/lib/commands/CLUSTER_MYID.ts +++ b/packages/client/lib/commands/CLUSTER_MYID.ts @@ -1,4 +1,4 @@ -import { BlobStringReply, Command } from "../RESP/types"; +import { BlobStringReply, Command } from '../RESP/types'; export default { transformArguments() {