diff --git a/index.js b/index.js index 672127e16c..b6038b3882 100644 --- a/index.js +++ b/index.js @@ -17,6 +17,7 @@ function RedisReplyParser() { } sys.inherits(RedisReplyParser, events.EventEmitter); +// Buffer.toString() is quite slow for small strings function small_toString(buf) { var tmp = "", i = 0, end = buf.end; @@ -280,7 +281,8 @@ function RedisClient(stream) { this.connected = false; this.connections = 0; this.attempts = 1; - this.command_queue = new Queue; + this.command_queue = new Queue(); // holds sent commands to de-pipeline them + this.offline_queue = new Queue(); // holds commands issued but not able to be sent this.commands_sent = 0; this.retry_delay = 250; this.retry_backoff = 1.7; @@ -307,6 +309,15 @@ function RedisClient(stream) { self.stream.setNoDelay(); self.stream.setTimeout(0); + var command_obj; + while (self.offline_queue.length > 0) { + command_obj = self.offline_queue.shift(); + if (exports.debug_mode) { + console.log("Sending offline command: " + command_obj.command); + } + self.send_command(command_obj.command, command_obj.args, command_obj.callback); + } + self.emit("connect"); }); @@ -318,6 +329,12 @@ function RedisClient(stream) { if (exports.debug_mode) { console.warn("Connecting to redis server: " + msg); } + self.offline_queue.forEach(function (args) { + if (typeof args[2] === "function") { + args[2]("Server connection could not be established"); + } + }); + self.connected = false; self.emit("error", msg); }); @@ -404,7 +421,7 @@ RedisClient.prototype.return_reply = function (reply_buffer) { }; RedisClient.prototype.send_command = function () { - var command, callback, args, this_args; + var command, callback, args, this_args, command_obj; this_args = Array.prototype.slice.call(arguments); // convert arguments into real array @@ -427,17 +444,21 @@ RedisClient.prototype.send_command = function () { throw new Error("First argument of send_command must be the command name"); } - if (! this.connected) { - // TODO - queue this command and send it once we are connected. - callback(new Error("Redis client is not connected")); - return; - } - - this.command_queue.push({ + command_obj = { command: command, args: args, callback: callback - }); + }; + + if (! this.connected) { + if (exports.debug_mode) { + console.log("Queueing " + command + " for next server connection."); + } + this.offline_queue.push(command_obj); + return; + } + + this.command_queue.push(command_obj); this.commands_sent += 1; var elem_count = 1, stream = this.stream, buffer_args = false, command_str = ""; @@ -461,6 +482,7 @@ RedisClient.prototype.send_command = function () { if (exports.debug_mode) { console.log("send command: " + command_str); } + // Need to catch "Stream is not writable" exception here and error everybody in the command queue out stream.write(command_str); } else { if (exports.debug_mode) {