diff --git a/index.js b/index.js index 35660fef29..63bf7f2880 100644 --- a/index.js +++ b/index.js @@ -6,6 +6,7 @@ var net = require("net"), to_array = require("./lib/to_array"), events = require("events"), parsers = [], commands, + connection_id = 0, default_port = 6379, default_host = "127.0.0.1"; @@ -18,7 +19,7 @@ try { parsers.push(require("./lib/parser/hiredis")); } catch (err) { if (exports.debug_mode) { - console.log("hiredis parser not installed."); + console.warn("hiredis parser not installed."); } } @@ -28,6 +29,7 @@ function RedisClient(stream, options) { this.stream = stream; this.options = options = options || {}; + this.connection_id = ++connection_id; this.connected = false; this.ready = false; this.connections = 0; @@ -88,6 +90,26 @@ function RedisClient(stream, options) { util.inherits(RedisClient, events.EventEmitter); exports.RedisClient = RedisClient; +// flush offline_queue and command_queue, erroring any items with a callback first +RedisClient.prototype.flush_and_error = function (message) { + var command_obj; + while (this.offline_queue.length > 0) { + command_obj = this.offline_queue.shift(); + if (typeof command_obj.callback === "function") { + command_obj.callback(message); + } + } + this.offline_queue = new Queue(); + + while (this.command_queue.length > 0) { + command_obj = this.command_queue.shift(); + if (typeof command_obj.callback === "function") { + command_obj.callback(message); + } + } + this.command_queue = new Queue(); +}; + RedisClient.prototype.on_error = function (msg) { var message = "Redis connection to " + this.host + ":" + this.port + " failed - " + msg, self = this, command_obj; @@ -100,24 +122,7 @@ RedisClient.prototype.on_error = function (msg) { console.warn(message); } - // send errors to any commands in the offline queue, then reset - while (this.offline_queue.length > 0) { - command_obj = this.offline_queue.shift(); - console.dir(command_obj); - if (typeof command_obj.callback === "function") { - command_obj.callback(message); - } - } - this.offline_queue = new Queue(); - - // send errors to any commands in the command queue, then reset - while (this.command_queue.length > 0) { - command_obj = this.command_queue.shift(); - if (typeof command_obj.callback === "function") { - command_obj.callback(message); - } - } - this.command_queue = new Queue(); + this.flush_and_error(message); this.connected = false; this.ready = false; @@ -132,7 +137,7 @@ RedisClient.prototype.do_auth = function () { var self = this; if (exports.debug_mode) { - console.log("Sending auth to " + self.host + ":" + self.port + " fd " + self.stream.fd); + console.log("Sending auth to " + self.host + ":" + self.port + " id " + self.connection_id); } self.send_anyway = true; self.send_command("auth", [this.auth_pass], function (err, res) { @@ -152,7 +157,7 @@ RedisClient.prototype.do_auth = function () { return self.emit("error", "Auth failed: " + res.toString()); } if (exports.debug_mode) { - console.log("Auth succeeded " + self.host + ":" + self.port + " fd " + self.stream.fd); + console.log("Auth succeeded " + self.host + ":" + self.port + " id " + self.connection_id); } if (self.auth_callback) { self.auth_callback(err, res); @@ -173,7 +178,7 @@ RedisClient.prototype.do_auth = function () { RedisClient.prototype.on_connect = function () { if (exports.debug_mode) { - console.log("Stream connected " + this.host + ":" + this.port + " fd " + this.stream.fd); + console.log("Stream connected " + this.host + ":" + this.port + " id " + this.connection_id); } var self = this; @@ -332,9 +337,6 @@ RedisClient.prototype.connection_gone = function (why) { return; } - // Note that this may trigger another "close" or "end" event - this.stream.destroy(); - if (exports.debug_mode) { console.warn("Redis connection is gone from " + why + " event."); } @@ -349,16 +351,14 @@ RedisClient.prototype.connection_gone = function (why) { this.emitted_end = true; } - this.command_queue.forEach(function (args) { - if (typeof args.callback === "function") { - args.callback("Server connection closed"); - } - }); - this.command_queue = new Queue(); + this.flush_and_error("Redis connection gone from " + why + " event."); // If this is a requested shutdown, then don't retry if (this.closing) { this.retry_timer = null; + if (exports.debug_mode) { + console.warn("connection ended from quit command, not retrying."); + } return; } @@ -401,7 +401,7 @@ RedisClient.prototype.connection_gone = function (why) { RedisClient.prototype.on_data = function (data) { if (exports.debug_mode) { - console.log("net read " + this.host + ":" + this.port + " fd " + this.stream.fd + ": " + data.toString()); + console.log("net read " + this.host + ":" + this.port + " id " + this.connection_id + ": " + data.toString()); } try { @@ -621,7 +621,7 @@ RedisClient.prototype.send_command = function (command, args, callback) { command_str += "$" + Buffer.byteLength(arg) + "\r\n" + arg + "\r\n"; } if (exports.debug_mode) { - console.log("send " + this.host + ":" + this.port + " fd " + this.stream.fd + ": " + command_str); + console.log("send " + this.host + ":" + this.port + " id " + this.connection_id + ": " + command_str); } buffered_writes += !stream.write(command_str); } else {