You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-06 02:15:48 +03:00
ref #1789 - reject "hanging" promises when closing connection
This commit is contained in:
@@ -268,7 +268,11 @@ export default class RedisClient<M extends RedisModules, S extends RedisScripts>
|
|||||||
.on('data', data => this.#queue.parseResponse(data))
|
.on('data', data => this.#queue.parseResponse(data))
|
||||||
.on('error', err => {
|
.on('error', err => {
|
||||||
this.emit('error', err);
|
this.emit('error', err);
|
||||||
this.#queue.flushWaitingForReply(err);
|
if (!this.#socket.isOpen) {
|
||||||
|
this.#queue.flushAll(err);
|
||||||
|
} else {
|
||||||
|
this.#queue.flushWaitingForReply(err);
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.on('connect', () => this.emit('connect'))
|
.on('connect', () => this.emit('connect'))
|
||||||
.on('ready', () => {
|
.on('ready', () => {
|
||||||
|
@@ -3,7 +3,7 @@ import * as net from 'net';
|
|||||||
import * as tls from 'tls';
|
import * as tls from 'tls';
|
||||||
import { encodeCommand } from '../commander';
|
import { encodeCommand } from '../commander';
|
||||||
import { RedisCommandArguments } from '../commands';
|
import { RedisCommandArguments } from '../commands';
|
||||||
import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyError, AuthError } from '../errors';
|
import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyError, AuthError, ReconnectStrategyError } from '../errors';
|
||||||
import { promiseTimeout } from '../utils';
|
import { promiseTimeout } from '../utils';
|
||||||
|
|
||||||
export interface RedisSocketCommonOptions {
|
export interface RedisSocketCommonOptions {
|
||||||
@@ -93,9 +93,16 @@ export default class RedisSocket extends EventEmitter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async #connect(hadError?: boolean): Promise<void> {
|
async #connect(hadError?: boolean): Promise<void> {
|
||||||
this.#isOpen = true;
|
try {
|
||||||
this.#socket = await this.#retryConnection(0, hadError);
|
this.#isOpen = true;
|
||||||
this.#writableNeedDrain = false;
|
this.#socket = await this.#retryConnection(0, hadError);
|
||||||
|
this.#writableNeedDrain = false;
|
||||||
|
} catch (err) {
|
||||||
|
this.#isOpen = false;
|
||||||
|
this.emit('error', err);
|
||||||
|
this.emit('end');
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
|
||||||
if (!this.#isOpen) {
|
if (!this.#isOpen) {
|
||||||
this.disconnect();
|
this.disconnect();
|
||||||
@@ -134,17 +141,16 @@ export default class RedisSocket extends EventEmitter {
|
|||||||
try {
|
try {
|
||||||
return await this.#createSocket();
|
return await this.#createSocket();
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
this.emit('error', err);
|
|
||||||
|
|
||||||
if (!this.#isOpen) {
|
if (!this.#isOpen) {
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
|
|
||||||
const retryIn = (this.#options?.reconnectStrategy ?? RedisSocket.#defaultReconnectStrategy)(retries);
|
const retryIn = (this.#options?.reconnectStrategy ?? RedisSocket.#defaultReconnectStrategy)(retries);
|
||||||
if (retryIn instanceof Error) {
|
if (retryIn instanceof Error) {
|
||||||
throw retryIn;
|
throw new ReconnectStrategyError(retryIn, err);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.emit('error', err);
|
||||||
await promiseTimeout(retryIn);
|
await promiseTimeout(retryIn);
|
||||||
return this.#retryConnection(retries + 1);
|
return this.#retryConnection(retries + 1);
|
||||||
}
|
}
|
||||||
|
@@ -45,3 +45,14 @@ export class RootNodesUnavailableError extends Error {
|
|||||||
super('All the root nodes are unavailable');
|
super('All the root nodes are unavailable');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export class ReconnectStrategyError extends Error {
|
||||||
|
originalError: Error;
|
||||||
|
socketError: unknown;
|
||||||
|
|
||||||
|
constructor(originalError: Error, socketError: unknown) {
|
||||||
|
super(originalError.message);
|
||||||
|
this.originalError = originalError;
|
||||||
|
this.socketError = socketError;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user