1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

fix socket error handlers (#2092)

* fix socket error handlers, reset parser on error

* fix #2080 - reset pubSubState on socket error

* fix pubsub

* fix "RedisSocketInitiator"
This commit is contained in:
Leibale Eidelman
2022-04-25 05:47:51 -04:00
committed by GitHub
parent 8b5a5473a4
commit e6de453fdd
6 changed files with 72 additions and 146 deletions

View File

@@ -53,16 +53,6 @@ interface PubSubListeners {
type PubSubListenersMap = Map<string, PubSubListeners>; type PubSubListenersMap = Map<string, PubSubListeners>;
interface PubSubState {
subscribing: number;
subscribed: number;
unsubscribing: number;
listeners: {
channels: PubSubListenersMap;
patterns: PubSubListenersMap;
};
}
export default class RedisCommandsQueue { export default class RedisCommandsQueue {
static #flushQueue<T extends CommandWaitingForReply>(queue: LinkedList<T>, err: Error): void { static #flushQueue<T extends CommandWaitingForReply>(queue: LinkedList<T>, err: Error): void {
while (queue.length) { while (queue.length) {
@@ -98,7 +88,16 @@ export default class RedisCommandsQueue {
readonly #waitingForReply = new LinkedList<CommandWaitingForReply>(); readonly #waitingForReply = new LinkedList<CommandWaitingForReply>();
#pubSubState: PubSubState | undefined; readonly #pubSubState = {
isActive: false,
subscribing: 0,
subscribed: 0,
unsubscribing: 0,
listeners: {
channels: new Map(),
patterns: new Map()
}
};
static readonly #PUB_SUB_MESSAGES = { static readonly #PUB_SUB_MESSAGES = {
message: Buffer.from('message'), message: Buffer.from('message'),
@@ -111,7 +110,7 @@ export default class RedisCommandsQueue {
readonly #parser = new RedisParser({ readonly #parser = new RedisParser({
returnReply: (reply: unknown) => { returnReply: (reply: unknown) => {
if (this.#pubSubState && Array.isArray(reply)) { if (this.#pubSubState.isActive && Array.isArray(reply)) {
if (RedisCommandsQueue.#PUB_SUB_MESSAGES.message.equals(reply[0])) { if (RedisCommandsQueue.#PUB_SUB_MESSAGES.message.equals(reply[0])) {
return RedisCommandsQueue.#emitPubSubMessage( return RedisCommandsQueue.#emitPubSubMessage(
this.#pubSubState.listeners.channels, this.#pubSubState.listeners.channels,
@@ -150,7 +149,7 @@ export default class RedisCommandsQueue {
} }
addCommand<T = RedisCommandRawReply>(args: RedisCommandArguments, options?: QueueCommandOptions): Promise<T> { addCommand<T = RedisCommandRawReply>(args: RedisCommandArguments, options?: QueueCommandOptions): Promise<T> {
if (this.#pubSubState && !options?.ignorePubSubMode) { if (this.#pubSubState.isActive && !options?.ignorePubSubMode) {
return Promise.reject(new Error('Cannot send commands in PubSub mode')); return Promise.reject(new Error('Cannot send commands in PubSub mode'));
} else if (this.#maxLength && this.#waitingToBeSent.length + this.#waitingForReply.length >= this.#maxLength) { } else if (this.#maxLength && this.#waitingToBeSent.length + this.#waitingForReply.length >= this.#maxLength) {
return Promise.reject(new Error('The queue is full')); return Promise.reject(new Error('The queue is full'));
@@ -190,27 +189,16 @@ export default class RedisCommandsQueue {
}); });
} }
#initiatePubSubState(): PubSubState {
return this.#pubSubState ??= {
subscribed: 0,
subscribing: 0,
unsubscribing: 0,
listeners: {
channels: new Map(),
patterns: new Map()
}
};
}
subscribe<T extends boolean>( subscribe<T extends boolean>(
command: PubSubSubscribeCommands, command: PubSubSubscribeCommands,
channels: RedisCommandArgument | Array<RedisCommandArgument>, channels: RedisCommandArgument | Array<RedisCommandArgument>,
listener: PubSubListener<T>, listener: PubSubListener<T>,
returnBuffers?: T returnBuffers?: T
): Promise<void> { ): Promise<void> {
const pubSubState = this.#initiatePubSubState(), const channelsToSubscribe: Array<RedisCommandArgument> = [],
channelsToSubscribe: Array<RedisCommandArgument> = [], listenersMap = command === PubSubSubscribeCommands.SUBSCRIBE ?
listenersMap = command === PubSubSubscribeCommands.SUBSCRIBE ? pubSubState.listeners.channels : pubSubState.listeners.patterns; this.#pubSubState.listeners.channels :
this.#pubSubState.listeners.patterns;
for (const channel of (Array.isArray(channels) ? channels : [channels])) { for (const channel of (Array.isArray(channels) ? channels : [channels])) {
const channelString = typeof channel === 'string' ? channel : channel.toString(); const channelString = typeof channel === 'string' ? channel : channel.toString();
let listeners = listenersMap.get(channelString); let listeners = listenersMap.get(channelString);
@@ -230,6 +218,7 @@ export default class RedisCommandsQueue {
if (!channelsToSubscribe.length) { if (!channelsToSubscribe.length) {
return Promise.resolve(); return Promise.resolve();
} }
return this.#pushPubSubCommand(command, channelsToSubscribe); return this.#pushPubSubCommand(command, channelsToSubscribe);
} }
@@ -239,10 +228,6 @@ export default class RedisCommandsQueue {
listener?: PubSubListener<T>, listener?: PubSubListener<T>,
returnBuffers?: T returnBuffers?: T
): Promise<void> { ): Promise<void> {
if (!this.#pubSubState) {
return Promise.resolve();
}
const listeners = command === PubSubUnsubscribeCommands.UNSUBSCRIBE ? const listeners = command === PubSubUnsubscribeCommands.UNSUBSCRIBE ?
this.#pubSubState.listeners.channels : this.#pubSubState.listeners.channels :
this.#pubSubState.listeners.patterns; this.#pubSubState.listeners.patterns;
@@ -280,8 +265,7 @@ export default class RedisCommandsQueue {
#pushPubSubCommand(command: PubSubSubscribeCommands | PubSubUnsubscribeCommands, channels: number | Array<RedisCommandArgument>): Promise<void> { #pushPubSubCommand(command: PubSubSubscribeCommands | PubSubUnsubscribeCommands, channels: number | Array<RedisCommandArgument>): Promise<void> {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const pubSubState = this.#initiatePubSubState(), const isSubscribe = command === PubSubSubscribeCommands.SUBSCRIBE || command === PubSubSubscribeCommands.PSUBSCRIBE,
isSubscribe = command === PubSubSubscribeCommands.SUBSCRIBE || command === PubSubSubscribeCommands.PSUBSCRIBE,
inProgressKey = isSubscribe ? 'subscribing' : 'unsubscribing', inProgressKey = isSubscribe ? 'subscribing' : 'unsubscribing',
commandArgs: Array<RedisCommandArgument> = [command]; commandArgs: Array<RedisCommandArgument> = [command];
@@ -293,38 +277,42 @@ export default class RedisCommandsQueue {
channelsCounter = channels.length; channelsCounter = channels.length;
} }
pubSubState[inProgressKey] += channelsCounter; this.#pubSubState.isActive = true;
this.#pubSubState[inProgressKey] += channelsCounter;
this.#waitingToBeSent.push({ this.#waitingToBeSent.push({
args: commandArgs, args: commandArgs,
channelsCounter, channelsCounter,
returnBuffers: true, returnBuffers: true,
resolve: () => { resolve: () => {
pubSubState[inProgressKey] -= channelsCounter; this.#pubSubState[inProgressKey] -= channelsCounter;
if (isSubscribe) { this.#pubSubState.subscribed += channelsCounter * (isSubscribe ? 1 : -1);
pubSubState.subscribed += channelsCounter; this.#updatePubSubActiveState();
} else {
pubSubState.subscribed -= channelsCounter;
if (!pubSubState.subscribed && !pubSubState.subscribing && !pubSubState.subscribed) {
this.#pubSubState = undefined;
}
}
resolve(); resolve();
}, },
reject: err => { reject: err => {
pubSubState[inProgressKey] -= channelsCounter * (isSubscribe ? 1 : -1); this.#pubSubState[inProgressKey] -= channelsCounter * (isSubscribe ? 1 : -1);
this.#updatePubSubActiveState();
reject(err); reject(err);
} }
}); });
}); });
} }
resubscribe(): Promise<any> | undefined { #updatePubSubActiveState(): void {
if (!this.#pubSubState) { if (
return; !this.#pubSubState.subscribed &&
!this.#pubSubState.subscribing &&
!this.#pubSubState.subscribed
) {
this.#pubSubState.isActive = false;
} }
}
resubscribe(): Promise<any> | undefined {
this.#pubSubState.subscribed = 0; this.#pubSubState.subscribed = 0;
this.#pubSubState.subscribing = 0;
this.#pubSubState.unsubscribing = 0;
const promises = [], const promises = [],
{ channels, patterns } = this.#pubSubState.listeners; { channels, patterns } = this.#pubSubState.listeners;
@@ -369,8 +357,7 @@ export default class RedisCommandsQueue {
#setReturnBuffers() { #setReturnBuffers() {
this.#parser.setReturnBuffers( this.#parser.setReturnBuffers(
!!this.#waitingForReply.head?.value.returnBuffers || !!this.#waitingForReply.head?.value.returnBuffers ||
!!this.#pubSubState?.subscribed || !!this.#pubSubState.isActive
!!this.#pubSubState?.subscribing
); );
} }
@@ -390,6 +377,7 @@ export default class RedisCommandsQueue {
} }
flushWaitingForReply(err: Error): void { flushWaitingForReply(err: Error): void {
this.#parser.reset();
RedisCommandsQueue.#flushQueue(this.#waitingForReply, err); RedisCommandsQueue.#flushQueue(this.#waitingForReply, err);
if (!this.#chainInExecution) return; if (!this.#chainInExecution) return;

View File

@@ -3,7 +3,7 @@ import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils';
import RedisClient, { RedisClientType } from '.'; import RedisClient, { RedisClientType } from '.';
import { RedisClientMultiCommandType } from './multi-command'; import { RedisClientMultiCommandType } from './multi-command';
import { RedisCommandArguments, RedisCommandRawReply, RedisModules, RedisScripts } from '../commands'; import { RedisCommandArguments, RedisCommandRawReply, RedisModules, RedisScripts } from '../commands';
import { AbortError, AuthError, ClientClosedError, ConnectionTimeoutError, DisconnectsClientError, SocketClosedUnexpectedlyError, WatchError } from '../errors'; import { AbortError, ClientClosedError, ConnectionTimeoutError, DisconnectsClientError, SocketClosedUnexpectedlyError, WatchError } from '../errors';
import { defineScript } from '../lua-script'; import { defineScript } from '../lua-script';
import { spy } from 'sinon'; import { spy } from 'sinon';
import { once } from 'events'; import { once } from 'events';
@@ -87,30 +87,6 @@ describe('Client', () => {
); );
}, GLOBAL.SERVERS.PASSWORD); }, GLOBAL.SERVERS.PASSWORD);
testUtils.testWithClient('should not retry connecting if failed due to wrong auth', async client => {
let message;
if (testUtils.isVersionGreaterThan([6, 2])) {
message = 'WRONGPASS invalid username-password pair or user is disabled.';
} else if (testUtils.isVersionGreaterThan([6])) {
message = 'WRONGPASS invalid username-password pair';
} else {
message = 'ERR invalid password';
}
await assert.rejects(
client.connect(),
new AuthError(message)
);
assert.equal(client.isOpen, false);
}, {
...GLOBAL.SERVERS.PASSWORD,
clientOptions: {
password: 'wrongpassword'
},
disableClientSetup: true
});
testUtils.testWithClient('should execute AUTH before SELECT', async client => { testUtils.testWithClient('should execute AUTH before SELECT', async client => {
assert.equal( assert.equal(
(await client.clientInfo()).db, (await client.clientInfo()).db,
@@ -300,7 +276,8 @@ describe('Client', () => {
await client.multi() await client.multi()
.sAdd('a', ['b', 'c']) .sAdd('a', ['b', 'c'])
.v4.exec(), .v4.exec(),
[2]) [2]
);
}, { }, {
...GLOBAL.SERVERS.OPEN, ...GLOBAL.SERVERS.OPEN,
clientOptions: { clientOptions: {
@@ -681,10 +658,6 @@ describe('Client', () => {
const listener = spy(); const listener = spy();
await subscriber.subscribe('channel', listener); await subscriber.subscribe('channel', listener);
subscriber.on('error', err => {
console.error('subscriber err', err.message);
});
await Promise.all([ await Promise.all([
once(subscriber, 'error'), once(subscriber, 'error'),
publisher.clientKill({ publisher.clientKill({

View File

@@ -11,7 +11,7 @@ import { ScanCommandOptions } from '../commands/SCAN';
import { HScanTuple } from '../commands/HSCAN'; import { HScanTuple } from '../commands/HSCAN';
import { extendWithCommands, extendWithModulesAndScripts, transformCommandArguments, transformCommandReply } from '../commander'; import { extendWithCommands, extendWithModulesAndScripts, transformCommandArguments, transformCommandReply } from '../commander';
import { Pool, Options as PoolOptions, createPool } from 'generic-pool'; import { Pool, Options as PoolOptions, createPool } from 'generic-pool';
import { ClientClosedError, DisconnectsClientError, AuthError } from '../errors'; import { ClientClosedError, DisconnectsClientError } from '../errors';
import { URL } from 'url'; import { URL } from 'url';
import { TcpSocketConnectOpts } from 'net'; import { TcpSocketConnectOpts } from 'net';
@@ -254,9 +254,7 @@ export default class RedisClient<M extends RedisModules, S extends RedisScripts>
password: this.#options.password ?? '' password: this.#options.password ?? ''
}), }),
{ asap: true } { asap: true }
).catch(err => { )
throw new AuthError(err.message);
})
); );
} }

View File

@@ -21,10 +21,13 @@ describe('Socket', () => {
return time; return time;
}); });
const socket = new RedisSocket(undefined, { const socket = new RedisSocket(
host: 'error', () => Promise.resolve(),
reconnectStrategy {
}); host: 'error',
reconnectStrategy
}
);
socket.on('error', () => { socket.on('error', () => {
// ignore errors // ignore errors

View File

@@ -3,7 +3,7 @@ import * as net from 'net';
import * as tls from 'tls'; import * as tls from 'tls';
import { encodeCommand } from '../commander'; import { encodeCommand } from '../commander';
import { RedisCommandArguments } from '../commands'; import { RedisCommandArguments } from '../commands';
import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyError, AuthError, ReconnectStrategyError } from '../errors'; import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyError, ReconnectStrategyError } from '../errors';
import { promiseTimeout } from '../utils'; import { promiseTimeout } from '../utils';
export interface RedisSocketCommonOptions { export interface RedisSocketCommonOptions {
@@ -53,7 +53,7 @@ export default class RedisSocket extends EventEmitter {
return (options as RedisTlsSocketOptions).tls === true; return (options as RedisTlsSocketOptions).tls === true;
} }
readonly #initiator?: RedisSocketInitiator; readonly #initiator: RedisSocketInitiator;
readonly #options: RedisSocketOptions; readonly #options: RedisSocketOptions;
@@ -79,7 +79,7 @@ export default class RedisSocket extends EventEmitter {
return this.#writableNeedDrain; return this.#writableNeedDrain;
} }
constructor(initiator?: RedisSocketInitiator, options?: RedisSocketOptions) { constructor(initiator: RedisSocketInitiator, options?: RedisSocketOptions) {
super(); super();
this.#initiator = initiator; this.#initiator = initiator;
@@ -91,70 +91,40 @@ export default class RedisSocket extends EventEmitter {
throw new Error('Socket already opened'); throw new Error('Socket already opened');
} }
return this.#connect(); return this.#connect(0);
} }
async #connect(hadError?: boolean): Promise<void> { async #connect(retries: number, hadError?: boolean): Promise<void> {
try {
this.#isOpen = true;
this.#socket = await this.#retryConnection(0, hadError);
this.#writableNeedDrain = false;
} catch (err) {
this.#isOpen = false;
this.emit('error', err);
this.emit('end');
throw err;
}
if (!this.#isOpen) {
this.disconnect();
return;
}
this.emit('connect');
if (this.#initiator) {
try {
await this.#initiator();
} catch (err) {
this.#socket.destroy();
this.#socket = undefined;
if (err instanceof AuthError) {
this.#isOpen = false;
}
throw err;
}
if (!this.#isOpen) return;
}
this.#isReady = true;
this.emit('ready');
}
async #retryConnection(retries: number, hadError?: boolean): Promise<net.Socket | tls.TLSSocket> {
if (retries > 0 || hadError) { if (retries > 0 || hadError) {
this.emit('reconnecting'); this.emit('reconnecting');
} }
try { try {
return await this.#createSocket(); this.#isOpen = true;
} catch (err) { this.#socket = await this.#createSocket();
if (!this.#isOpen) { this.#writableNeedDrain = false;
this.emit('connect');
try {
await this.#initiator();
} catch (err) {
this.#socket.destroy();
this.#socket = undefined;
throw err; throw err;
} }
this.#isReady = true;
this.emit('ready');
} catch (err) {
this.emit('error', err);
const retryIn = (this.#options?.reconnectStrategy ?? RedisSocket.#defaultReconnectStrategy)(retries); const retryIn = (this.#options?.reconnectStrategy ?? RedisSocket.#defaultReconnectStrategy)(retries);
if (retryIn instanceof Error) { if (retryIn instanceof Error) {
this.#isOpen = false;
throw new ReconnectStrategyError(retryIn, err); throw new ReconnectStrategyError(retryIn, err);
} }
this.emit('error', err);
await promiseTimeout(retryIn); await promiseTimeout(retryIn);
return this.#retryConnection(retries + 1); return this.#connect(retries + 1);
} }
} }
@@ -212,7 +182,7 @@ export default class RedisSocket extends EventEmitter {
this.#isReady = false; this.#isReady = false;
this.emit('error', err); this.emit('error', err);
this.#connect(true).catch(() => { this.#connect(0, true).catch(() => {
// the error was already emitted, silently ignore it // the error was already emitted, silently ignore it
}); });
} }

View File

@@ -34,12 +34,6 @@ export class SocketClosedUnexpectedlyError extends Error {
} }
} }
export class AuthError extends Error {
constructor(message: string) {
super(message);
}
}
export class RootNodesUnavailableError extends Error { export class RootNodesUnavailableError extends Error {
constructor() { constructor() {
super('All the root nodes are unavailable'); super('All the root nodes are unavailable');