diff --git a/.jshintrc b/.jshintrc index c5caf777dd..92441eb976 100644 --- a/.jshintrc +++ b/.jshintrc @@ -17,7 +17,7 @@ "mocha": true, // Relaxing options - "boss": true, // Accept things like `while (command = keys.shift()) { ... }` + "boss": true, // Accept statements like `while (key = keys.pop()) {}` "overrides": { "examples/*.js": { diff --git a/README.md b/README.md index 0e1d444420..a29c6ae134 100644 --- a/README.md +++ b/README.md @@ -113,6 +113,11 @@ then replayed just before this event is emitted. is set. If this options is set, `connect` will be emitted when the stream is connected, and then you are free to try to send commands. +### "reconnecting" + +`client` will emit `reconnecting` when trying to reconnect to the Redis server after losing the connection. Listeners +are passed an object containing `delay` (in ms) and `attempt` (the attempt #) attributes. + ### "error" `client` will emit `error` when encountering an error connecting to the Redis server. @@ -189,10 +194,11 @@ with an error, or an error will be thrown if no callback is specified. * `retry_max_delay`: defaults to `null`. By default every time the client tries to connect and fails time before reconnection (delay) almost doubles. This delay normally grows infinitely, but setting `retry_max_delay` limits delay to maximum value, provided in milliseconds. -* `connect_timeout` defaults to `false`. By default client will try reconnecting until connected. Setting `connect_timeout` -limits total time for client to reconnect. Value is provided in milliseconds and is counted once the disconnect occured. -* `max_attempts` defaults to `null`. By default client will try reconnecting until connected. Setting `max_attempts` -limits total amount of reconnects. +* `connect_timeout` defaults to `86400000`. Setting `connect_timeout` limits total time for client to reconnect. +Value is provided in milliseconds and is counted once the disconnect occured. The last retry is going to happen exactly at the timeout time. +That way the default is to try reconnecting until 24h passed. +* `max_attempts` defaults to `0`. By default client will try reconnecting until connected. Setting `max_attempts` +limits total amount of connection tries. Setting this to 1 will prevent any reconnect tries. * `auth_pass` defaults to `null`. By default client will try connecting without auth. If set, client will run redis auth command on connect. * `family` defaults to `IPv4`. The client connects in IPv4 if not specified or if the DNS resolution returns an IPv4 address. You can force an IPv6 if you set the family to 'IPv6'. See nodejs net or dns modules how to use the family type. @@ -576,12 +582,12 @@ some kind of maximum queue depth for pre-connection commands. ## client.retry_delay -Current delay in milliseconds before a connection retry will be attempted. This starts at `250`. +Current delay in milliseconds before a connection retry will be attempted. This starts at `200`. ## client.retry_backoff Multiplier for future retry timeouts. This should be larger than 1 to add more time between retries. -Defaults to 1.7. The default initial connection retry is 250, so the second retry will be 425, followed by 723.5, etc. +Defaults to 1.7. The default initial connection retry is 200, so the second retry will be 340, followed by 578, etc. ### Commands with Optional and Keyword arguments diff --git a/index.js b/index.js index 0011ef289c..400ac95285 100644 --- a/index.js +++ b/index.js @@ -52,15 +52,11 @@ function RedisClient(stream, options) { this.should_buffer = false; this.command_queue_high_water = this.options.command_queue_high_water || 1000; this.command_queue_low_water = this.options.command_queue_low_water || 0; - if (options.max_attempts && options.max_attempts > 0) { - this.max_attempts = +options.max_attempts; - } + this.max_attempts = +options.max_attempts || 0; this.command_queue = new Queue(); // holds sent commands to de-pipeline them this.offline_queue = new Queue(); // holds commands issued but not able to be sent this.commands_sent = 0; - if (options.connect_timeout && options.connect_timeout > 0) { - this.connect_timeout = +options.connect_timeout; - } + this.connect_timeout = +options.connect_timeout || 86400000; // 24 * 60 * 60 * 1000 ms this.enable_offline_queue = true; if (this.options.enable_offline_queue === false) { this.enable_offline_queue = false; @@ -123,7 +119,7 @@ RedisClient.prototype.install_stream_listeners = function() { RedisClient.prototype.initialize_retry_vars = function () { this.retry_timer = null; this.retry_totaltime = 0; - this.retry_delay = 150; + this.retry_delay = 200; this.retry_backoff = 1.7; this.attempts = 1; }; @@ -141,21 +137,17 @@ RedisClient.prototype.unref = function () { }; // flush offline_queue and command_queue, erroring any items with a callback first -RedisClient.prototype.flush_and_error = function (message) { - var command_obj, error; +RedisClient.prototype.flush_and_error = function (error) { + var command_obj; - error = new Error(message); - - while (this.offline_queue.length > 0) { - command_obj = this.offline_queue.shift(); + while (command_obj = this.offline_queue.shift()) { if (typeof command_obj.callback === "function") { command_obj.callback(error); } } this.offline_queue = new Queue(); - while (this.command_queue.length > 0) { - command_obj = this.command_queue.shift(); + while (command_obj = this.command_queue.shift()) { if (typeof command_obj.callback === "function") { command_obj.callback(error); } @@ -172,8 +164,6 @@ RedisClient.prototype.on_error = function (msg) { debug(message); - this.flush_and_error(message); - this.connected = false; this.ready = false; @@ -399,8 +389,8 @@ RedisClient.prototype.ready_check = function () { RedisClient.prototype.send_offline_queue = function () { var command_obj, buffered_writes = 0; - while (this.offline_queue.length > 0) { - command_obj = this.offline_queue.shift(); + // TODO: Implement queue.pop() as it should be faster than shift and evaluate petka antonovs queue + while (command_obj = this.offline_queue.shift()) { debug("Sending offline command: " + command_obj.command); buffered_writes += !this.send_command(command_obj.command, command_obj.args, command_obj.callback); } @@ -438,56 +428,54 @@ RedisClient.prototype.connection_gone = function (why) { } // since we are collapsing end and close, users don't expect to be called twice - if (! this.emitted_end) { + if (!this.emitted_end) { this.emit("end"); this.emitted_end = true; } - this.flush_and_error("Redis connection gone from " + why + " event."); - // If this is a requested shutdown, then don't retry if (this.closing) { - this.retry_timer = null; - debug("Connection ended from quit command, not retrying."); + debug("connection ended from quit command, not retrying."); + this.flush_and_error(new Error("Redis connection gone from " + why + " event.")); return; } - var nextDelay = Math.floor(this.retry_delay * this.retry_backoff); - if (this.retry_max_delay !== null && nextDelay > this.retry_max_delay) { + if (this.max_attempts !== 0 && this.attempts >= this.max_attempts || this.retry_totaltime >= this.connect_timeout) { + var message = this.retry_totaltime >= this.connect_timeout ? + 'connection timeout exceeded.' : + 'maximum connection attempts exceeded.'; + var error = new Error("Redis connection in broken state: " + message); + error.code = 'CONNECTION_BROKEN'; + this.flush_and_error(error); + this.emit('error', error); + this.end(); + return; + } + + if (this.retry_max_delay !== null && this.retry_delay > this.retry_max_delay) { this.retry_delay = this.retry_max_delay; - } else { - this.retry_delay = nextDelay; + } else if (this.retry_totaltime + this.retry_delay > this.connect_timeout) { + // Do not exceed the maximum + this.retry_delay = this.connect_timeout - this.retry_totaltime; } debug("Retry connection in " + this.retry_delay + " ms"); - if (this.max_attempts && this.attempts >= this.max_attempts) { - this.retry_timer = null; - // TODO - some people need a "Redis is Broken mode" for future commands that errors immediately, and others - // want the program to exit. Right now, we just log, which doesn't really help in either case. - debug("Couldn't get Redis connection after " + this.max_attempts + " attempts."); - return; - } - - this.attempts += 1; - this.emit("reconnecting", { - delay: self.retry_delay, - attempt: self.attempts - }); this.retry_timer = setTimeout(function () { debug("Retrying connection..."); - self.retry_totaltime += self.retry_delay; + self.emit("reconnecting", { + delay: self.retry_delay, + attempt: self.attempts + }); - if (self.connect_timeout && self.retry_totaltime >= self.connect_timeout) { - self.retry_timer = null; - // TODO - engage Redis is Broken mode for future commands, or whatever - debug("Couldn't get Redis connection after " + self.retry_totaltime + "ms."); - return; - } + self.retry_totaltime += self.retry_delay; + self.attempts += 1; + self.retry_delay = Math.round(self.retry_delay * self.retry_backoff); self.stream = net.createConnection(self.connectionOption); self.install_stream_listeners(); + self.retry_timer = null; }, this.retry_delay); }; @@ -836,12 +824,12 @@ RedisClient.prototype.pub_sub_command = function (command_obj) { RedisClient.prototype.end = function () { this.stream._events = {}; - //clear retry_timer - if(this.retry_timer){ + // Clear retry_timer + if (this.retry_timer){ clearTimeout(this.retry_timer); - this.retry_timer=null; + this.retry_timer = null; } - this.stream.on("error", function(){}); + this.stream.on("error", function noop(){}); this.connected = false; this.ready = false; @@ -1047,7 +1035,7 @@ Multi.prototype.exec = Multi.prototype.EXEC = function (callback) { // TODO - make this callback part of Multi.prototype instead of creating it each time return this._client.send_command("exec", [], function (err, replies) { - if (err) { + if (err && !err.code) { if (callback) { errors.push(err); callback(errors); @@ -1083,6 +1071,9 @@ Multi.prototype.exec = Multi.prototype.EXEC = function (callback) { if (callback) { callback(null, replies); + } else if (err && err.code !== 'CONNECTION_BROKEN') { + // Exclude CONNECTION_BROKEN so that error won't be emitted twice + self._client.emit('error', err); } }); }; diff --git a/test/auth.spec.js b/test/auth.spec.js index 6eb96e0f87..d29b605f27 100644 --- a/test/auth.spec.js +++ b/test/auth.spec.js @@ -12,10 +12,11 @@ describe("client authentication", function () { }); }); - helper.allTests(function(parser, ip, args) { + helper.allTests({ + allConnections: true + }, function(parser, ip, args) { describe("using " + parser + " and " + ip, function () { - var args = config.configureClient(parser, ip); var auth = 'porkchopsandwiches'; var client = null; diff --git a/test/commands/hgetall.spec.js b/test/commands/hgetall.spec.js index e45995aa35..958e717c8c 100644 --- a/test/commands/hgetall.spec.js +++ b/test/commands/hgetall.spec.js @@ -13,7 +13,6 @@ describe("The 'hgetall' method", function () { var client; describe('regular client', function () { - var args = config.configureClient(parser, ip); beforeEach(function (done) { client = redis.createClient.apply(redis.createClient, args); diff --git a/test/connection.spec.js b/test/connection.spec.js new file mode 100644 index 0000000000..a59ff8deaf --- /dev/null +++ b/test/connection.spec.js @@ -0,0 +1,84 @@ +'use strict'; + +var assert = require("assert"); +var config = require("./lib/config"); +var helper = require('./helper'); +var redis = config.redis; + +describe("on lost connection", function () { + helper.allTests(function(parser, ip, args) { + + describe("using " + parser + " and " + ip, function () { + + it("emit an error after max retry attempts and do not try to reconnect afterwards", function (done) { + var max_attempts = 4; + var client = redis.createClient({ + parser: parser, + max_attempts: max_attempts + }); + var calls = 0; + + client.once('ready', function() { + helper.killConnection(client); + }); + + client.on("reconnecting", function (params) { + calls++; + }); + + client.on('error', function(err) { + if (/Redis connection in broken state: maximum connection attempts.*?exceeded./.test(err.message)) { + setTimeout(function () { + assert.strictEqual(calls, max_attempts - 1); + done(); + }, 1500); + } + }); + }); + + it("emit an error after max retry timeout and do not try to reconnect afterwards", function (done) { + var connect_timeout = 1000; // in ms + var client = redis.createClient({ + parser: parser, + connect_timeout: connect_timeout + }); + var time = 0; + + client.once('ready', function() { + helper.killConnection(client); + }); + + client.on("reconnecting", function (params) { + time += params.delay; + }); + + client.on('error', function(err) { + if (/Redis connection in broken state: connection timeout.*?exceeded./.test(err.message)) { + setTimeout(function () { + assert(time === connect_timeout); + done(); + }, 1500); + } + }); + }); + + it("end connection while retry is still ongoing", function (done) { + var connect_timeout = 1000; // in ms + var client = redis.createClient({ + parser: parser, + connect_timeout: connect_timeout + }); + + client.once('ready', function() { + helper.killConnection(client); + }); + + client.on("reconnecting", function (params) { + client.end(); + setTimeout(done, 100); + }); + }); + + }); + }); +}); diff --git a/test/helper.js b/test/helper.js index 5f4d7b4624..b57f31b3a1 100644 --- a/test/helper.js +++ b/test/helper.js @@ -108,22 +108,27 @@ module.exports = { } return true; }, - allTests: function (cb) { - [undefined].forEach(function (options) { // add buffer option at some point - describe(options && options.return_buffers ? "returning buffers" : "returning strings", function () { - var parsers = ['javascript']; - var protocols = ['IPv4']; - if (process.platform !== 'win32') { - parsers.push('hiredis'); - protocols.push('IPv6'); + allTests: function (options, cb) { + if (!cb) { + cb = options; + options = {}; + } + // TODO: Test all different option cases at some point (e.g. buffers) + // [undefined, { return_buffers: true }].forEach(function (config_options) { + // describe(config_options && config_options.return_buffers ? "returning buffers" : "returning strings", function () { + // }); + // }); + var parsers = ['javascript']; + var protocols = ['IPv4']; + if (process.platform !== 'win32') { + parsers.push('hiredis'); + protocols.push('IPv6', '/tmp/redis.sock'); + } + parsers.forEach(function (parser) { + protocols.forEach(function (ip, i) { + if (i === 0 || options.allConnections) { + cb(parser, ip, config.configureClient(parser, ip)); } - - parsers.forEach(function (parser) { - if (process.platform !== 'win32') cb(parser, "/tmp/redis.sock", config.configureClient(parser, "/tmp/redis.sock", options)); - protocols.forEach(function (ip) { - cb(parser, ip, config.configureClient(parser, ip, options)); - }); - }); }); }); }, @@ -140,5 +145,15 @@ module.exports = { func(); } }; + }, + killConnection: function (client) { + // Change the connection option to a non existing one and destroy the stream + client.connectionOption = { + port: 6370, + host: '127.0.0.2', + family: 4 + }; + client.address = '127.0.0.2:6370'; + client.stream.destroy(); } }; diff --git a/test/node_redis.spec.js b/test/node_redis.spec.js index bad7439687..14d9913b17 100644 --- a/test/node_redis.spec.js +++ b/test/node_redis.spec.js @@ -8,7 +8,9 @@ var redis = config.redis; describe("The node_redis client", function () { - helper.allTests(function(parser, ip, args) { + helper.allTests({ + allConnections: true + }, function(parser, ip, args) { if (args[2]) { // skip if options are undefined describe("testing parser existence", function () { @@ -624,7 +626,6 @@ describe("The node_redis client", function () { describe('defaults to true', function () { var client; - var args = config.configureClient(parser, ip); it("fires client.on('ready')", function (done) { client = redis.createClient.apply(redis.createClient, args); @@ -703,19 +704,49 @@ describe("The node_redis client", function () { if (err) return done(err); }); - return setTimeout(function(){ + return setTimeout(function() { assert.strictEqual(client.offline_queue.length, 1); return done(); }, 25); }, 50); }); + + it("enqueues operation and keep the queue while trying to reconnect", function (done) { + var client = redis.createClient(9999, null, { + max_attempts: 4, + parser: parser + }); + var i = 0; + + client.on('error', function(err) { + if (err.message === 'Redis connection in broken state: maximum connection attempts exceeded.') { + assert(i, 3); + assert.strictEqual(client.offline_queue.length, 0); + done(); + } + }); + + client.on('reconnecting', function(params) { + i++; + assert.equal(params.attempt, i); + assert.strictEqual(client.offline_queue.length, 2); + }); + + // Should work with either a callback or without + client.set('baz', 13); + client.set('foo', 'bar', function(err, result) { + assert(i, 3); + assert('Redis connection gone from error event', err.message); + assert.strictEqual(client.offline_queue.length, 0); + }); + }); }); describe('false', function () { it("does not emit an error and enqueues operation", function (done) { var client = redis.createClient(9999, null, { parser: parser, - max_attempts: 1, + max_attempts: 0, enable_offline_queue: false }); @@ -735,6 +766,40 @@ describe("The node_redis client", function () { }); }); }); + + it("flushes the command queue connection if in broken connection mode", function (done) { + var client = redis.createClient({ + parser: parser, + max_attempts: 2, + enable_offline_queue: false + }); + + client.once('ready', function() { + var multi = client.multi(); + multi.config("bar"); + var cb = function(err, reply) { + assert.equal(err.code, 'CONNECTION_BROKEN'); + }; + for (var i = 0; i < 10; i += 2) { + multi.set("foo" + i, "bar" + i); + multi.set("foo" + (i + 1), "bar" + (i + 1), cb); + } + multi.exec(); + assert.equal(client.command_queue.length, 13); + helper.killConnection(client); + }); + + client.on("reconnecting", function (params) { + assert.equal(client.command_queue.length, 13); + }); + + client.on('error', function(err) { + if (/Redis connection in broken state:/.test(err.message)) { + assert.equal(client.command_queue.length, 0); + done(); + } + }); + }); }); });