From c74657cdfadcc984f851b1886d333443e957c513 Mon Sep 17 00:00:00 2001 From: Matt Ranney Date: Thu, 10 Nov 2011 12:58:51 -1000 Subject: [PATCH] Improved reconnect logic. Initial version of reconnect thresholds. --- index.js | 215 +++++++++++++++++++++------------------- test.js | 4 +- tests/reconnect_test.js | 9 +- 3 files changed, 123 insertions(+), 105 deletions(-) diff --git a/index.js b/index.js index 8f7ff59efe..35660fef29 100644 --- a/index.js +++ b/index.js @@ -67,36 +67,7 @@ function RedisClient(stream, options) { }); this.stream.on("error", function (msg) { - if (self.closing) { - return; - } - - var message = "Redis connection to " + self.host + ":" + self.port + " failed - " + msg.message; - - if (exports.debug_mode) { - console.warn(message); - } - self.offline_queue.forEach(function (args) { - if (typeof args[2] === "function") { - args[2](message); - } - }); - self.offline_queue = new Queue(); - - self.command_queue.forEach(function (args) { - if (typeof args[2] === "function") { - args[2](message); - } - }); - self.command_queue = new Queue(); - - self.connected = false; - self.ready = false; - - self.emit("error", new Error(message)); - // "error" events get turned into exceptions if they aren't listened for. If the user handled this error - // then we should try to reconnect. - self.connection_gone("error"); + self.on_error(msg.message); }); this.stream.on("close", function () { @@ -117,6 +88,46 @@ function RedisClient(stream, options) { util.inherits(RedisClient, events.EventEmitter); exports.RedisClient = RedisClient; +RedisClient.prototype.on_error = function (msg) { + var message = "Redis connection to " + this.host + ":" + this.port + " failed - " + msg, + self = this, command_obj; + + if (this.closing) { + return; + } + + if (exports.debug_mode) { + console.warn(message); + } + + // send errors to any commands in the offline queue, then reset + while (this.offline_queue.length > 0) { + command_obj = this.offline_queue.shift(); + console.dir(command_obj); + if (typeof command_obj.callback === "function") { + command_obj.callback(message); + } + } + this.offline_queue = new Queue(); + + // send errors to any commands in the command queue, then reset + while (this.command_queue.length > 0) { + command_obj = this.command_queue.shift(); + if (typeof command_obj.callback === "function") { + command_obj.callback(message); + } + } + this.command_queue = new Queue(); + + this.connected = false; + this.ready = false; + + this.emit("error", new Error(message)); + // "error" events get turned into exceptions if they aren't listened for. If the user handled this error + // then we should try to reconnect. + this.connection_gone("error"); +}; + RedisClient.prototype.do_auth = function () { var self = this; @@ -175,7 +186,7 @@ RedisClient.prototype.on_connect = function () { this.max_attempts = 0; this.retry_totaltime = 0; this.retry_timer = null; - this.current_retry_delay = this.retry_time; + this.current_retry_delay = this.retry_delay; this.stream.setNoDelay(); this.stream.setTimeout(0); @@ -235,60 +246,64 @@ RedisClient.prototype.init_parser = function () { }); }; +RedisClient.prototype.on_info_cmd = function (err, res) { + var self = this, obj = {}, lines, retry_time; + + if (err) { + return self.emit("error", "Ready check failed: " + err); + } + + lines = res.toString().split("\r\n"); + + lines.forEach(function (line) { + var parts = line.split(':'); + if (parts[1]) { + obj[parts[0]] = parts[1]; + } + }); + + obj.versions = []; + obj.redis_version.split('.').forEach(function (num) { + obj.versions.push(+num); + }); + + // expose info key/vals to users + this.server_info = obj; + + if (!obj.loading || (obj.loading && obj.loading === "0")) { + if (exports.debug_mode) { + console.log("Redis server ready."); + } + this.ready = true; + + this.send_offline_queue(); + this.emit("ready"); + } else { + retry_time = obj.loading_eta_seconds * 1000; + if (retry_time > 1000) { + retry_time = 1000; + } + if (exports.debug_mode) { + console.log("Redis server still loading, trying again in " + retry_time); + } + setTimeout(function () { + self.ready_check(); + }, retry_time); + } +}; + RedisClient.prototype.ready_check = function () { var self = this; - function send_info_cmd() { - if (exports.debug_mode) { - console.log("checking server ready state..."); - } - - self.send_anyway = true; // secret flag to send_command to send something even if not "ready" - self.info(function (err, res) { - if (err) { - return self.emit("error", "Ready check failed: " + err); - } - - var lines = res.toString().split("\r\n"), obj = {}, retry_time; - - lines.forEach(function (line) { - var parts = line.split(':'); - if (parts[1]) { - obj[parts[0]] = parts[1]; - } - }); - - obj.versions = []; - obj.redis_version.split('.').forEach(function (num) { - obj.versions.push(+num); - }); - - // expose info key/vals to users - self.server_info = obj; - - if (!obj.loading || (obj.loading && obj.loading === "0")) { - if (exports.debug_mode) { - console.log("Redis server ready."); - } - self.ready = true; - - self.send_offline_queue(); - self.emit("ready"); - } else { - retry_time = obj.loading_eta_seconds * 1000; - if (retry_time > 1000) { - retry_time = 1000; - } - if (exports.debug_mode) { - console.log("Redis server still loading, trying again in " + retry_time); - } - setTimeout(send_info_cmd, retry_time); - } - }); - self.send_anyway = false; + if (exports.debug_mode) { + console.log("checking server ready state..."); } - send_info_cmd(); + this.send_anyway = true; // secret flag to send_command to send something even if not "ready" + this.info(function (err, res) { + self.on_info_cmd(err, res); + }); + this.send_anyway = false; }; RedisClient.prototype.send_offline_queue = function () { @@ -310,7 +325,7 @@ RedisClient.prototype.send_offline_queue = function () { }; RedisClient.prototype.connection_gone = function (why) { - var self = this; + var self = this, message; // If a retry is already in progress, just let that happen if (this.retry_timer) { @@ -335,8 +350,8 @@ RedisClient.prototype.connection_gone = function (why) { } this.command_queue.forEach(function (args) { - if (typeof args[2] === "function") { - args[2]("Server connection closed"); + if (typeof args.callback === "function") { + args.callback("Server connection closed"); } }); this.command_queue = new Queue(); @@ -347,17 +362,16 @@ RedisClient.prototype.connection_gone = function (why) { return; } - this.current_retry_delay = this.retry_delay * this.retry_backoff; + this.current_retry_delay = this.current_retry_delay * this.retry_backoff; if (exports.debug_mode) { console.log("Retry connection in " + this.current_retry_delay + " ms"); } - - if (self.max_attempts && self.attempts >= self.max_attempts) { - self.retry_timer = null; - if (exports.debug_mode) { - console.log("Aborting connection attempt: Max attempts " + self.max_attempts + " failed."); - } + + if (this.max_attempts && this.attempts >= this.max_attempts) { + this.retry_timer = null; + // TODO - engage Redis is Broken mode for future commands + this.emit("not_reconnecting", "Aborting connection attempt: Max attempts " + this.max_attempts + " failed."); return; } @@ -371,14 +385,12 @@ RedisClient.prototype.connection_gone = function (why) { console.log("Retrying connection..."); } - self.retry_delay = self.retry_delay * self.retry_backoff; - self.retry_totaltime += self.retry_delay; - + self.retry_totaltime += self.current_retry_delay; + if (self.connect_timeout && self.retry_totaltime >= self.connect_timeout) { self.retry_timer = null; - if (exports.debug_mode) { - console.log("Aborting connection attempt: Total timeout of " + self.connect_timeout + "ms exceeded."); - } + // TODO - engage Redis is Broken mode for future commands + self.emit("not_reconnecting", "Aborting connection attempt: Total timeout of " + self.connect_timeout + "ms exceeded."); return; } @@ -435,7 +447,7 @@ RedisClient.prototype.return_error = function (err) { RedisClient.prototype.return_reply = function (reply) { var command_obj = this.command_queue.shift(), - obj, i, len, key, val, type, timestamp, args, queue_len = this.command_queue.getLength(); + obj, i, len, key, val, type, timestamp, argindex, args, queue_len = this.command_queue.getLength(); if (this.subscriptions === false && queue_len === 0) { this.emit("idle"); @@ -495,9 +507,9 @@ RedisClient.prototype.return_reply = function (reply) { } else if (this.monitoring) { len = reply.indexOf(" "); timestamp = reply.slice(0, len); - // TODO - this de-quoting doesn't work correctly if you put JSON strings in your values. - args = reply.slice(len + 1).match(/"[^"]+"/g).map(function (elem) { - return elem.replace(/"/g, ""); + argindex = reply.indexOf('"'); + args = reply.slice(argindex + 1, -1).split('" "').map(function (elem) { + return elem.replace(/\\"/g, '"'); }); this.emit("monitor", timestamp, args); } else { @@ -505,7 +517,8 @@ RedisClient.prototype.return_reply = function (reply) { } }; -// This Command constructor is ever so slightly faster than using an object literal +// This Command constructor is ever so slightly faster than using an object literal, but more importantly, using +// a named constructor helps it show up meaningfully in the V8 CPU profiler and in heap snapshots. function Command(command, args, sub_command, callback) { this.command = command; this.args = args; diff --git a/test.js b/test.js index d909faebd0..7ab96373b3 100644 --- a/test.js +++ b/test.js @@ -348,7 +348,7 @@ tests.reconnect = function () { // For orderly shutdown in normal programs, do client.quit() client.stream.destroy(); }); - + client.on("reconnecting", function on_recon(params) { client.on("connect", function on_connect() { client.select(test_db_num, require_string("OK", name)); @@ -1233,7 +1233,7 @@ client3.on("error", function (err) { }); client.on("reconnecting", function (params) { -// console.log("reconnecting: " + util.inspect(params)); + console.log("reconnecting: " + util.inspect(params)); }); process.on('uncaughtException', function (err) { diff --git a/tests/reconnect_test.js b/tests/reconnect_test.js index 08a6ca685d..df5b4b256d 100644 --- a/tests/reconnect_test.js +++ b/tests/reconnect_test.js @@ -1,4 +1,6 @@ -var redis = require("redis").createClient(); +var redis = require("../index").createClient(null, null, { + max_attempts: 2 +}); redis.on("error", function (err) { console.log("Redis says: " + err); @@ -11,6 +13,9 @@ redis.on("ready", function () { redis.on("reconnecting", function (arg) { console.log("Redis reconnecting: " + JSON.stringify(arg)); }); +redis.on("not_reconnecting", function (arg) { + console.log("Redis NOT reconnecting: " + arg); +}); redis.on("connect", function () { console.log("Redis connected."); }); @@ -24,4 +29,4 @@ setInterval(function () { console.log(now + " Redis reply: " + res); } }); -}, 200); +}, 100);