diff --git a/redis.js b/redis.js index 0c6a2cd525..67b9ab5c76 100644 --- a/redis.js +++ b/redis.js @@ -21,6 +21,7 @@ RedisReplyParser.prototype.execute = function (incoming_buf) { var pos = 0; while (pos < incoming_buf.length) { +// console.log("execute " + this.state + " " + pos + " " + String.fromCharCode(incoming_buf[pos])); switch (this.state) { case "type": this.type = incoming_buf[pos]; @@ -53,7 +54,7 @@ RedisReplyParser.prototype.execute = function (incoming_buf) { break; case "integer line": if (incoming_buf[pos] === 13) { - this.emit("integer reply", this.return_buffer.slice(0, this.return_buffer.end)); + this.send_reply(parseInt(this.return_buffer.slice(0, this.return_buffer.end)),10); this.state = "final lf"; } else { this.return_buffer[this.return_buffer.end] = incoming_buf[pos]; @@ -64,7 +65,7 @@ RedisReplyParser.prototype.execute = function (incoming_buf) { break; case "error line": if (incoming_buf[pos] === 13) { - this.emit("error reply", new Error(this.return_buffer.slice(0, this.return_buffer.end))); + this.send_error(this.return_buffer.toString("utf8", 0, this.return_buffer.end)); this.state = "final lf"; } else { this.return_buffer[this.return_buffer.end] = incoming_buf[pos]; @@ -74,7 +75,7 @@ RedisReplyParser.prototype.execute = function (incoming_buf) { break; case "single line": if (incoming_buf[pos] === 13) { - this.emit("single line reply", this.return_buffer.slice(0, this.return_buffer.end)); + this.send_reply(this.return_buffer.toString("utf8", 0, this.return_buffer.end)); this.state = "final lf"; } else { this.return_buffer[this.return_buffer.end] = incoming_buf[pos]; @@ -95,7 +96,7 @@ RedisReplyParser.prototype.execute = function (incoming_buf) { case "multi bulk count lf": if (incoming_buf[pos] === 10) { // \n this.multi_bulk_length = parseInt(this.tmp_buffer.toString("utf8", 0, this.tmp_buffer.end), 10); - this.multi_bulk_responses = []; + this.multi_bulk_replies = []; this.state = "type"; } else { this.emit("error", new Error("didn't see LF after NL reading multi bulk count")); @@ -117,11 +118,7 @@ RedisReplyParser.prototype.execute = function (incoming_buf) { if (incoming_buf[pos] === 10) { // \n this.bulk_length = parseInt(this.tmp_buffer.toString("utf8", 0, this.tmp_buffer.end), 10); if (this.bulk_length === -1) { - if (this.multi_bulk_length > 0) { - this.add_multi_bulk_response(null); - } else { - this.emit("null reply"); - } + this.send_reply(null); this.state = "type"; } else { this.state = "bulk data"; @@ -143,13 +140,10 @@ RedisReplyParser.prototype.execute = function (incoming_buf) { this.return_buffer.end += 1; pos += 1; if (this.return_buffer.end === this.bulk_length) { - if (this.multi_bulk_length > 0) { - var mb_tmp = new Buffer(this.bulk_length); - this.return_buffer.copy(mb_tmp, 0, 0, this.bulk_length); - this.add_multi_bulk_response(mb_tmp); - } else { - this.emit("bulk reply", this.return_buffer.slice(0, this.bulk_length)); - } + // this ugilness could go away if we gave the user a volatile buffer, but that seems dangerous + var bd_tmp = new Buffer(this.bulk_length); + this.return_buffer.copy(bd_tmp, 0, 0, this.bulk_length); + this.send_reply(bd_tmp); this.state = "final cr"; } break; @@ -179,12 +173,29 @@ RedisReplyParser.prototype.execute = function (incoming_buf) { } }; -RedisReplyParser.prototype.add_multi_bulk_response = function (response) { - this.multi_bulk_responses.push(response); - if (this.multi_bulk_responses.length === this.multi_bulk_length) { - this.emit("multibulk reply", this.multi_bulk_responses); +RedisReplyParser.prototype.send_error = function (reply) { + if (this.multi_bulk_length > 0) { + // TODO - can this happen? Seems like maybe not. + this.add_multi_bulk_reply(reply); + } else { + this.emit("reply error", reply); + } +} + +RedisReplyParser.prototype.send_reply = function (reply) { + if (this.multi_bulk_length > 0) { + this.add_multi_bulk_reply(reply); + } else { + this.emit("reply", reply); + } +}; + +RedisReplyParser.prototype.add_multi_bulk_reply = function (reply) { + this.multi_bulk_replies.push(reply); + if (this.multi_bulk_replies.length === this.multi_bulk_length) { + this.emit("reply", this.multi_bulk_replies); this.multi_bulk_length = 0; - this.multi_bulk_responses = null; + this.multi_bulk_replies = null; } }; @@ -194,69 +205,99 @@ function RedisClient(stream) { this.stream = stream; this.connected = false; this.connections = 0; - this.commands_sent = 0; - this.commands_in_flight = 0; - this.replies_received = 0; + this.attempts = 1; this.command_queue = []; + this.commands_sent = 0; + this.retry_delay = 250; + this.retry_backoff = 1.7; var self = this; - stream.on("connect", function () { - self.on_connect(); + this.stream.on("connect", function () { + self.connected = true; + self.connections += 1; + self.command_queue = []; + + self.reply_parser = new RedisReplyParser(); + self.reply_parser.on("reply error", function (reply) { + self.return_error(reply); + }); + self.reply_parser.on("reply", function (reply) { + self.return_reply(reply); + }); + self.reply_parser.on("error", function (err) { + console.log("Redis reply parser error: " + err.stack); + }); + + self.retry_delay = 250; + self.stream.setNoDelay(); + self.stream.setTimeout(0); + + self.emit("connect"); }); - stream.on("data", function (buffer_from_socket) { + this.stream.on("data", function (buffer_from_socket) { self.on_data(buffer_from_socket); }); - stream.on("error", function () { - console.log("Error connecting to redis server."); + this.stream.on("error", function (msg) { + if (exports.debug_mode) { + console.warn("Connecting to redis server: " + msg); + } + self.connected = false; + self.emit("error", msg); }); - stream.on("close", function () { - console.log("Close on redis connection."); + + this.stream.on("close", function () { + self.connection_gone(); }); - stream.on("end", function () { - console.log("End on redis connection."); + + this.stream.on("end", function () { + self.connection_gone(); }); events.EventEmitter.call(this); } sys.inherits(RedisClient, events.EventEmitter); -RedisClient.prototype.on_connect = function () { - console.log("Got connection."); - - this.connected = true; - this.connections += 1; - - this.reply_parser = new RedisReplyParser(); +RedisClient.prototype.connection_gone = function () { var self = this; - this.reply_parser.on("error reply", function (err) { - self.return_error(err); + + if (self.retry_timer) { + return; + } + + if (exports.debug_mode) { + console.warn("Redis connection is gone."); + } + self.connected = false; + self.emit("close"); + self.command_queue.forEach(function (args) { + if (typeof args[2] === "function") { + args[2]("Server connection closed"); + } }); - this.reply_parser.on("null reply", function () { - self.return_reply(null); - }); - this.reply_parser.on("integer reply", function (response_buffer) { - self.return_reply(parseInt(response_buffer.toString(), 10)); - }); - this.reply_parser.on("bulk reply", function (response_buffer) { - self.return_reply(response_buffer); - }); - this.reply_parser.on("multibulk reply", function (response_list) { - self.return_reply(response_list); - }); - this.reply_parser.on("single line reply", function (response_buffer) { - self.return_reply(response_buffer.toString()); - }); - this.reply_parser.on("error", function (err) { - console.log("Redis parser had an error: " + err.stack); - }); - this.emit("connect"); -}; + if (exports.debug_mode) { + console.log("Retry conneciton in " + self.retry_delay + " ms"); + } + self.attempts += 1; + self.emit("reconnecting", "delay " + self.retry_delay + ", attempt " + self.attempts); + self.retry_timer = setTimeout(function () { + if (exports.debug_mode) { + console.log("Retrying conneciton..."); + } + self.retry_timer = null; + self.retry_delay = self.retry_delay * self.retry_backoff; + self.stream.destroy(); + self.stream.connect(self.port, self.host); + }, self.retry_delay); +} RedisClient.prototype.on_data = function (data) { - console.log("on_data: " + data.toString()); + if (exports.debug_mode) { + console.log("on_data: " + data.toString()); + } + try { this.reply_parser.execute(data); } catch (err) { @@ -267,14 +308,25 @@ RedisClient.prototype.on_data = function (data) { RedisClient.prototype.return_error = function (err) { var command_obj = this.command_queue.shift(); - console.log("Error on " + command_obj.command + " " + command_obj.args + ": " + err); - command_obj.callback(err); + if (command_obj && typeof command_obj.callback === "function") { + command_obj.callback(err); + } else { + console.log("no callback to send error: " + err.stack); + // this will probably not make it anywhere useful, but we might as well try + throw err; + } } -RedisClient.prototype.return_reply = function (response_buffer) { +RedisClient.prototype.return_reply = function (reply_buffer) { var command_obj = this.command_queue.shift(); - command_obj.callback(null, response_buffer); + if (command_obj && typeof command_obj.callback === "function") { + command_obj.callback(null, reply_buffer); + } else { + if (this.debug_mode) { + console.log("no callback for reply: " + reply_buffer.toString()); + } + } }; RedisClient.prototype.send_command = function (command, args, callback) { @@ -288,8 +340,8 @@ RedisClient.prototype.send_command = function (command, args, callback) { return; } - if (typeof callback !== "function") { - throw new Error("Third argument of send_command must a results callback function"); + if (callback !== undefined && typeof callback !== "function") { + throw new Error("Third argument of send_command must a results callback function, or omitted"); return; } @@ -303,6 +355,7 @@ RedisClient.prototype.send_command = function (command, args, callback) { args: args, callback: callback }); + this.commands_sent += 1; var elem_count = 1, stream = this.stream, buffer_args = false, command_str = ""; @@ -311,7 +364,7 @@ RedisClient.prototype.send_command = function (command, args, callback) { return arg instanceof Buffer; }); - // Always use "Multi bulk commands", but if passed Buffer args, then do multiple writes for the args + // Always use "Multi bulk commands", but if passed any Buffer args, then do multiple writes, one for each arg command_str = "*" + elem_count + "\r\n$" + command.length + "\r\n" + command + "\r\n"; @@ -322,15 +375,16 @@ RedisClient.prototype.send_command = function (command, args, callback) { } command_str += "$" + arg.length + "\r\n" + arg + "\r\n"; }); -// console.log("non-buffer full command: " + command_str); - if (stream.write(command_str) === false) { - console.log("Buffered write 0"); + if (exports.debug_mode) { + console.log("send command: " + command_str); } + stream.write(command_str); } else { - console.log("buffer command str: " + command_str); - if (stream.write(command_str) === false) { - console.log("Buffered write 1"); + if (exports.debug_mode) { + console.log("send command: " + command_str); + console.log("send command has Buffer arguments"); } + stream.write(command_str); args.forEach(function (arg) { if (arg.length === undefined) { @@ -367,10 +421,6 @@ exports.commands = [ "HSET", "HGET", "HMGET", "HMSET", "HINCRBY", "HEXISTS", "HDEL", "HLEN", "HKEYS", "HVALS", "HGETALL", // Sorting "SORT", - // Transactions - "MULTI", "EXEC", "DISCARD", "WATCH", "UNWATCH", - // Publish/Subscribe - "SUBSCRIBE", "UNSUBSCRIBE", "PUBLISH", // Persistence control commands "SAVE", "BGSAVE", "LASTSAVE", "SHUTDOWN", "BGREWRITEAOF", // Remote server control commands @@ -386,6 +436,48 @@ exports.commands.forEach(function (command) { }; }); +// Transactions +// "MULTI", "EXEC", "DISCARD", "WATCH", "UNWATCH", +// Publish/Subscribe +// "SUBSCRIBE", "UNSUBSCRIBE", "PUBLISH", + +RedisClient.prototype.multi = function (commands) { + var self = this; + + try { + this.send_command("MULTI", [], function (err, reply) { + if (err) { + console.warn("Error starting MULTI request: " + err.stack); + } + }); + commands.forEach(function (args, command_num) { + self.send_command(args[0], args[1], function (err, reply) { + if (err) { + args[2](err); + commands.splice(command_num, 1); // what if this runs before all commands are sent? + } else { + if (reply !== "QUEUED") { + console.warn("Unexpected MULTI reply: " + reply + " instead of 'QUEUED'"); + } + } + }); + }); + this.send_command("EXEC", [], function (err, replies) { + replies.forEach(function (reply, reply_num) { + if (typeof commands[reply_num][2] === "function") { + commands[reply_num][2](null, reply) + } else { + if (exports.debug_mode) { + console.log("no callback for multi response " + reply_num + ", skipping."); + } + } + }); + }); + } catch (err) { + console.log("Caught exception in MULTI: " + err.stack); + } +}; + exports.createClient = function (port_arg, host_arg, options) { var port = port_arg || default_port, host = host || default_host, @@ -401,3 +493,10 @@ exports.createClient = function (port_arg, host_arg, options) { return red_client; }; +exports.print = function (err, reply) { + if (err) { + console.log("Error: " + err); + } else { + console.log("Reply: " + reply); + } +};