You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-07 13:22:56 +03:00
Handle unhandled errors in socket.reconnectStrategry
(#2226)
* handle errors in reconnect strategy * add test * fix retries typo * fix #2237 - flush queues on reconnect strategy error * Update socket.ts * Update socket.ts
This commit is contained in:
@@ -1,41 +1,63 @@
|
|||||||
import { strict as assert } from 'assert';
|
import { strict as assert } from 'assert';
|
||||||
import { SinonFakeTimers, useFakeTimers, spy } from 'sinon';
|
import { SinonFakeTimers, useFakeTimers, spy } from 'sinon';
|
||||||
import RedisSocket from './socket';
|
import RedisSocket, { RedisSocketOptions } from './socket';
|
||||||
|
|
||||||
describe('Socket', () => {
|
describe('Socket', () => {
|
||||||
|
function createSocket(options: RedisSocketOptions): RedisSocket {
|
||||||
|
const socket = new RedisSocket(
|
||||||
|
() => Promise.resolve(),
|
||||||
|
options
|
||||||
|
);
|
||||||
|
|
||||||
|
socket.on('error', (err) => {
|
||||||
|
// ignore errors
|
||||||
|
console.log(err);
|
||||||
|
});
|
||||||
|
|
||||||
|
return socket;
|
||||||
|
}
|
||||||
|
|
||||||
describe('reconnectStrategy', () => {
|
describe('reconnectStrategy', () => {
|
||||||
let clock: SinonFakeTimers;
|
let clock: SinonFakeTimers;
|
||||||
beforeEach(() => clock = useFakeTimers());
|
beforeEach(() => clock = useFakeTimers());
|
||||||
afterEach(() => clock.restore());
|
afterEach(() => clock.restore());
|
||||||
|
|
||||||
it('custom strategy', () => {
|
it('custom strategy', async () => {
|
||||||
const reconnectStrategy = spy((retries: number): number | Error => {
|
const reconnectStrategy = spy((retries: number) => {
|
||||||
assert.equal(retries + 1, reconnectStrategy.callCount);
|
assert.equal(retries + 1, reconnectStrategy.callCount);
|
||||||
|
|
||||||
if (retries === 50) {
|
if (retries === 50) return new Error('50');
|
||||||
return Error('50');
|
|
||||||
}
|
|
||||||
|
|
||||||
const time = retries * 2;
|
const time = retries * 2;
|
||||||
queueMicrotask(() => clock.tick(time));
|
queueMicrotask(() => clock.tick(time));
|
||||||
return time;
|
return time;
|
||||||
});
|
});
|
||||||
|
|
||||||
const socket = new RedisSocket(
|
const socket = createSocket({
|
||||||
() => Promise.resolve(),
|
host: 'error',
|
||||||
{
|
reconnectStrategy
|
||||||
host: 'error',
|
|
||||||
reconnectStrategy
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
socket.on('error', () => {
|
|
||||||
// ignore errors
|
|
||||||
});
|
});
|
||||||
|
|
||||||
return assert.rejects(socket.connect(), {
|
await assert.rejects(socket.connect(), {
|
||||||
message: '50'
|
message: '50'
|
||||||
});
|
});
|
||||||
|
|
||||||
|
assert.equal(socket.isOpen, false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should handle errors', async () => {
|
||||||
|
const socket = createSocket({
|
||||||
|
host: 'error',
|
||||||
|
reconnectStrategy(retries: number) {
|
||||||
|
if (retries === 1) return new Error('done');
|
||||||
|
queueMicrotask(() => clock.tick(500));
|
||||||
|
throw new Error();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
await assert.rejects(socket.connect());
|
||||||
|
|
||||||
|
assert.equal(socket.isOpen, false);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
@@ -44,10 +44,6 @@ export default class RedisSocket extends EventEmitter {
|
|||||||
return options;
|
return options;
|
||||||
}
|
}
|
||||||
|
|
||||||
static #defaultReconnectStrategy(retries: number): number {
|
|
||||||
return Math.min(retries * 50, 500);
|
|
||||||
}
|
|
||||||
|
|
||||||
static #isTlsSocket(options: RedisSocketOptions): options is RedisTlsSocketOptions {
|
static #isTlsSocket(options: RedisSocketOptions): options is RedisTlsSocketOptions {
|
||||||
return (options as RedisTlsSocketOptions).tls === true;
|
return (options as RedisTlsSocketOptions).tls === true;
|
||||||
}
|
}
|
||||||
@@ -87,6 +83,23 @@ export default class RedisSocket extends EventEmitter {
|
|||||||
this.#options = RedisSocket.#initiateOptions(options);
|
this.#options = RedisSocket.#initiateOptions(options);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
reconnectStrategy(retries: number): number | Error {
|
||||||
|
if (this.#options.reconnectStrategy) {
|
||||||
|
try {
|
||||||
|
const retryIn = this.#options.reconnectStrategy(retries);
|
||||||
|
if (typeof retryIn !== 'number' && !(retryIn instanceof Error)) {
|
||||||
|
throw new TypeError('Reconnect strategy should return `number | Error`');
|
||||||
|
}
|
||||||
|
|
||||||
|
return retryIn;
|
||||||
|
} catch (err) {
|
||||||
|
this.emit('error', err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return Math.min(retries * 50, 500);
|
||||||
|
}
|
||||||
|
|
||||||
async connect(): Promise<void> {
|
async connect(): Promise<void> {
|
||||||
if (this.#isOpen) {
|
if (this.#isOpen) {
|
||||||
throw new Error('Socket already opened');
|
throw new Error('Socket already opened');
|
||||||
@@ -116,14 +129,14 @@ export default class RedisSocket extends EventEmitter {
|
|||||||
this.#isReady = true;
|
this.#isReady = true;
|
||||||
this.emit('ready');
|
this.emit('ready');
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
this.emit('error', err);
|
const retryIn = this.reconnectStrategy(retries);
|
||||||
|
|
||||||
const retryIn = (this.#options?.reconnectStrategy ?? RedisSocket.#defaultReconnectStrategy)(retries);
|
|
||||||
if (retryIn instanceof Error) {
|
if (retryIn instanceof Error) {
|
||||||
this.#isOpen = false;
|
this.#isOpen = false;
|
||||||
|
this.emit('error', err);
|
||||||
throw new ReconnectStrategyError(retryIn, err);
|
throw new ReconnectStrategyError(retryIn, err);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.emit('error', err);
|
||||||
await promiseTimeout(retryIn);
|
await promiseTimeout(retryIn);
|
||||||
return this.#connect(retries + 1);
|
return this.#connect(retries + 1);
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user