You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-07 13:22:56 +03:00
fix #2679 - fix socket types, and clean some code
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
import COMMANDS from '../commands';
|
||||
import RedisSocket, { RedisSocketOptions, RedisTlsSocketOptions } from './socket';
|
||||
import RedisSocket, { RedisSocketOptions } from './socket';
|
||||
import RedisCommandsQueue, { CommandOptions } from './commands-queue';
|
||||
import { EventEmitter } from 'node:events';
|
||||
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
|
||||
@@ -244,7 +244,7 @@ export default class RedisClient<
|
||||
};
|
||||
|
||||
if (protocol === 'rediss:') {
|
||||
(parsed.socket as RedisTlsSocketOptions).tls = true;
|
||||
parsed!.socket!.tls = true;
|
||||
} else if (protocol !== 'redis:') {
|
||||
throw new TypeError('Invalid protocol');
|
||||
}
|
||||
|
@@ -1,84 +1,65 @@
|
||||
import { EventEmitter } from 'node:events';
|
||||
import * as net from 'node:net';
|
||||
import * as tls from 'node:tls';
|
||||
import { EventEmitter, once } from 'node:events';
|
||||
import net from 'node:net';
|
||||
import tls from 'node:tls';
|
||||
import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyError, ReconnectStrategyError } from '../errors';
|
||||
import { setTimeout } from 'node:timers/promises';
|
||||
import { RedisArgument } from '../RESP/types';
|
||||
|
||||
export interface RedisSocketCommonOptions {
|
||||
type NetOptions = {
|
||||
tls?: false;
|
||||
};
|
||||
|
||||
type TcpOptions = NetOptions & Omit<
|
||||
net.TcpNetConnectOpts,
|
||||
'timeout' | 'onread' | 'readable' | 'writable' | 'port'
|
||||
> & {
|
||||
port?: number;
|
||||
};
|
||||
|
||||
type IpcOptions = NetOptions & Omit<
|
||||
net.IpcNetConnectOpts,
|
||||
'timeout' | 'onread' | 'readable' | 'writable'
|
||||
>;
|
||||
|
||||
type TlsOptions = {
|
||||
tls: true;
|
||||
} & tls.ConnectionOptions;
|
||||
|
||||
type ReconnectStrategyFunction = (retries: number, cause: Error) => false | Error | number;
|
||||
|
||||
export type RedisSocketOptions = {
|
||||
/**
|
||||
* Connection Timeout (in milliseconds)
|
||||
* Connection timeout (in milliseconds)
|
||||
*/
|
||||
connectTimeout?: number;
|
||||
/**
|
||||
* Toggle [`Nagle's algorithm`](https://nodejs.org/api/net.html#net_socket_setnodelay_nodelay)
|
||||
*/
|
||||
noDelay?: boolean;
|
||||
/**
|
||||
* Toggle [`keep-alive`](https://nodejs.org/api/net.html#net_socket_setkeepalive_enable_initialdelay)
|
||||
*/
|
||||
keepAlive?: number | false;
|
||||
/**
|
||||
* When the socket closes unexpectedly (without calling `.close()`/`.destroy()`), the client uses `reconnectStrategy` to decide what to do. The following values are supported:
|
||||
* 1. `false` -> do not reconnect, close the client and flush the command queue.
|
||||
* 2. `number` -> wait for `X` milliseconds before reconnecting.
|
||||
* 3. `(retries: number, cause: Error) => false | number | Error` -> `number` is the same as configuring a `number` directly, `Error` is the same as `false`, but with a custom error.
|
||||
* Defaults to `retries => Math.min(retries * 50, 500)`
|
||||
*/
|
||||
reconnectStrategy?: false | number | ((retries: number, cause: Error) => false | Error | number);
|
||||
}
|
||||
|
||||
export interface RedisNetConnectOpts extends Omit<Partial<net.TcpNetConnectOpts>, 'keepAlive'>, Partial<net.IpcNetConnectOpts>, RedisSocketCommonOptions {
|
||||
tls?: false;
|
||||
};
|
||||
|
||||
export interface RedisTlsSocketOptions extends Partial<tls.ConnectionOptions>, RedisSocketCommonOptions {
|
||||
tls: true;
|
||||
};
|
||||
|
||||
export type RedisSocketOptions = RedisNetConnectOpts | RedisTlsSocketOptions
|
||||
|
||||
interface CreateSocketReturn<T> {
|
||||
connectEvent: string;
|
||||
socket: T;
|
||||
}
|
||||
reconnectStrategy?: false | number | ReconnectStrategyFunction;
|
||||
} & (TcpOptions | IpcOptions | TlsOptions);
|
||||
|
||||
export type RedisSocketInitiator = () => void | Promise<unknown>;
|
||||
|
||||
export default class RedisSocket extends EventEmitter {
|
||||
static #initiateOptions(options?: RedisSocketOptions): RedisSocketOptions {
|
||||
options ??= {};
|
||||
if (!(options as net.IpcSocketConnectOpts).path) {
|
||||
(options as net.TcpSocketConnectOpts).port ??= 6379;
|
||||
(options as net.TcpSocketConnectOpts).host ??= 'localhost';
|
||||
}
|
||||
|
||||
options.connectTimeout ??= 5000;
|
||||
options.keepAlive ??= 5000;
|
||||
options.noDelay ??= true;
|
||||
|
||||
return options;
|
||||
}
|
||||
|
||||
static #isTlsSocket(options: RedisSocketOptions): options is RedisTlsSocketOptions {
|
||||
return (options as RedisTlsSocketOptions).tls === true;
|
||||
}
|
||||
|
||||
readonly #initiator: RedisSocketInitiator;
|
||||
|
||||
readonly #options: RedisSocketOptions;
|
||||
readonly #initiator;
|
||||
readonly #connectTimeout;
|
||||
readonly #reconnectStrategy;
|
||||
readonly #socketFactory;
|
||||
|
||||
#socket?: net.Socket | tls.TLSSocket;
|
||||
|
||||
#isOpen = false;
|
||||
|
||||
get isOpen(): boolean {
|
||||
get isOpen() {
|
||||
return this.#isOpen;
|
||||
}
|
||||
|
||||
#isReady = false;
|
||||
|
||||
get isReady(): boolean {
|
||||
get isReady() {
|
||||
return this.#isReady;
|
||||
}
|
||||
|
||||
@@ -88,28 +69,101 @@ export default class RedisSocket extends EventEmitter {
|
||||
super();
|
||||
|
||||
this.#initiator = initiator;
|
||||
this.#options = RedisSocket.#initiateOptions(options);
|
||||
this.#connectTimeout = options?.connectTimeout ?? 5000;
|
||||
this.#reconnectStrategy = this.#createReconnectStrategy(options);
|
||||
this.#socketFactory = this.#createSocketFactory(options);
|
||||
}
|
||||
|
||||
#reconnectStrategy(retries: number, cause: Error) {
|
||||
if (this.#options.reconnectStrategy === false) {
|
||||
return false;
|
||||
} else if (typeof this.#options.reconnectStrategy === 'number') {
|
||||
return this.#options.reconnectStrategy;
|
||||
} else if (this.#options.reconnectStrategy) {
|
||||
try {
|
||||
const retryIn = this.#options.reconnectStrategy(retries, cause);
|
||||
if (retryIn !== false && !(retryIn instanceof Error) && typeof retryIn !== 'number') {
|
||||
throw new TypeError(`Reconnect strategy should return \`false | Error | number\`, got ${retryIn} instead`);
|
||||
}
|
||||
|
||||
return retryIn;
|
||||
} catch (err) {
|
||||
this.emit('error', err);
|
||||
}
|
||||
#createReconnectStrategy(options?: RedisSocketOptions): ReconnectStrategyFunction {
|
||||
const strategy = options?.reconnectStrategy;
|
||||
if (strategy === false || typeof strategy === 'number') {
|
||||
return () => strategy;
|
||||
}
|
||||
|
||||
return Math.min(retries * 50, 500);
|
||||
if (strategy) {
|
||||
return (retries, cause) => {
|
||||
try {
|
||||
const retryIn = strategy(retries, cause);
|
||||
if (retryIn !== false && !(retryIn instanceof Error) && typeof retryIn !== 'number') {
|
||||
throw new TypeError(`Reconnect strategy should return \`false | Error | number\`, got ${retryIn} instead`);
|
||||
}
|
||||
return retryIn;
|
||||
} catch (err) {
|
||||
this.emit('error', err);
|
||||
return Math.min(retries * 50, 500);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
return retries => Math.min(retries * 50, 500);
|
||||
}
|
||||
|
||||
#createSocketFactory(options?: RedisSocketOptions) {
|
||||
// TLS
|
||||
if (options?.tls === true) {
|
||||
const withDefaults: tls.ConnectionOptions = {
|
||||
...options,
|
||||
port: options?.port ?? 6379,
|
||||
// https://nodejs.org/api/tls.html#tlsconnectoptions-callback "Any socket.connect() option not already listed"
|
||||
// @types/node is... incorrect...
|
||||
// @ts-expect-error
|
||||
noDelay: options?.noDelay ?? true,
|
||||
// https://nodejs.org/api/tls.html#tlsconnectoptions-callback "Any socket.connect() option not already listed"
|
||||
// @types/node is... incorrect...
|
||||
// @ts-expect-error
|
||||
keepAlive: options?.keepAlive ?? true,
|
||||
// https://nodejs.org/api/tls.html#tlsconnectoptions-callback "Any socket.connect() option not already listed"
|
||||
// @types/node is... incorrect...
|
||||
// @ts-expect-error
|
||||
keepAliveInitialDelay: options?.keepAliveInitialDelay ?? 5000,
|
||||
timeout: undefined,
|
||||
onread: undefined,
|
||||
readable: true,
|
||||
writable: true
|
||||
};
|
||||
return {
|
||||
create() {
|
||||
return tls.connect(withDefaults);
|
||||
},
|
||||
event: 'secureConnect'
|
||||
};
|
||||
}
|
||||
|
||||
// IPC
|
||||
if (options && 'path' in options) {
|
||||
const withDefaults: net.IpcNetConnectOpts = {
|
||||
...options,
|
||||
timeout: undefined,
|
||||
onread: undefined,
|
||||
readable: true,
|
||||
writable: true
|
||||
};
|
||||
return {
|
||||
create() {
|
||||
return net.createConnection(withDefaults);
|
||||
},
|
||||
event: 'connect'
|
||||
};
|
||||
}
|
||||
|
||||
// TCP
|
||||
const withDefaults: net.TcpNetConnectOpts = {
|
||||
...options,
|
||||
port: options?.port ?? 6379,
|
||||
noDelay: options?.noDelay ?? true,
|
||||
keepAlive: options?.keepAlive ?? true,
|
||||
keepAliveInitialDelay: options?.keepAliveInitialDelay ?? 5000,
|
||||
timeout: undefined,
|
||||
onread: undefined,
|
||||
readable: true,
|
||||
writable: true
|
||||
};
|
||||
return {
|
||||
create() {
|
||||
return net.createConnection(withDefaults);
|
||||
},
|
||||
event: 'connect'
|
||||
};
|
||||
}
|
||||
|
||||
#shouldReconnect(retries: number, cause: Error) {
|
||||
@@ -164,56 +218,37 @@ export default class RedisSocket extends EventEmitter {
|
||||
}
|
||||
} while (this.#isOpen && !this.#isReady);
|
||||
}
|
||||
|
||||
async #createSocket(): Promise<net.Socket | tls.TLSSocket> {
|
||||
const socket = this.#socketFactory.create();
|
||||
|
||||
#createSocket(): Promise<net.Socket | tls.TLSSocket> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const { connectEvent, socket } = RedisSocket.#isTlsSocket(this.#options) ?
|
||||
this.#createTlsSocket() :
|
||||
this.#createNetSocket();
|
||||
let onTimeout;
|
||||
if (this.#connectTimeout !== undefined) {
|
||||
onTimeout = () => socket.destroy(new ConnectionTimeoutError());
|
||||
socket.once('timeout', onTimeout);
|
||||
socket.setTimeout(this.#connectTimeout);
|
||||
}
|
||||
|
||||
if (this.#options.connectTimeout) {
|
||||
socket.setTimeout(this.#options.connectTimeout, () => socket.destroy(new ConnectionTimeoutError()));
|
||||
}
|
||||
if (this.#isSocketUnrefed) {
|
||||
socket.unref();
|
||||
}
|
||||
|
||||
if (this.#isSocketUnrefed) {
|
||||
socket.unref();
|
||||
}
|
||||
await once(socket, this.#socketFactory.event);
|
||||
|
||||
socket
|
||||
.setNoDelay(this.#options.noDelay)
|
||||
.once('error', reject)
|
||||
.once(connectEvent, () => {
|
||||
socket
|
||||
.setTimeout(0)
|
||||
// https://github.com/nodejs/node/issues/31663
|
||||
.setKeepAlive(this.#options.keepAlive !== false, this.#options.keepAlive || 0)
|
||||
.off('error', reject)
|
||||
.once('error', (err: Error) => this.#onSocketError(err))
|
||||
.once('close', hadError => {
|
||||
if (!hadError && this.#isOpen && this.#socket === socket) {
|
||||
this.#onSocketError(new SocketClosedUnexpectedlyError());
|
||||
}
|
||||
})
|
||||
.on('drain', () => this.emit('drain'))
|
||||
.on('data', data => this.emit('data', data));
|
||||
if (onTimeout) {
|
||||
socket.removeListener('timeout', onTimeout);
|
||||
}
|
||||
|
||||
resolve(socket);
|
||||
});
|
||||
});
|
||||
}
|
||||
socket
|
||||
.once('error', err => this.#onSocketError(err))
|
||||
.once('close', hadError => {
|
||||
if (hadError || !this.#isOpen || this.#socket !== socket) return;
|
||||
this.#onSocketError(new SocketClosedUnexpectedlyError());
|
||||
})
|
||||
.on('drain', () => this.emit('drain'))
|
||||
.on('data', data => this.emit('data', data));
|
||||
|
||||
#createNetSocket(): CreateSocketReturn<net.Socket> {
|
||||
return {
|
||||
connectEvent: 'connect',
|
||||
socket: net.connect(this.#options as net.NetConnectOpts) // TODO
|
||||
};
|
||||
}
|
||||
|
||||
#createTlsSocket(): CreateSocketReturn<tls.TLSSocket> {
|
||||
return {
|
||||
connectEvent: 'secureConnect',
|
||||
socket: tls.connect(this.#options as tls.ConnectionOptions) // TODO
|
||||
};
|
||||
return socket;
|
||||
}
|
||||
|
||||
#onSocketError(err: Error): void {
|
||||
@@ -229,11 +264,11 @@ export default class RedisSocket extends EventEmitter {
|
||||
});
|
||||
}
|
||||
|
||||
write(iterator: IterableIterator<Array<RedisArgument>>): void {
|
||||
write(iterable: Iterable<Array<RedisArgument>>) {
|
||||
if (!this.#socket) return;
|
||||
|
||||
this.#socket.cork();
|
||||
for (const args of iterator) {
|
||||
for (const args of iterable) {
|
||||
for (const toWrite of args) {
|
||||
this.#socket.write(toWrite);
|
||||
}
|
||||
@@ -282,12 +317,12 @@ export default class RedisSocket extends EventEmitter {
|
||||
this.emit('end');
|
||||
}
|
||||
|
||||
ref(): void {
|
||||
ref() {
|
||||
this.#isSocketUnrefed = false;
|
||||
this.#socket?.ref();
|
||||
}
|
||||
|
||||
unref(): void {
|
||||
unref() {
|
||||
this.#isSocketUnrefed = true;
|
||||
this.#socket?.unref();
|
||||
}
|
||||
|
Reference in New Issue
Block a user