1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-10 11:43:01 +03:00

Merge pull request #829 from fintura/broken-mode

Implement redis connection broken mode and more shiny things

Fixes #569
Fixes #587
Fixes #566 
Fixes #586 
Fixes #280 

This includes the fixes as suggested in #671, #615 and #533. Thx a lot to @qdb, @tobek and @chrishamant 

Closes #675, #463, #362, #438 and #724
This commit is contained in:
Ruben Bridgewater
2015-09-16 05:35:50 +02:00
8 changed files with 243 additions and 82 deletions

View File

@@ -17,7 +17,7 @@
"mocha": true, "mocha": true,
// Relaxing options // Relaxing options
"boss": true, // Accept things like `while (command = keys.shift()) { ... }` "boss": true, // Accept statements like `while (key = keys.pop()) {}`
"overrides": { "overrides": {
"examples/*.js": { "examples/*.js": {

View File

@@ -113,6 +113,11 @@ then replayed just before this event is emitted.
is set. If this options is set, `connect` will be emitted when the stream is connected, and then is set. If this options is set, `connect` will be emitted when the stream is connected, and then
you are free to try to send commands. you are free to try to send commands.
### "reconnecting"
`client` will emit `reconnecting` when trying to reconnect to the Redis server after losing the connection. Listeners
are passed an object containing `delay` (in ms) and `attempt` (the attempt #) attributes.
### "error" ### "error"
`client` will emit `error` when encountering an error connecting to the Redis server. `client` will emit `error` when encountering an error connecting to the Redis server.
@@ -189,10 +194,11 @@ with an error, or an error will be thrown if no callback is specified.
* `retry_max_delay`: defaults to `null`. By default every time the client tries to connect and fails time before * `retry_max_delay`: defaults to `null`. By default every time the client tries to connect and fails time before
reconnection (delay) almost doubles. This delay normally grows infinitely, but setting `retry_max_delay` limits delay reconnection (delay) almost doubles. This delay normally grows infinitely, but setting `retry_max_delay` limits delay
to maximum value, provided in milliseconds. to maximum value, provided in milliseconds.
* `connect_timeout` defaults to `false`. By default client will try reconnecting until connected. Setting `connect_timeout` * `connect_timeout` defaults to `86400000`. Setting `connect_timeout` limits total time for client to reconnect.
limits total time for client to reconnect. Value is provided in milliseconds and is counted once the disconnect occured. Value is provided in milliseconds and is counted once the disconnect occured. The last retry is going to happen exactly at the timeout time.
* `max_attempts` defaults to `null`. By default client will try reconnecting until connected. Setting `max_attempts` That way the default is to try reconnecting until 24h passed.
limits total amount of reconnects. * `max_attempts` defaults to `0`. By default client will try reconnecting until connected. Setting `max_attempts`
limits total amount of connection tries. Setting this to 1 will prevent any reconnect tries.
* `auth_pass` defaults to `null`. By default client will try connecting without auth. If set, client will run redis auth command on connect. * `auth_pass` defaults to `null`. By default client will try connecting without auth. If set, client will run redis auth command on connect.
* `family` defaults to `IPv4`. The client connects in IPv4 if not specified or if the DNS resolution returns an IPv4 address. * `family` defaults to `IPv4`. The client connects in IPv4 if not specified or if the DNS resolution returns an IPv4 address.
You can force an IPv6 if you set the family to 'IPv6'. See nodejs net or dns modules how to use the family type. You can force an IPv6 if you set the family to 'IPv6'. See nodejs net or dns modules how to use the family type.
@@ -576,12 +582,12 @@ some kind of maximum queue depth for pre-connection commands.
## client.retry_delay ## client.retry_delay
Current delay in milliseconds before a connection retry will be attempted. This starts at `250`. Current delay in milliseconds before a connection retry will be attempted. This starts at `200`.
## client.retry_backoff ## client.retry_backoff
Multiplier for future retry timeouts. This should be larger than 1 to add more time between retries. Multiplier for future retry timeouts. This should be larger than 1 to add more time between retries.
Defaults to 1.7. The default initial connection retry is 250, so the second retry will be 425, followed by 723.5, etc. Defaults to 1.7. The default initial connection retry is 200, so the second retry will be 340, followed by 578, etc.
### Commands with Optional and Keyword arguments ### Commands with Optional and Keyword arguments

View File

@@ -52,15 +52,11 @@ function RedisClient(stream, options) {
this.should_buffer = false; this.should_buffer = false;
this.command_queue_high_water = this.options.command_queue_high_water || 1000; this.command_queue_high_water = this.options.command_queue_high_water || 1000;
this.command_queue_low_water = this.options.command_queue_low_water || 0; this.command_queue_low_water = this.options.command_queue_low_water || 0;
if (options.max_attempts && options.max_attempts > 0) { this.max_attempts = +options.max_attempts || 0;
this.max_attempts = +options.max_attempts;
}
this.command_queue = new Queue(); // holds sent commands to de-pipeline them this.command_queue = new Queue(); // holds sent commands to de-pipeline them
this.offline_queue = new Queue(); // holds commands issued but not able to be sent this.offline_queue = new Queue(); // holds commands issued but not able to be sent
this.commands_sent = 0; this.commands_sent = 0;
if (options.connect_timeout && options.connect_timeout > 0) { this.connect_timeout = +options.connect_timeout || 86400000; // 24 * 60 * 60 * 1000 ms
this.connect_timeout = +options.connect_timeout;
}
this.enable_offline_queue = true; this.enable_offline_queue = true;
if (this.options.enable_offline_queue === false) { if (this.options.enable_offline_queue === false) {
this.enable_offline_queue = false; this.enable_offline_queue = false;
@@ -123,7 +119,7 @@ RedisClient.prototype.install_stream_listeners = function() {
RedisClient.prototype.initialize_retry_vars = function () { RedisClient.prototype.initialize_retry_vars = function () {
this.retry_timer = null; this.retry_timer = null;
this.retry_totaltime = 0; this.retry_totaltime = 0;
this.retry_delay = 150; this.retry_delay = 200;
this.retry_backoff = 1.7; this.retry_backoff = 1.7;
this.attempts = 1; this.attempts = 1;
}; };
@@ -141,21 +137,17 @@ RedisClient.prototype.unref = function () {
}; };
// flush offline_queue and command_queue, erroring any items with a callback first // flush offline_queue and command_queue, erroring any items with a callback first
RedisClient.prototype.flush_and_error = function (message) { RedisClient.prototype.flush_and_error = function (error) {
var command_obj, error; var command_obj;
error = new Error(message); while (command_obj = this.offline_queue.shift()) {
while (this.offline_queue.length > 0) {
command_obj = this.offline_queue.shift();
if (typeof command_obj.callback === "function") { if (typeof command_obj.callback === "function") {
command_obj.callback(error); command_obj.callback(error);
} }
} }
this.offline_queue = new Queue(); this.offline_queue = new Queue();
while (this.command_queue.length > 0) { while (command_obj = this.command_queue.shift()) {
command_obj = this.command_queue.shift();
if (typeof command_obj.callback === "function") { if (typeof command_obj.callback === "function") {
command_obj.callback(error); command_obj.callback(error);
} }
@@ -172,8 +164,6 @@ RedisClient.prototype.on_error = function (msg) {
debug(message); debug(message);
this.flush_and_error(message);
this.connected = false; this.connected = false;
this.ready = false; this.ready = false;
@@ -399,8 +389,8 @@ RedisClient.prototype.ready_check = function () {
RedisClient.prototype.send_offline_queue = function () { RedisClient.prototype.send_offline_queue = function () {
var command_obj, buffered_writes = 0; var command_obj, buffered_writes = 0;
while (this.offline_queue.length > 0) { // TODO: Implement queue.pop() as it should be faster than shift and evaluate petka antonovs queue
command_obj = this.offline_queue.shift(); while (command_obj = this.offline_queue.shift()) {
debug("Sending offline command: " + command_obj.command); debug("Sending offline command: " + command_obj.command);
buffered_writes += !this.send_command(command_obj.command, command_obj.args, command_obj.callback); buffered_writes += !this.send_command(command_obj.command, command_obj.args, command_obj.callback);
} }
@@ -443,51 +433,49 @@ RedisClient.prototype.connection_gone = function (why) {
this.emitted_end = true; this.emitted_end = true;
} }
this.flush_and_error("Redis connection gone from " + why + " event.");
// If this is a requested shutdown, then don't retry // If this is a requested shutdown, then don't retry
if (this.closing) { if (this.closing) {
this.retry_timer = null; debug("connection ended from quit command, not retrying.");
debug("Connection ended from quit command, not retrying."); this.flush_and_error(new Error("Redis connection gone from " + why + " event."));
return; return;
} }
var nextDelay = Math.floor(this.retry_delay * this.retry_backoff); if (this.max_attempts !== 0 && this.attempts >= this.max_attempts || this.retry_totaltime >= this.connect_timeout) {
if (this.retry_max_delay !== null && nextDelay > this.retry_max_delay) { var message = this.retry_totaltime >= this.connect_timeout ?
'connection timeout exceeded.' :
'maximum connection attempts exceeded.';
var error = new Error("Redis connection in broken state: " + message);
error.code = 'CONNECTION_BROKEN';
this.flush_and_error(error);
this.emit('error', error);
this.end();
return;
}
if (this.retry_max_delay !== null && this.retry_delay > this.retry_max_delay) {
this.retry_delay = this.retry_max_delay; this.retry_delay = this.retry_max_delay;
} else { } else if (this.retry_totaltime + this.retry_delay > this.connect_timeout) {
this.retry_delay = nextDelay; // Do not exceed the maximum
this.retry_delay = this.connect_timeout - this.retry_totaltime;
} }
debug("Retry connection in " + this.retry_delay + " ms"); debug("Retry connection in " + this.retry_delay + " ms");
if (this.max_attempts && this.attempts >= this.max_attempts) {
this.retry_timer = null;
// TODO - some people need a "Redis is Broken mode" for future commands that errors immediately, and others
// want the program to exit. Right now, we just log, which doesn't really help in either case.
debug("Couldn't get Redis connection after " + this.max_attempts + " attempts.");
return;
}
this.attempts += 1;
this.emit("reconnecting", {
delay: self.retry_delay,
attempt: self.attempts
});
this.retry_timer = setTimeout(function () { this.retry_timer = setTimeout(function () {
debug("Retrying connection..."); debug("Retrying connection...");
self.retry_totaltime += self.retry_delay; self.emit("reconnecting", {
delay: self.retry_delay,
attempt: self.attempts
});
if (self.connect_timeout && self.retry_totaltime >= self.connect_timeout) { self.retry_totaltime += self.retry_delay;
self.retry_timer = null; self.attempts += 1;
// TODO - engage Redis is Broken mode for future commands, or whatever self.retry_delay = Math.round(self.retry_delay * self.retry_backoff);
debug("Couldn't get Redis connection after " + self.retry_totaltime + "ms.");
return;
}
self.stream = net.createConnection(self.connectionOption); self.stream = net.createConnection(self.connectionOption);
self.install_stream_listeners(); self.install_stream_listeners();
self.retry_timer = null; self.retry_timer = null;
}, this.retry_delay); }, this.retry_delay);
}; };
@@ -836,12 +824,12 @@ RedisClient.prototype.pub_sub_command = function (command_obj) {
RedisClient.prototype.end = function () { RedisClient.prototype.end = function () {
this.stream._events = {}; this.stream._events = {};
//clear retry_timer // Clear retry_timer
if (this.retry_timer){ if (this.retry_timer){
clearTimeout(this.retry_timer); clearTimeout(this.retry_timer);
this.retry_timer = null; this.retry_timer = null;
} }
this.stream.on("error", function(){}); this.stream.on("error", function noop(){});
this.connected = false; this.connected = false;
this.ready = false; this.ready = false;
@@ -1047,7 +1035,7 @@ Multi.prototype.exec = Multi.prototype.EXEC = function (callback) {
// TODO - make this callback part of Multi.prototype instead of creating it each time // TODO - make this callback part of Multi.prototype instead of creating it each time
return this._client.send_command("exec", [], function (err, replies) { return this._client.send_command("exec", [], function (err, replies) {
if (err) { if (err && !err.code) {
if (callback) { if (callback) {
errors.push(err); errors.push(err);
callback(errors); callback(errors);
@@ -1083,6 +1071,9 @@ Multi.prototype.exec = Multi.prototype.EXEC = function (callback) {
if (callback) { if (callback) {
callback(null, replies); callback(null, replies);
} else if (err && err.code !== 'CONNECTION_BROKEN') {
// Exclude CONNECTION_BROKEN so that error won't be emitted twice
self._client.emit('error', err);
} }
}); });
}; };

View File

@@ -12,10 +12,11 @@ describe("client authentication", function () {
}); });
}); });
helper.allTests(function(parser, ip, args) { helper.allTests({
allConnections: true
}, function(parser, ip, args) {
describe("using " + parser + " and " + ip, function () { describe("using " + parser + " and " + ip, function () {
var args = config.configureClient(parser, ip);
var auth = 'porkchopsandwiches'; var auth = 'porkchopsandwiches';
var client = null; var client = null;

View File

@@ -13,7 +13,6 @@ describe("The 'hgetall' method", function () {
var client; var client;
describe('regular client', function () { describe('regular client', function () {
var args = config.configureClient(parser, ip);
beforeEach(function (done) { beforeEach(function (done) {
client = redis.createClient.apply(redis.createClient, args); client = redis.createClient.apply(redis.createClient, args);

84
test/connection.spec.js Normal file
View File

@@ -0,0 +1,84 @@
'use strict';
var assert = require("assert");
var config = require("./lib/config");
var helper = require('./helper');
var redis = config.redis;
describe("on lost connection", function () {
helper.allTests(function(parser, ip, args) {
describe("using " + parser + " and " + ip, function () {
it("emit an error after max retry attempts and do not try to reconnect afterwards", function (done) {
var max_attempts = 4;
var client = redis.createClient({
parser: parser,
max_attempts: max_attempts
});
var calls = 0;
client.once('ready', function() {
helper.killConnection(client);
});
client.on("reconnecting", function (params) {
calls++;
});
client.on('error', function(err) {
if (/Redis connection in broken state: maximum connection attempts.*?exceeded./.test(err.message)) {
setTimeout(function () {
assert.strictEqual(calls, max_attempts - 1);
done();
}, 1500);
}
});
});
it("emit an error after max retry timeout and do not try to reconnect afterwards", function (done) {
var connect_timeout = 1000; // in ms
var client = redis.createClient({
parser: parser,
connect_timeout: connect_timeout
});
var time = 0;
client.once('ready', function() {
helper.killConnection(client);
});
client.on("reconnecting", function (params) {
time += params.delay;
});
client.on('error', function(err) {
if (/Redis connection in broken state: connection timeout.*?exceeded./.test(err.message)) {
setTimeout(function () {
assert(time === connect_timeout);
done();
}, 1500);
}
});
});
it("end connection while retry is still ongoing", function (done) {
var connect_timeout = 1000; // in ms
var client = redis.createClient({
parser: parser,
connect_timeout: connect_timeout
});
client.once('ready', function() {
helper.killConnection(client);
});
client.on("reconnecting", function (params) {
client.end();
setTimeout(done, 100);
});
});
});
});
});

View File

@@ -108,22 +108,27 @@ module.exports = {
} }
return true; return true;
}, },
allTests: function (cb) { allTests: function (options, cb) {
[undefined].forEach(function (options) { // add buffer option at some point if (!cb) {
describe(options && options.return_buffers ? "returning buffers" : "returning strings", function () { cb = options;
options = {};
}
// TODO: Test all different option cases at some point (e.g. buffers)
// [undefined, { return_buffers: true }].forEach(function (config_options) {
// describe(config_options && config_options.return_buffers ? "returning buffers" : "returning strings", function () {
// });
// });
var parsers = ['javascript']; var parsers = ['javascript'];
var protocols = ['IPv4']; var protocols = ['IPv4'];
if (process.platform !== 'win32') { if (process.platform !== 'win32') {
parsers.push('hiredis'); parsers.push('hiredis');
protocols.push('IPv6'); protocols.push('IPv6', '/tmp/redis.sock');
} }
parsers.forEach(function (parser) { parsers.forEach(function (parser) {
if (process.platform !== 'win32') cb(parser, "/tmp/redis.sock", config.configureClient(parser, "/tmp/redis.sock", options)); protocols.forEach(function (ip, i) {
protocols.forEach(function (ip) { if (i === 0 || options.allConnections) {
cb(parser, ip, config.configureClient(parser, ip, options)); cb(parser, ip, config.configureClient(parser, ip));
}); }
});
}); });
}); });
}, },
@@ -140,5 +145,15 @@ module.exports = {
func(); func();
} }
}; };
},
killConnection: function (client) {
// Change the connection option to a non existing one and destroy the stream
client.connectionOption = {
port: 6370,
host: '127.0.0.2',
family: 4
};
client.address = '127.0.0.2:6370';
client.stream.destroy();
} }
}; };

View File

@@ -8,7 +8,9 @@ var redis = config.redis;
describe("The node_redis client", function () { describe("The node_redis client", function () {
helper.allTests(function(parser, ip, args) { helper.allTests({
allConnections: true
}, function(parser, ip, args) {
if (args[2]) { // skip if options are undefined if (args[2]) { // skip if options are undefined
describe("testing parser existence", function () { describe("testing parser existence", function () {
@@ -624,7 +626,6 @@ describe("The node_redis client", function () {
describe('defaults to true', function () { describe('defaults to true', function () {
var client; var client;
var args = config.configureClient(parser, ip);
it("fires client.on('ready')", function (done) { it("fires client.on('ready')", function (done) {
client = redis.createClient.apply(redis.createClient, args); client = redis.createClient.apply(redis.createClient, args);
@@ -709,13 +710,43 @@ describe("The node_redis client", function () {
}, 25); }, 25);
}, 50); }, 50);
}); });
it("enqueues operation and keep the queue while trying to reconnect", function (done) {
var client = redis.createClient(9999, null, {
max_attempts: 4,
parser: parser
});
var i = 0;
client.on('error', function(err) {
if (err.message === 'Redis connection in broken state: maximum connection attempts exceeded.') {
assert(i, 3);
assert.strictEqual(client.offline_queue.length, 0);
done();
}
});
client.on('reconnecting', function(params) {
i++;
assert.equal(params.attempt, i);
assert.strictEqual(client.offline_queue.length, 2);
});
// Should work with either a callback or without
client.set('baz', 13);
client.set('foo', 'bar', function(err, result) {
assert(i, 3);
assert('Redis connection gone from error event', err.message);
assert.strictEqual(client.offline_queue.length, 0);
});
});
}); });
describe('false', function () { describe('false', function () {
it("does not emit an error and enqueues operation", function (done) { it("does not emit an error and enqueues operation", function (done) {
var client = redis.createClient(9999, null, { var client = redis.createClient(9999, null, {
parser: parser, parser: parser,
max_attempts: 1, max_attempts: 0,
enable_offline_queue: false enable_offline_queue: false
}); });
@@ -735,6 +766,40 @@ describe("The node_redis client", function () {
}); });
}); });
}); });
it("flushes the command queue connection if in broken connection mode", function (done) {
var client = redis.createClient({
parser: parser,
max_attempts: 2,
enable_offline_queue: false
});
client.once('ready', function() {
var multi = client.multi();
multi.config("bar");
var cb = function(err, reply) {
assert.equal(err.code, 'CONNECTION_BROKEN');
};
for (var i = 0; i < 10; i += 2) {
multi.set("foo" + i, "bar" + i);
multi.set("foo" + (i + 1), "bar" + (i + 1), cb);
}
multi.exec();
assert.equal(client.command_queue.length, 13);
helper.killConnection(client);
});
client.on("reconnecting", function (params) {
assert.equal(client.command_queue.length, 13);
});
client.on('error', function(err) {
if (/Redis connection in broken state:/.test(err.message)) {
assert.equal(client.command_queue.length, 0);
done();
}
});
});
}); });
}); });