1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-07 13:22:56 +03:00
This commit is contained in:
Leibale
2023-05-24 13:59:53 +03:00
parent 0e89c667c9
commit dc094eaa9a
11 changed files with 441 additions and 303 deletions

View File

@@ -555,6 +555,7 @@ export class Decoder {
this, this,
length - slice.length, length - slice.length,
chunks, chunks,
skip,
flag flag
); );
} }

View File

@@ -1,4 +1,4 @@
import { RedisArgument } from "./types"; import { RedisArgument } from './types';
const CRLF = '\r\n'; const CRLF = '\r\n';

View File

@@ -1,4 +1,4 @@
import Queue, { QueueNode } from './queue'; import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list';
import encodeCommand from '../RESP/encoder'; import encodeCommand from '../RESP/encoder';
import { Decoder, PUSH_FLAGS, RESP_TYPES } from '../RESP/decoder'; import { Decoder, PUSH_FLAGS, RESP_TYPES } from '../RESP/decoder';
import { CommandArguments, Flags, ReplyUnion, RespVersions } from '../RESP/types'; import { CommandArguments, Flags, ReplyUnion, RespVersions } from '../RESP/types';
@@ -7,16 +7,19 @@ import { AbortError, ErrorReply } from '../errors';
import { EventEmitter } from 'stream'; import { EventEmitter } from 'stream';
export interface QueueCommandOptions { export interface QueueCommandOptions {
asap?: boolean;
chainId?: symbol; chainId?: symbol;
signal?: AbortSignal; asap?: boolean;
abortSignal?: AbortSignal;
flags?: Flags; flags?: Flags;
} }
export interface CommandWaitingToBeSent extends CommandWaitingForReply { export interface CommandWaitingToBeSent extends CommandWaitingForReply {
args: CommandArguments; args: CommandArguments;
chainId?: symbol; chainId?: symbol;
removeAbortListener?(): void; abort?: {
signal: AbortSignal;
listener: () => unknown;
};
} }
interface CommandWaitingForReply { interface CommandWaitingForReply {
@@ -37,8 +40,8 @@ const RESP2_PUSH_FLAGS = {
export default class RedisCommandsQueue { export default class RedisCommandsQueue {
private readonly _maxLength: number | null | undefined; private readonly _maxLength: number | null | undefined;
private readonly _waitingToBeSent = new Queue<CommandWaitingToBeSent>(); private readonly _waitingToBeSent = new DoublyLinkedList<CommandWaitingToBeSent>();
private readonly _waitingForReply = new Queue<CommandWaitingForReply>(); private readonly _waitingForReply = new SinglyLinkedList<CommandWaitingForReply>();
private readonly _onShardedChannelMoved: OnShardedChannelMoved; private readonly _onShardedChannelMoved: OnShardedChannelMoved;
private readonly _pubSub = new PubSub(); private readonly _pubSub = new PubSub();
@@ -149,30 +152,31 @@ export default class RedisCommandsQueue {
addCommand<T>(args: CommandArguments, options?: QueueCommandOptions): Promise<T> { addCommand<T>(args: CommandArguments, options?: QueueCommandOptions): Promise<T> {
if (this._maxLength && this._waitingToBeSent.length + this._waitingForReply.length >= this._maxLength) { if (this._maxLength && this._waitingToBeSent.length + this._waitingForReply.length >= this._maxLength) {
return Promise.reject(new Error('The queue is full')); return Promise.reject(new Error('The queue is full'));
} else if (options?.signal?.aborted) { } else if (options?.abortSignal?.aborted) {
return Promise.reject(new AbortError()); return Promise.reject(new AbortError());
} }
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
let node: QueueNode<CommandWaitingToBeSent>; let node: DoublyLinkedNode<CommandWaitingToBeSent>;
const value: CommandWaitingToBeSent = { const value: CommandWaitingToBeSent = {
args, args,
chainId: options?.chainId, chainId: options?.chainId,
flags: options?.flags, flags: options?.flags,
resolve, resolve,
reject, reject,
removeAbortListener: undefined abort: undefined
}; };
const signal = options?.signal; const signal = options?.abortSignal;
if (signal) { if (signal) {
const listener = () => { value.abort = {
this._waitingToBeSent.remove(node); signal,
value.reject(new AbortError()); listener: () => {
this._waitingToBeSent.remove(node);
value.reject(new AbortError());
}
}; };
signal.addEventListener('abort', value.abort.listener, { once: true });
value.removeAbortListener = () => signal.removeEventListener('abort', listener);
signal.addEventListener('abort', listener, { once: true });
} }
node = options?.asap ? node = options?.asap ?
@@ -264,13 +268,15 @@ export default class RedisCommandsQueue {
return; return;
} }
// TODO if (toSend.abort) {
// reuse `toSend` RedisCommandsQueue._removeAbortListener(toSend);
(toSend.args as any) = undefined; toSend.abort = undefined;
if (toSend.removeAbortListener) {
toSend.removeAbortListener();
(toSend.removeAbortListener as any) = undefined;
} }
// TODO reuse `toSend` or create new object?
(toSend as any).args = undefined;
(toSend as any).chainId = undefined;
this._waitingForReply.push(toSend); this._waitingForReply.push(toSend);
this._chainInExecution = toSend.chainId; this._chainInExecution = toSend.chainId;
return encoded; return encoded;
@@ -282,9 +288,16 @@ export default class RedisCommandsQueue {
} }
} }
static #flushWaitingToBeSent(command: CommandWaitingToBeSent, err: Error) { private static _removeAbortListener(command: CommandWaitingToBeSent) {
command.removeAbortListener?.(); command.abort!.signal.removeEventListener('abort', command.abort!.listener);
command.reject(err); }
private static _flushWaitingToBeSent(toBeSent: CommandWaitingToBeSent, err: Error) {
if (toBeSent.abort) {
RedisCommandsQueue._removeAbortListener(toBeSent);
}
toBeSent.reject(err);
} }
flushWaitingForReply(err: Error): void { flushWaitingForReply(err: Error): void {
@@ -296,7 +309,7 @@ export default class RedisCommandsQueue {
if (!this._chainInExecution) return; if (!this._chainInExecution) return;
while (this._waitingToBeSent.head?.value.chainId === this._chainInExecution) { while (this._waitingToBeSent.head?.value.chainId === this._chainInExecution) {
RedisCommandsQueue.#flushWaitingToBeSent( RedisCommandsQueue._flushWaitingToBeSent(
this._waitingToBeSent.shift()!, this._waitingToBeSent.shift()!,
err err
); );
@@ -310,7 +323,7 @@ export default class RedisCommandsQueue {
this._pubSub.reset(); this._pubSub.reset();
this.#flushWaitingForReply(err); this.#flushWaitingForReply(err);
while (this._waitingToBeSent.head) { while (this._waitingToBeSent.head) {
RedisCommandsQueue.#flushWaitingToBeSent( RedisCommandsQueue._flushWaitingToBeSent(
this._waitingToBeSent.shift()!, this._waitingToBeSent.shift()!,
err err
); );

View File

@@ -137,22 +137,22 @@ export default class RedisClient<
> extends EventEmitter { > extends EventEmitter {
private static _createCommand(command: Command, resp: RespVersions) { private static _createCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp); const transformReply = getTransformReply(command, resp);
return async function (this: ProxyClient) { return async function (this: ProxyClient, ...args: Array<unknown>) {
const args = command.transformArguments.apply(undefined, arguments as any), const redisArgs = command.transformArguments(...args),
reply = await this.sendCommand(args, this.commandOptions); reply = await this.sendCommand(redisArgs, this.commandOptions);
return transformReply ? return transformReply ?
transformReply(reply, args.preserve) : transformReply(reply, redisArgs.preserve) :
reply; reply;
}; };
} }
private static _createModuleCommand(command: Command, resp: RespVersions) { private static _createModuleCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp); const transformReply = getTransformReply(command, resp);
return async function (this: NamespaceProxyClient) { return async function (this: NamespaceProxyClient, ...args: Array<unknown>) {
const args = command.transformArguments.apply(undefined, arguments as any), const redisArgs = command.transformArguments(...args),
reply = await this.self.sendCommand(args, this.self.commandOptions); reply = await this.self.sendCommand(redisArgs, this.self.commandOptions);
return transformReply ? return transformReply ?
transformReply(reply, args.preserve) : transformReply(reply, redisArgs.preserve) :
reply; reply;
}; };
} }
@@ -160,8 +160,8 @@ export default class RedisClient<
private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) { private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) {
const prefix = functionArgumentsPrefix(name, fn), const prefix = functionArgumentsPrefix(name, fn),
transformReply = getTransformReply(fn, resp); transformReply = getTransformReply(fn, resp);
return async function (this: NamespaceProxyClient) { return async function (this: NamespaceProxyClient, ...args: Array<unknown>) {
const fnArgs = fn.transformArguments.apply(undefined, arguments as any), const fnArgs = fn.transformArguments(...args),
reply = await this.self.sendCommand( reply = await this.self.sendCommand(
prefix.concat(fnArgs), prefix.concat(fnArgs),
this.self.commandOptions this.self.commandOptions
@@ -175,15 +175,15 @@ export default class RedisClient<
private static _createScriptCommand(script: RedisScript, resp: RespVersions) { private static _createScriptCommand(script: RedisScript, resp: RespVersions) {
const prefix = scriptArgumentsPrefix(script), const prefix = scriptArgumentsPrefix(script),
transformReply = getTransformReply(script, resp); transformReply = getTransformReply(script, resp);
return async function (this: ProxyClient) { return async function (this: ProxyClient, ...args: Array<unknown>) {
const scriptArgs = script.transformArguments.apply(undefined, arguments as any), const scriptArgs = script.transformArguments(...args),
args = prefix.concat(scriptArgs), redisArgs = prefix.concat(scriptArgs),
reply = await this.sendCommand(args, this.commandOptions).catch((err: unknown) => { reply = await this.sendCommand(redisArgs, this.commandOptions).catch((err: unknown) => {
if (!(err as Error)?.message?.startsWith?.('NOSCRIPT')) throw err; if (!(err as Error)?.message?.startsWith?.('NOSCRIPT')) throw err;
args[0] = 'EVAL'; args[0] = 'EVAL';
args[1] = script.SCRIPT; args[1] = script.SCRIPT;
return this.sendCommand(args, this.commandOptions); return this.sendCommand(redisArgs, this.commandOptions);
}); });
return transformReply ? return transformReply ?
transformReply(reply, scriptArgs.preserve) : transformReply(reply, scriptArgs.preserve) :
@@ -470,6 +470,10 @@ export default class RedisClient<
return this._commandOptionsProxy('flags', flags); return this._commandOptionsProxy('flags', flags);
} }
withAbortSignal(abortSignal: AbortSignal) {
return this._commandOptionsProxy('abortSignal', abortSignal);
}
/** /**
* Override the `asap` command option to `true` * Override the `asap` command option to `true`
*/ */
@@ -792,8 +796,6 @@ export default class RedisClient<
return Promise.resolve(this.destroy()); return Promise.resolve(this.destroy());
} }
private _resolveClose?: () => unknown;
/** /**
* Close the client. Wait for pending replies. * Close the client. Wait for pending replies.
*/ */

View File

@@ -0,0 +1,138 @@
import { SinglyLinkedList, DoublyLinkedList } from './linked-list';
import { equal, deepEqual } from 'assert/strict';
describe.only('DoublyLinkedList', () => {
const list = new DoublyLinkedList();
it('should start empty', () => {
equal(list.length, 0);
equal(list.head, undefined);
equal(list.tail, undefined);
deepEqual(Array.from(list), []);
});
it('shift empty', () => {
equal(list.shift(), undefined);
equal(list.length, 0);
deepEqual(Array.from(list), []);
});
it('push 1', () => {
list.push(1);
equal(list.length, 1);
deepEqual(Array.from(list), [1]);
});
it('push 2', () => {
list.push(2);
equal(list.length, 2);
deepEqual(Array.from(list), [1, 2]);
});
it('unshift 0', () => {
list.unshift(0);
equal(list.length, 3);
deepEqual(Array.from(list), [0, 1, 2]);
});
it('remove middle node', () => {
list.remove(list.head!.next!);
equal(list.length, 2);
deepEqual(Array.from(list), [0, 2]);
});
it('remove head', () => {
list.remove(list.head!);
equal(list.length, 1);
deepEqual(Array.from(list), [2]);
});
it('remove tail', () => {
list.remove(list.tail!);
equal(list.length, 0);
deepEqual(Array.from(list), []);
});
it('unshift empty queue', () => {
list.unshift(0);
equal(list.length, 1);
deepEqual(Array.from(list), [0]);
});
it('push 1', () => {
list.push(1);
equal(list.length, 2);
deepEqual(Array.from(list), [0, 1]);
});
it('shift', () => {
equal(list.shift(), 0);
equal(list.length, 1);
deepEqual(Array.from(list), [1]);
});
it('shift last element', () => {
equal(list.shift(), 1);
equal(list.length, 0);
deepEqual(Array.from(list), []);
});
});
describe.only('SinglyLinkedList', () => {
const list = new SinglyLinkedList();
it('should start empty', () => {
equal(list.length, 0);
equal(list.head, undefined);
equal(list.tail, undefined);
deepEqual(Array.from(list), []);
});
it('shift empty', () => {
equal(list.shift(), undefined);
equal(list.length, 0);
deepEqual(Array.from(list), []);
});
it('push 1', () => {
list.push(1);
equal(list.length, 1);
deepEqual(Array.from(list), [1]);
});
it('push 2', () => {
list.push(2);
equal(list.length, 2);
deepEqual(Array.from(list), [1, 2]);
});
it('push 3', () => {
list.push(3);
equal(list.length, 3);
deepEqual(Array.from(list), [1, 2, 3]);
});
it('shift 1', () => {
equal(list.shift(), 1);
equal(list.length, 2);
deepEqual(Array.from(list), [2, 3]);
});
it('shift 2', () => {
equal(list.shift(), 2);
equal(list.length, 1);
deepEqual(Array.from(list), [3]);
});
it('shift 3', () => {
equal(list.shift(), 3);
equal(list.length, 0);
deepEqual(Array.from(list), []);
});
it('should be empty', () => {
equal(list.length, 0);
equal(list.head, undefined);
equal(list.tail, undefined);
});
});

View File

@@ -0,0 +1,162 @@
export interface DoublyLinkedNode<T> {
value: T;
previous: DoublyLinkedNode<T> | undefined;
next: DoublyLinkedNode<T> | undefined;
}
export class DoublyLinkedList<T> {
private _length = 0;
get length() {
return this._length;
}
private _head?: DoublyLinkedNode<T>;
get head() {
return this._head;
}
private _tail?: DoublyLinkedNode<T>;
get tail() {
return this._tail;
}
push(value: T) {
++this._length;
if (this._tail === undefined) {
return this._tail = this._head = {
previous: this._head,
next: undefined,
value
};
}
return this._tail = this._tail.next = {
previous: this._tail,
next: undefined,
value
};
}
unshift(value: T) {
++this._length;
if (this._head === undefined) {
return this._head = this._tail = {
previous: undefined,
next: undefined,
value
};
}
return this._head = this._head.previous = {
previous: undefined,
next: this._head,
value
};
}
shift() {
if (this._head === undefined) return undefined;
--this._length;
const node = this._head;
if (node.next) {
node.next.previous = node.previous;
this._head = node.next;
node.next = undefined;
} else {
this._head = this._tail = undefined;
}
return node.value;
}
remove(node: DoublyLinkedNode<T>) {
--this._length;
if (this._tail === node) {
this._tail = node.previous;
}
if (this._head === node) {
this._head = node.next;
} else {
node.previous!.next = node.next;
node.previous = undefined;
}
node.next = undefined;
}
*[Symbol.iterator]() {
let node = this._head;
while (node !== undefined) {
yield node.value;
node = node.next;
}
}
}
export interface SinglyLinkedNode<T> {
value: T;
next: SinglyLinkedNode<T> | undefined;
}
export class SinglyLinkedList<T> {
private _length = 0;
get length() {
return this._length;
}
private _head?: SinglyLinkedNode<T>;
get head() {
return this._head;
}
private _tail?: SinglyLinkedNode<T>;
get tail() {
return this._tail;
}
push(value: T) {
++this._length;
const node = {
value,
next: undefined
};
if (this._head === undefined) {
this._head = this._tail = node;
} else {
this._tail!.next = this._tail = node;
}
}
shift() {
if (this._head === undefined) return undefined;
const node = this._head;
if (--this._length === 0) {
this._head = this._tail = undefined;
} else {
this._head = node.next;
}
return node.value;
}
*[Symbol.iterator]() {
let node = this._head;
while (node !== undefined) {
yield node.value;
node = node.next;
}
}
}

View File

@@ -1,4 +1,4 @@
import { RedisArgument } from "../RESP/types"; import { RedisArgument } from '../RESP/types';
export enum PubSubType { export enum PubSubType {
CHANNELS = 'CHANNELS', CHANNELS = 'CHANNELS',

View File

@@ -1,77 +0,0 @@
import Queue from './queue';
import { equal, deepEqual } from 'assert/strict';
describe('Queue', () => {
const queue = new Queue();
it('should start empty', () => {
equal(queue.length, 0);
deepEqual(Array.from(queue), []);
});
it('shift empty', () => {
equal(queue.shift(), null);
equal(queue.length, 0);
deepEqual(Array.from(queue), []);
});
it('push 1', () => {
queue.push(1);
equal(queue.length, 1);
deepEqual(Array.from(queue), [1]);
});
it('push 2', () => {
queue.push(2);
equal(queue.length, 2);
deepEqual(Array.from(queue), [1, 2]);
});
it('unshift 0', () => {
queue.unshift(0);
equal(queue.length, 3);
deepEqual(Array.from(queue), [0, 1, 2]);
});
it('remove middle node', () => {
queue.remove(queue.head.next!);
equal(queue.length, 2);
deepEqual(Array.from(queue), [0, 2]);
});
it('remove head', () => {
queue.remove(queue.head);
equal(queue.length, 1);
deepEqual(Array.from(queue), [2]);
});
it('remove tail', () => {
queue.remove(queue.tail);
equal(queue.length, 0);
deepEqual(Array.from(queue), []);
});
it('unshift empty queue', () => {
queue.unshift(0);
equal(queue.length, 1);
deepEqual(Array.from(queue), [0]);
});
it('push 1', () => {
queue.push(1);
equal(queue.length, 2);
deepEqual(Array.from(queue), [0, 1]);
});
it('shift', () => {
equal(queue.shift(), 0);
equal(queue.length, 1);
deepEqual(Array.from(queue), [1]);
});
it('shift last element', () => {
equal(queue.shift(), 1);
equal(queue.length, 0);
deepEqual(Array.from(queue), []);
});
});

View File

@@ -1,101 +0,0 @@
export interface QueueNode<T> {
value: T;
previous: QueueNode<T> | null;
next: QueueNode<T> | null;
}
export default class Queue<T> {
private _length = 0;
get length() {
return this._length;
}
private _head: QueueNode<T> | null = null;
get head() {
return this._head;
}
_tail: QueueNode<T> | null = null;
get tail() {
return this._tail;
}
push(value: T) {
++this._length;
if (!this._tail) {
return this._tail = this._head = {
previous: this._head,
next: null,
value
};
}
return this._tail = this._tail.next = {
previous: this._tail,
next: null,
value
};
}
unshift(value: T) {
++this._length;
if (!this._head) {
return this._head = this._tail = {
previous: null,
next: null,
value
};
}
return this._head = this._head.previous = {
previous: null,
next: this._head,
value
};
}
shift() {
if (!this._head) return null;
--this._length;
const node = this._head;
if (node.next) {
node.next.previous = node.previous;
this._head = node.next;
node.next = null;
} else {
this._head = this._tail = null;
}
return node.value;
}
remove(node: QueueNode<T>) {
--this._length;
if (this._tail === node) {
this._tail = node.previous;
}
if (this._head === node) {
this._head = node.next;
} else {
node.previous!.next = node.next;
node.previous = null;
}
node.next = null;
}
*[Symbol.iterator]() {
let node = this._head;
while (node !== null) {
yield node.value;
node = node.next;
}
}
}

View File

@@ -64,49 +64,49 @@ export default class RedisSocket extends EventEmitter {
return (options as RedisTlsSocketOptions).tls === true; return (options as RedisTlsSocketOptions).tls === true;
} }
readonly #initiator: RedisSocketInitiator; private readonly _initiator: RedisSocketInitiator;
readonly #options: RedisSocketOptions; private readonly _options: RedisSocketOptions;
#socket?: net.Socket | tls.TLSSocket; private _socket?: net.Socket | tls.TLSSocket;
#isOpen = false; private _isOpen = false;
get isOpen(): boolean { get isOpen(): boolean {
return this.#isOpen; return this._isOpen;
} }
#isReady = false; private _isReady = false;
get isReady(): boolean { get isReady(): boolean {
return this.#isReady; return this._isReady;
} }
// `writable.writableNeedDrain` was added in v15.2.0 and therefore can't be used // `writable.writableNeedDrain` was added in v15.2.0 and therefore can't be used
// https://nodejs.org/api/stream.html#stream_writable_writableneeddrain // https://nodejs.org/api/stream.html#stream_writable_writableneeddrain
#writableNeedDrain = false; private _writableNeedDrain = false;
get writableNeedDrain(): boolean { get writableNeedDrain(): boolean {
return this.#writableNeedDrain; return this._writableNeedDrain;
} }
#isSocketUnrefed = false; private _isSocketUnrefed = false;
constructor(initiator: RedisSocketInitiator, options?: RedisSocketOptions) { constructor(initiator: RedisSocketInitiator, options?: RedisSocketOptions) {
super(); super();
this.#initiator = initiator; this._initiator = initiator;
this.#options = RedisSocket.#initiateOptions(options); this._options = RedisSocket.#initiateOptions(options);
} }
#reconnectStrategy(retries: number, cause: Error) { private _reconnectStrategy(retries: number, cause: Error) {
if (this.#options.reconnectStrategy === false) { if (this._options.reconnectStrategy === false) {
return false; return false;
} else if (typeof this.#options.reconnectStrategy === 'number') { } else if (typeof this._options.reconnectStrategy === 'number') {
return this.#options.reconnectStrategy; return this._options.reconnectStrategy;
} else if (this.#options.reconnectStrategy) { } else if (this._options.reconnectStrategy) {
try { try {
const retryIn = this.#options.reconnectStrategy(retries, cause); const retryIn = this._options.reconnectStrategy(retries, cause);
if (retryIn !== false && !(retryIn instanceof Error) && typeof retryIn !== 'number') { if (retryIn !== false && !(retryIn instanceof Error) && typeof retryIn !== 'number') {
throw new TypeError(`Reconnect strategy should return \`false | Error | number\`, got ${retryIn} instead`); throw new TypeError(`Reconnect strategy should return \`false | Error | number\`, got ${retryIn} instead`);
} }
@@ -120,14 +120,14 @@ export default class RedisSocket extends EventEmitter {
return Math.min(retries * 50, 500); return Math.min(retries * 50, 500);
} }
#shouldReconnect(retries: number, cause: Error) { private _shouldReconnect(retries: number, cause: Error) {
const retryIn = this.#reconnectStrategy(retries, cause); const retryIn = this._reconnectStrategy(retries, cause);
if (retryIn === false) { if (retryIn === false) {
this.#isOpen = false; this._isOpen = false;
this.emit('error', cause); this.emit('error', cause);
return cause; return cause;
} else if (retryIn instanceof Error) { } else if (retryIn instanceof Error) {
this.#isOpen = false; this._isOpen = false;
this.emit('error', cause); this.emit('error', cause);
return new ReconnectStrategyError(retryIn, cause); return new ReconnectStrategyError(retryIn, cause);
} }
@@ -136,33 +136,33 @@ export default class RedisSocket extends EventEmitter {
} }
async connect(): Promise<void> { async connect(): Promise<void> {
if (this.#isOpen) { if (this._isOpen) {
throw new Error('Socket already opened'); throw new Error('Socket already opened');
} }
this.#isOpen = true; this._isOpen = true;
return this.#connect(); return this._connect();
} }
async #connect(): Promise<void> { private async _connect(): Promise<void> {
let retries = 0; let retries = 0;
do { do {
try { try {
this.#socket = await this.#createSocket(); this._socket = await this._createSocket();
this.#writableNeedDrain = false; this._writableNeedDrain = false;
this.emit('connect'); this.emit('connect');
try { try {
await this.#initiator(); await this._initiator();
} catch (err) { } catch (err) {
this.#socket.destroy(); this._socket.destroy();
this.#socket = undefined; this._socket = undefined;
throw err; throw err;
} }
this.#isReady = true; this._isReady = true;
this.emit('ready'); this.emit('ready');
} catch (err) { } catch (err) {
const retryIn = this.#shouldReconnect(retries++, err as Error); const retryIn = this._shouldReconnect(retries++, err as Error);
if (typeof retryIn !== 'number') { if (typeof retryIn !== 'number') {
throw retryIn; throw retryIn;
} }
@@ -171,40 +171,40 @@ export default class RedisSocket extends EventEmitter {
await promiseTimeout(retryIn); await promiseTimeout(retryIn);
this.emit('reconnecting'); this.emit('reconnecting');
} }
} while (this.#isOpen && !this.#isReady); } while (this._isOpen && !this._isReady);
} }
#createSocket(): Promise<net.Socket | tls.TLSSocket> { private _createSocket(): Promise<net.Socket | tls.TLSSocket> {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const { connectEvent, socket } = RedisSocket.#isTlsSocket(this.#options) ? const { connectEvent, socket } = RedisSocket.#isTlsSocket(this._options) ?
this.#createTlsSocket() : this._createTlsSocket() :
this.#createNetSocket(); this._createNetSocket();
if (this.#options.connectTimeout) { if (this._options.connectTimeout) {
socket.setTimeout(this.#options.connectTimeout, () => socket.destroy(new ConnectionTimeoutError())); socket.setTimeout(this._options.connectTimeout, () => socket.destroy(new ConnectionTimeoutError()));
} }
if (this.#isSocketUnrefed) { if (this._isSocketUnrefed) {
socket.unref(); socket.unref();
} }
socket socket
.setNoDelay(this.#options.noDelay) .setNoDelay(this._options.noDelay)
.once('error', reject) .once('error', reject)
.once(connectEvent, () => { .once(connectEvent, () => {
socket socket
.setTimeout(0) .setTimeout(0)
// https://github.com/nodejs/node/issues/31663 // https://github.com/nodejs/node/issues/31663
.setKeepAlive(this.#options.keepAlive !== false, this.#options.keepAlive || 0) .setKeepAlive(this._options.keepAlive !== false, this._options.keepAlive || 0)
.off('error', reject) .off('error', reject)
.once('error', (err: Error) => this.#onSocketError(err)) .once('error', (err: Error) => this._onSocketError(err))
.once('close', hadError => { .once('close', hadError => {
if (!hadError && this.#isOpen && this.#socket === socket) { if (!hadError && this._isOpen && this._socket === socket) {
this.#onSocketError(new SocketClosedUnexpectedlyError()); this._onSocketError(new SocketClosedUnexpectedlyError());
} }
}) })
.on('drain', () => { .on('drain', () => {
this.#writableNeedDrain = false; this._writableNeedDrain = false;
this.emit('drain'); this.emit('drain');
}) })
.on('data', data => this.emit('data', data)); .on('data', data => this.emit('data', data));
@@ -214,104 +214,104 @@ export default class RedisSocket extends EventEmitter {
}); });
} }
#createNetSocket(): CreateSocketReturn<net.Socket> { private _createNetSocket(): CreateSocketReturn<net.Socket> {
return { return {
connectEvent: 'connect', connectEvent: 'connect',
socket: net.connect(this.#options as net.NetConnectOpts) // TODO socket: net.connect(this._options as net.NetConnectOpts) // TODO
}; };
} }
#createTlsSocket(): CreateSocketReturn<tls.TLSSocket> { private _createTlsSocket(): CreateSocketReturn<tls.TLSSocket> {
return { return {
connectEvent: 'secureConnect', connectEvent: 'secureConnect',
socket: tls.connect(this.#options as tls.ConnectionOptions) // TODO socket: tls.connect(this._options as tls.ConnectionOptions) // TODO
}; };
} }
#onSocketError(err: Error): void { private _onSocketError(err: Error): void {
this.#isReady = false; this._isReady = false;
this.emit('error', err); this.emit('error', err);
if (!this.#isOpen || typeof this.#shouldReconnect(0, err) !== 'number') return; if (!this._isOpen || typeof this._shouldReconnect(0, err) !== 'number') return;
this.emit('reconnecting'); this.emit('reconnecting');
this.#connect().catch(() => { this._connect().catch(() => {
// the error was already emitted, silently ignore it // the error was already emitted, silently ignore it
}); });
} }
writeCommand(args: Array<RedisArgument>): void { writeCommand(args: Array<RedisArgument>): void {
if (!this.#socket) { if (!this._socket) {
throw new ClientClosedError(); throw new ClientClosedError();
} }
for (const toWrite of args) { for (const toWrite of args) {
this.#writableNeedDrain = !this.#socket.write(toWrite); this._writableNeedDrain = !this._socket.write(toWrite);
} }
} }
async quit<T>(fn: () => Promise<T>): Promise<T> { async quit<T>(fn: () => Promise<T>): Promise<T> {
if (!this.#isOpen) { if (!this._isOpen) {
throw new ClientClosedError(); throw new ClientClosedError();
} }
this.#isOpen = false; this._isOpen = false;
const reply = await fn(); const reply = await fn();
this.destroySocket(); this.destroySocket();
return reply; return reply;
} }
close() { close() {
if (!this.#isOpen) { if (!this._isOpen) {
throw new ClientClosedError(); throw new ClientClosedError();
} }
this.#isOpen = false; this._isOpen = false;
} }
destroy() { destroy() {
if (!this.#isOpen) { if (!this._isOpen) {
throw new ClientClosedError(); throw new ClientClosedError();
} }
this.#isOpen = false; this._isOpen = false;
this.destroySocket(); this.destroySocket();
} }
destroySocket() { destroySocket() {
this.#isReady = false; this._isReady = false;
if (this.#socket) { if (this._socket) {
this.#socket.destroy(); this._socket.destroy();
this.#socket = undefined; this._socket = undefined;
} }
this.emit('end'); this.emit('end');
} }
#isCorked = false; private _isCorked = false;
cork(): void { cork(): void {
if (!this.#socket || this.#isCorked) { if (!this._socket || this._isCorked) {
return; return;
} }
this.#socket.cork(); this._socket.cork();
this.#isCorked = true; this._isCorked = true;
queueMicrotask(() => { setImmediate(() => {
this.#socket?.uncork(); this._socket?.uncork();
this.#isCorked = false; this._isCorked = false;
}); });
} }
ref(): void { ref(): void {
this.#isSocketUnrefed = false; this._isSocketUnrefed = false;
this.#socket?.ref(); this._socket?.ref();
} }
unref(): void { unref(): void {
this.#isSocketUnrefed = true; this._isSocketUnrefed = true;
this.#socket?.unref(); this._socket?.unref();
} }
} }

View File

@@ -1,4 +1,4 @@
import { BlobStringReply, Command } from "../RESP/types"; import { BlobStringReply, Command } from '../RESP/types';
export default { export default {
transformArguments() { transformArguments() {