You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-06 02:15:48 +03:00
Implement retry_strategy and add more info to the reconnect event
This commit is contained in:
54
index.js
54
index.js
@@ -103,6 +103,7 @@ function RedisClient (options) {
|
|||||||
this.old_state = null;
|
this.old_state = null;
|
||||||
this.send_anyway = false;
|
this.send_anyway = false;
|
||||||
this.pipeline = 0;
|
this.pipeline = 0;
|
||||||
|
this.times_connected = 0;
|
||||||
this.options = options;
|
this.options = options;
|
||||||
// Init parser
|
// Init parser
|
||||||
this.reply_parser = new Parser({
|
this.reply_parser = new Parser({
|
||||||
@@ -145,14 +146,15 @@ RedisClient.prototype.create_stream = function () {
|
|||||||
if (this.options.connect_timeout) {
|
if (this.options.connect_timeout) {
|
||||||
this.stream.setTimeout(this.connect_timeout, function () {
|
this.stream.setTimeout(this.connect_timeout, function () {
|
||||||
self.retry_totaltime = self.connect_timeout;
|
self.retry_totaltime = self.connect_timeout;
|
||||||
self.connection_gone('timeout');
|
self.connection_gone('timeout', new Error('Redis connection gone from timeout event'));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/* istanbul ignore next: travis does not work with stunnel atm. Therefor the tls tests are skipped on travis */
|
/* istanbul ignore next: travis does not work with stunnel atm. Therefor the tls tests are skipped on travis */
|
||||||
var connect_event = this.options.tls ? "secureConnect" : "connect";
|
var connect_event = this.options.tls ? 'secureConnect' : 'connect';
|
||||||
this.stream.once(connect_event, function () {
|
this.stream.once(connect_event, function () {
|
||||||
this.removeAllListeners("timeout");
|
this.removeAllListeners('timeout');
|
||||||
|
self.times_connected++;
|
||||||
self.on_connect();
|
self.on_connect();
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -166,17 +168,18 @@ RedisClient.prototype.create_stream = function () {
|
|||||||
self.on_error(err);
|
self.on_error(err);
|
||||||
});
|
});
|
||||||
|
|
||||||
/* istanbul ignore next: travis does not work with stunnel atm. Therefor the tls tests are skipped on travis */
|
/* istanbul ignore next: difficult to test and not important as long as we keep this listener */
|
||||||
this.stream.on('clientError', function (err) {
|
this.stream.on('clientError', function (err) {
|
||||||
|
debug('clientError occured');
|
||||||
self.on_error(err);
|
self.on_error(err);
|
||||||
});
|
});
|
||||||
|
|
||||||
this.stream.once('close', function () {
|
this.stream.once('close', function () {
|
||||||
self.connection_gone('close');
|
self.connection_gone('close', new Error('Stream connection closed'));
|
||||||
});
|
});
|
||||||
|
|
||||||
this.stream.once('end', function () {
|
this.stream.once('end', function () {
|
||||||
self.connection_gone('end');
|
self.connection_gone('end', new Error('Stream connection ended'));
|
||||||
});
|
});
|
||||||
|
|
||||||
this.stream.on('drain', function () {
|
this.stream.on('drain', function () {
|
||||||
@@ -268,10 +271,14 @@ RedisClient.prototype.on_error = function (err) {
|
|||||||
|
|
||||||
this.connected = false;
|
this.connected = false;
|
||||||
this.ready = false;
|
this.ready = false;
|
||||||
this.emit('error', err);
|
|
||||||
|
// Only emit the error if the retry_stategy option is not set
|
||||||
|
if (!this.options.retry_strategy) {
|
||||||
|
this.emit('error', err);
|
||||||
|
}
|
||||||
// 'error' events get turned into exceptions if they aren't listened for. If the user handled this error
|
// 'error' events get turned into exceptions if they aren't listened for. If the user handled this error
|
||||||
// then we should try to reconnect.
|
// then we should try to reconnect.
|
||||||
this.connection_gone('error');
|
this.connection_gone('error', err);
|
||||||
};
|
};
|
||||||
|
|
||||||
RedisClient.prototype.on_connect = function () {
|
RedisClient.prototype.on_connect = function () {
|
||||||
@@ -417,12 +424,15 @@ RedisClient.prototype.send_offline_queue = function () {
|
|||||||
this.offline_queue = new Queue();
|
this.offline_queue = new Queue();
|
||||||
};
|
};
|
||||||
|
|
||||||
var retry_connection = function (self) {
|
var retry_connection = function (self, error) {
|
||||||
debug('Retrying connection...');
|
debug('Retrying connection...');
|
||||||
|
|
||||||
self.emit('reconnecting', {
|
self.emit('reconnecting', {
|
||||||
delay: self.retry_delay,
|
delay: self.retry_delay,
|
||||||
attempt: self.attempts
|
attempt: self.attempts,
|
||||||
|
error: error,
|
||||||
|
times_connected: self.times_connected,
|
||||||
|
total_retry_time: self.retry_totaltime
|
||||||
});
|
});
|
||||||
|
|
||||||
self.retry_totaltime += self.retry_delay;
|
self.retry_totaltime += self.retry_delay;
|
||||||
@@ -432,8 +442,7 @@ var retry_connection = function (self) {
|
|||||||
self.retry_timer = null;
|
self.retry_timer = null;
|
||||||
};
|
};
|
||||||
|
|
||||||
RedisClient.prototype.connection_gone = function (why) {
|
RedisClient.prototype.connection_gone = function (why, error) {
|
||||||
var error;
|
|
||||||
// If a retry is already in progress, just let that happen
|
// If a retry is already in progress, just let that happen
|
||||||
if (this.retry_timer) {
|
if (this.retry_timer) {
|
||||||
return;
|
return;
|
||||||
@@ -469,6 +478,25 @@ RedisClient.prototype.connection_gone = function (why) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (typeof this.options.retry_strategy === 'function') {
|
||||||
|
this.retry_delay = this.options.retry_strategy({
|
||||||
|
attempt: this.attempts,
|
||||||
|
error: error,
|
||||||
|
total_retry_time: this.retry_totaltime,
|
||||||
|
times_connected: this.times_connected
|
||||||
|
});
|
||||||
|
if (typeof this.retry_delay !== 'number') {
|
||||||
|
// Pass individual error through
|
||||||
|
if (this.retry_delay instanceof Error) {
|
||||||
|
error = this.retry_delay;
|
||||||
|
}
|
||||||
|
this.flush_and_error(error);
|
||||||
|
this.emit('error', error);
|
||||||
|
this.end(false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (this.max_attempts !== 0 && this.attempts >= this.max_attempts || this.retry_totaltime >= this.connect_timeout) {
|
if (this.max_attempts !== 0 && this.attempts >= this.max_attempts || this.retry_totaltime >= this.connect_timeout) {
|
||||||
var message = this.retry_totaltime >= this.connect_timeout ?
|
var message = this.retry_totaltime >= this.connect_timeout ?
|
||||||
'connection timeout exceeded.' :
|
'connection timeout exceeded.' :
|
||||||
@@ -502,7 +530,7 @@ RedisClient.prototype.connection_gone = function (why) {
|
|||||||
|
|
||||||
debug('Retry connection in ' + this.retry_delay + ' ms');
|
debug('Retry connection in ' + this.retry_delay + ' ms');
|
||||||
|
|
||||||
this.retry_timer = setTimeout(retry_connection, this.retry_delay, this);
|
this.retry_timer = setTimeout(retry_connection, this.retry_delay, this, error);
|
||||||
};
|
};
|
||||||
|
|
||||||
RedisClient.prototype.return_error = function (err) {
|
RedisClient.prototype.return_error = function (err) {
|
||||||
|
@@ -127,6 +127,65 @@ describe("connection tests", function () {
|
|||||||
client.stream.destroy();
|
client.stream.destroy();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("retry_strategy used to reconnect with individual error", function (done) {
|
||||||
|
var text = '';
|
||||||
|
var unhookIntercept = intercept(function (data) {
|
||||||
|
text += data;
|
||||||
|
return '';
|
||||||
|
});
|
||||||
|
var end = helper.callFuncAfter(done, 2);
|
||||||
|
client = redis.createClient({
|
||||||
|
retry_strategy: function (options) {
|
||||||
|
if (options.total_retry_time > 150) {
|
||||||
|
client.set('foo', 'bar', function (err, res) {
|
||||||
|
assert.strictEqual(err.message, 'Connection timeout');
|
||||||
|
end();
|
||||||
|
});
|
||||||
|
// Pass a individual error message to the error handler
|
||||||
|
return new Error('Connection timeout');
|
||||||
|
}
|
||||||
|
return Math.min(options.attempt * 25, 200);
|
||||||
|
},
|
||||||
|
max_attempts: 5,
|
||||||
|
retry_max_delay: 123,
|
||||||
|
port: 9999
|
||||||
|
});
|
||||||
|
|
||||||
|
client.on('error', function(err) {
|
||||||
|
unhookIntercept();
|
||||||
|
assert.strictEqual(
|
||||||
|
text,
|
||||||
|
'node_redis: WARNING: You activated the retry_strategy and max_attempts at the same time. This is not possible and max_attempts will be ignored.\n' +
|
||||||
|
'node_redis: WARNING: You activated the retry_strategy and retry_max_delay at the same time. This is not possible and retry_max_delay will be ignored.\n'
|
||||||
|
);
|
||||||
|
assert.strictEqual(err.message, 'Connection timeout');
|
||||||
|
assert(!err.code);
|
||||||
|
end();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("retry_strategy used to reconnect", function (done) {
|
||||||
|
var end = helper.callFuncAfter(done, 2);
|
||||||
|
client = redis.createClient({
|
||||||
|
retry_strategy: function (options) {
|
||||||
|
if (options.total_retry_time > 150) {
|
||||||
|
client.set('foo', 'bar', function (err, res) {
|
||||||
|
assert.strictEqual(err.code, 'ECONNREFUSED');
|
||||||
|
end();
|
||||||
|
});
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return Math.min(options.attempt * 25, 200);
|
||||||
|
},
|
||||||
|
port: 9999
|
||||||
|
});
|
||||||
|
|
||||||
|
client.on('error', function(err) {
|
||||||
|
assert.strictEqual(err.code, 'ECONNREFUSED');
|
||||||
|
end();
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe("when not connected", function () {
|
describe("when not connected", function () {
|
||||||
|
Reference in New Issue
Block a user