From 1e698e3fce344faa256f2774ccf0a1a86c7f8541 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Tue, 30 Nov 2010 22:29:16 +0100 Subject: [PATCH] Move the reply parser to its own file --- index.js | 276 +------------------------------------- lib/parser/javascript.js | 280 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 283 insertions(+), 273 deletions(-) create mode 100644 lib/parser/javascript.js diff --git a/index.js b/index.js index c08b2370b5..02265964b3 100644 --- a/index.js +++ b/index.js @@ -3,29 +3,13 @@ var net = require("net"), util = require("./lib/util").util, events = require("events"), + reply_parser = require("./lib/parser/javascript"), default_port = 6379, default_host = "127.0.0.1"; // can can set this to true to enable for all connections exports.debug_mode = false; -function RedisReplyParser() { - this.reset(); - events.EventEmitter.call(this); -} -util.inherits(RedisReplyParser, events.EventEmitter); - -// Buffer.toString() is quite slow for small strings -function small_toString(buf) { - var tmp = "", i, il; - - for (i = 0, il = buf.end; i < il; i += 1) { - tmp += String.fromCharCode(buf[i]); - } - - return tmp; -} - function to_array(args) { var len = args.length, arr = new Array(len), i; @@ -37,261 +21,6 @@ function to_array(args) { return arr; } -// Reset parser to it's original state. -RedisReplyParser.prototype.reset = function () { - this.state = "type"; - - this.return_buffer = new Buffer(16384); // for holding replies, might grow - this.tmp_buffer = new Buffer(128); // for holding size fields - - this.multi_bulk_length = 0; - this.multi_bulk_replies = null; - this.multi_bulk_nested_length = 0; - this.multi_bulk_nested_replies = null; -}; - -RedisReplyParser.prototype.execute = function (incoming_buf) { - var pos = 0, bd_tmp, bd_str, i, il; - //, state_times = {}, start_execute = new Date(), start_switch, end_switch, old_state; - //start_switch = new Date(); - - while (pos < incoming_buf.length) { - // old_state = this.state; - // console.log("execute: " + this.state + ", " + pos + "/" + incoming_buf.length + ", " + String.fromCharCode(incoming_buf[pos])); - - switch (this.state) { - case "type": - this.type = incoming_buf[pos]; - pos += 1; - - switch (this.type) { - case 43: // + - this.state = "single line"; - this.return_buffer.end = 0; - break; - case 42: // * - this.state = "multi bulk count"; - this.tmp_buffer.end = 0; - break; - case 58: // : - this.state = "integer line"; - this.return_buffer.end = 0; - break; - case 36: // $ - this.state = "bulk length"; - this.tmp_buffer.end = 0; - break; - case 45: // - - this.state = "error line"; - this.return_buffer.end = 0; - break; - default: - this.state = "unknown type"; - } - break; - case "integer line": - if (incoming_buf[pos] === 13) { - this.send_reply(+small_toString(this.return_buffer)); - this.state = "final lf"; - } else { - this.return_buffer[this.return_buffer.end] = incoming_buf[pos]; - this.return_buffer.end += 1; - } - pos += 1; - break; - case "error line": - if (incoming_buf[pos] === 13) { - this.send_error(this.return_buffer.toString("ascii", 0, this.return_buffer.end)); - this.state = "final lf"; - } else { - this.return_buffer[this.return_buffer.end] = incoming_buf[pos]; - this.return_buffer.end += 1; - } - pos += 1; - break; - case "single line": - if (incoming_buf[pos] === 13) { - if (this.return_buffer.end > 10) { - bd_str = this.return_buffer.toString("utf8", 0, this.return_buffer.end); - } else { - bd_str = small_toString(this.return_buffer); - - } - this.send_reply(bd_str); - this.state = "final lf"; - } else { - this.return_buffer[this.return_buffer.end] = incoming_buf[pos]; - this.return_buffer.end += 1; - // TODO - check for return_buffer overflow and then grow, copy, continue, and drink. - } - pos += 1; - break; - case "multi bulk count": - if (incoming_buf[pos] === 13) { // \r - this.state = "multi bulk count lf"; - } else { - this.tmp_buffer[this.tmp_buffer.end] = incoming_buf[pos]; - this.tmp_buffer.end += 1; - } - pos += 1; - break; - case "multi bulk count lf": - if (incoming_buf[pos] === 10) { // \n - if (this.multi_bulk_length) { // nested multi-bulk - this.multi_bulk_nested_length = this.multi_bulk_length; - this.multi_bulk_nested_replies = this.multi_bulk_replies; - } - this.multi_bulk_length = +small_toString(this.tmp_buffer); - this.multi_bulk_replies = []; - this.state = "type"; - if (this.multi_bulk_length < 0) { - this.send_reply(null); - this.multi_bulk_length = 0; - } else if (this.multi_bulk_length === 0) { - this.send_reply([]); - } - } else { - this.emit("error", new Error("didn't see LF after NL reading multi bulk count")); - this.reset(); - return; - } - pos += 1; - break; - case "bulk length": - if (incoming_buf[pos] === 13) { // \r - this.state = "bulk lf"; - } else { - this.tmp_buffer[this.tmp_buffer.end] = incoming_buf[pos]; - this.tmp_buffer.end += 1; - } - pos += 1; - break; - case "bulk lf": - if (incoming_buf[pos] === 10) { // \n - this.bulk_length = +small_toString(this.tmp_buffer); - if (this.bulk_length === -1) { - this.send_reply(null); - this.state = "type"; - } else if (this.bulk_length === 0) { - this.send_reply(new Buffer("")); - this.state = "final cr"; - } else { - this.state = "bulk data"; - if (this.bulk_length > this.return_buffer.length) { - if (exports.debug_mode) { - console.log("Growing return_buffer from " + this.return_buffer.length + " to " + this.bulk_length); - } - this.return_buffer = new Buffer(this.bulk_length); - // home the old one gets cleaned up somehow - } - this.return_buffer.end = 0; - } - } else { - this.emit("error", new Error("didn't see LF after NL while reading bulk length")); - this.reset(); - return; - } - pos += 1; - break; - case "bulk data": - this.return_buffer[this.return_buffer.end] = incoming_buf[pos]; - this.return_buffer.end += 1; - pos += 1; - // TODO - should be faster to use Bufer.copy() here, especially if the response is large. - // However, when the response is small, Buffer.copy() seems a lot slower. Computers are hard. - if (this.return_buffer.end === this.bulk_length) { - bd_tmp = new Buffer(this.bulk_length); - if (this.bulk_length > 10) { - this.return_buffer.copy(bd_tmp, 0, 0, this.bulk_length); - } else { - for (i = 0, il = this.bulk_length; i < il; i += 1) { - bd_tmp[i] = this.return_buffer[i]; - } - } - this.send_reply(bd_tmp); - this.state = "final cr"; - } - break; - case "final cr": - if (incoming_buf[pos] === 13) { // \r - this.state = "final lf"; - pos += 1; - } else { - this.emit("error", new Error("saw " + incoming_buf[pos] + " when expecting final CR")); - this.reset(); - return; - } - break; - case "final lf": - if (incoming_buf[pos] === 10) { // \n - this.state = "type"; - pos += 1; - } else { - this.emit("error", new Error("saw " + incoming_buf[pos] + " when expecting final LF")); - this.reset(); - return; - } - break; - default: - throw new Error("invalid state " + this.state); - } - // end_switch = new Date(); - // if (state_times[old_state] === undefined) { - // state_times[old_state] = 0; - // } - // state_times[old_state] += (end_switch - start_switch); - // start_switch = end_switch; - } - // console.log("execute ran for " + (Date.now() - start_execute) + " ms, on " + incoming_buf.length + " Bytes. "); - // Object.keys(state_times).forEach(function (state) { - // console.log(" " + state + ": " + state_times[state]); - // }); -}; - -RedisReplyParser.prototype.send_error = function (reply) { - if (this.multi_bulk_length > 0 || this.multi_bulk_nested_length > 0) { - // TODO - can this happen? Seems like maybe not. - this.add_multi_bulk_reply(reply); - } else { - this.emit("reply error", reply); - } -}; - -RedisReplyParser.prototype.send_reply = function (reply) { - if (this.multi_bulk_length > 0 || this.multi_bulk_nested_length > 0) { - this.add_multi_bulk_reply(reply); - } else { - this.emit("reply", reply); - } -}; - -RedisReplyParser.prototype.add_multi_bulk_reply = function (reply) { - if (this.multi_bulk_replies) { - this.multi_bulk_replies.push(reply); - // use "less than" here because a nil mb reply claims "0 length", but we need 1 slot to hold it - if (this.multi_bulk_replies.length < this.multi_bulk_length) { - return; - } - } else { - this.multi_bulk_replies = reply; - } - - if (this.multi_bulk_nested_length > 0) { - this.multi_bulk_nested_replies.push(this.multi_bulk_replies); - this.multi_bulk_length = 0; - delete this.multi_bulk_replies; - if (this.multi_bulk_nested_length === this.multi_bulk_nested_replies.length) { - this.emit("reply", this.multi_bulk_nested_replies); - this.multi_bulk_nested_length = 0; - this.multi_bulk_nested_replies = null; - } - } else { - this.emit("reply", this.multi_bulk_replies); - this.multi_bulk_length = 0; - this.multi_bulk_replies = null; - } -}; - // Queue class adapted from Tim Caswell's pattern library // http://github.com/creationix/pattern/blob/master/lib/pattern/queue.js var Queue = function () { @@ -370,7 +99,8 @@ function RedisClient(stream, options) { self.command_queue = new Queue(); self.emitted_end = false; - self.reply_parser = new RedisReplyParser(); + reply_parser.debug_mode = exports.debug_mode; + self.reply_parser = new reply_parser.Parser(); // "reply error" is an error sent back by redis self.reply_parser.on("reply error", function (reply) { self.return_error(reply); diff --git a/lib/parser/javascript.js b/lib/parser/javascript.js new file mode 100644 index 0000000000..22ae5ab6ce --- /dev/null +++ b/lib/parser/javascript.js @@ -0,0 +1,280 @@ +var events = require("events"), + util = require("../util").util; + +function RedisReplyParser() { + this.reset(); + events.EventEmitter.call(this); +} + +util.inherits(RedisReplyParser, events.EventEmitter); + +exports.Parser = RedisReplyParser; +exports.debug_mode = false; +exports.type = "javascript"; + +// Buffer.toString() is quite slow for small strings +function small_toString(buf) { + var tmp = "", i, il; + + for (i = 0, il = buf.end; i < il; i += 1) { + tmp += String.fromCharCode(buf[i]); + } + + return tmp; +} + +// Reset parser to it's original state. +RedisReplyParser.prototype.reset = function () { + this.state = "type"; + + this.return_buffer = new Buffer(16384); // for holding replies, might grow + this.tmp_buffer = new Buffer(128); // for holding size fields + + this.multi_bulk_length = 0; + this.multi_bulk_replies = null; + this.multi_bulk_nested_length = 0; + this.multi_bulk_nested_replies = null; +}; + +RedisReplyParser.prototype.execute = function (incoming_buf) { + var pos = 0, bd_tmp, bd_str, i, il; + //, state_times = {}, start_execute = new Date(), start_switch, end_switch, old_state; + //start_switch = new Date(); + + while (pos < incoming_buf.length) { + // old_state = this.state; + // console.log("execute: " + this.state + ", " + pos + "/" + incoming_buf.length + ", " + String.fromCharCode(incoming_buf[pos])); + + switch (this.state) { + case "type": + this.type = incoming_buf[pos]; + pos += 1; + + switch (this.type) { + case 43: // + + this.state = "single line"; + this.return_buffer.end = 0; + break; + case 42: // * + this.state = "multi bulk count"; + this.tmp_buffer.end = 0; + break; + case 58: // : + this.state = "integer line"; + this.return_buffer.end = 0; + break; + case 36: // $ + this.state = "bulk length"; + this.tmp_buffer.end = 0; + break; + case 45: // - + this.state = "error line"; + this.return_buffer.end = 0; + break; + default: + this.state = "unknown type"; + } + break; + case "integer line": + if (incoming_buf[pos] === 13) { + this.send_reply(+small_toString(this.return_buffer)); + this.state = "final lf"; + } else { + this.return_buffer[this.return_buffer.end] = incoming_buf[pos]; + this.return_buffer.end += 1; + } + pos += 1; + break; + case "error line": + if (incoming_buf[pos] === 13) { + this.send_error(this.return_buffer.toString("ascii", 0, this.return_buffer.end)); + this.state = "final lf"; + } else { + this.return_buffer[this.return_buffer.end] = incoming_buf[pos]; + this.return_buffer.end += 1; + } + pos += 1; + break; + case "single line": + if (incoming_buf[pos] === 13) { + if (this.return_buffer.end > 10) { + bd_str = this.return_buffer.toString("utf8", 0, this.return_buffer.end); + } else { + bd_str = small_toString(this.return_buffer); + + } + this.send_reply(bd_str); + this.state = "final lf"; + } else { + this.return_buffer[this.return_buffer.end] = incoming_buf[pos]; + this.return_buffer.end += 1; + // TODO - check for return_buffer overflow and then grow, copy, continue, and drink. + } + pos += 1; + break; + case "multi bulk count": + if (incoming_buf[pos] === 13) { // \r + this.state = "multi bulk count lf"; + } else { + this.tmp_buffer[this.tmp_buffer.end] = incoming_buf[pos]; + this.tmp_buffer.end += 1; + } + pos += 1; + break; + case "multi bulk count lf": + if (incoming_buf[pos] === 10) { // \n + if (this.multi_bulk_length) { // nested multi-bulk + this.multi_bulk_nested_length = this.multi_bulk_length; + this.multi_bulk_nested_replies = this.multi_bulk_replies; + } + this.multi_bulk_length = +small_toString(this.tmp_buffer); + this.multi_bulk_replies = []; + this.state = "type"; + if (this.multi_bulk_length < 0) { + this.send_reply(null); + this.multi_bulk_length = 0; + } else if (this.multi_bulk_length === 0) { + this.send_reply([]); + } + } else { + this.emit("error", new Error("didn't see LF after NL reading multi bulk count")); + this.reset(); + return; + } + pos += 1; + break; + case "bulk length": + if (incoming_buf[pos] === 13) { // \r + this.state = "bulk lf"; + } else { + this.tmp_buffer[this.tmp_buffer.end] = incoming_buf[pos]; + this.tmp_buffer.end += 1; + } + pos += 1; + break; + case "bulk lf": + if (incoming_buf[pos] === 10) { // \n + this.bulk_length = +small_toString(this.tmp_buffer); + if (this.bulk_length === -1) { + this.send_reply(null); + this.state = "type"; + } else if (this.bulk_length === 0) { + this.send_reply(new Buffer("")); + this.state = "final cr"; + } else { + this.state = "bulk data"; + if (this.bulk_length > this.return_buffer.length) { + if (exports.debug_mode) { + console.log("Growing return_buffer from " + this.return_buffer.length + " to " + this.bulk_length); + } + this.return_buffer = new Buffer(this.bulk_length); + // home the old one gets cleaned up somehow + } + this.return_buffer.end = 0; + } + } else { + this.emit("error", new Error("didn't see LF after NL while reading bulk length")); + this.reset(); + return; + } + pos += 1; + break; + case "bulk data": + this.return_buffer[this.return_buffer.end] = incoming_buf[pos]; + this.return_buffer.end += 1; + pos += 1; + // TODO - should be faster to use Bufer.copy() here, especially if the response is large. + // However, when the response is small, Buffer.copy() seems a lot slower. Computers are hard. + if (this.return_buffer.end === this.bulk_length) { + bd_tmp = new Buffer(this.bulk_length); + if (this.bulk_length > 10) { + this.return_buffer.copy(bd_tmp, 0, 0, this.bulk_length); + } else { + for (i = 0, il = this.bulk_length; i < il; i += 1) { + bd_tmp[i] = this.return_buffer[i]; + } + } + this.send_reply(bd_tmp); + this.state = "final cr"; + } + break; + case "final cr": + if (incoming_buf[pos] === 13) { // \r + this.state = "final lf"; + pos += 1; + } else { + this.emit("error", new Error("saw " + incoming_buf[pos] + " when expecting final CR")); + this.reset(); + return; + } + break; + case "final lf": + if (incoming_buf[pos] === 10) { // \n + this.state = "type"; + pos += 1; + } else { + this.emit("error", new Error("saw " + incoming_buf[pos] + " when expecting final LF")); + this.reset(); + return; + } + break; + default: + throw new Error("invalid state " + this.state); + } + // end_switch = new Date(); + // if (state_times[old_state] === undefined) { + // state_times[old_state] = 0; + // } + // state_times[old_state] += (end_switch - start_switch); + // start_switch = end_switch; + } + // console.log("execute ran for " + (Date.now() - start_execute) + " ms, on " + incoming_buf.length + " Bytes. "); + // Object.keys(state_times).forEach(function (state) { + // console.log(" " + state + ": " + state_times[state]); + // }); +}; + +RedisReplyParser.prototype.send_error = function (reply) { + if (this.multi_bulk_length > 0 || this.multi_bulk_nested_length > 0) { + // TODO - can this happen? Seems like maybe not. + this.add_multi_bulk_reply(reply); + } else { + this.emit("reply error", reply); + } +}; + +RedisReplyParser.prototype.send_reply = function (reply) { + if (this.multi_bulk_length > 0 || this.multi_bulk_nested_length > 0) { + this.add_multi_bulk_reply(reply); + } else { + this.emit("reply", reply); + } +}; + +RedisReplyParser.prototype.add_multi_bulk_reply = function (reply) { + if (this.multi_bulk_replies) { + this.multi_bulk_replies.push(reply); + // use "less than" here because a nil mb reply claims "0 length", but we need 1 slot to hold it + if (this.multi_bulk_replies.length < this.multi_bulk_length) { + return; + } + } else { + this.multi_bulk_replies = reply; + } + + if (this.multi_bulk_nested_length > 0) { + this.multi_bulk_nested_replies.push(this.multi_bulk_replies); + this.multi_bulk_length = 0; + delete this.multi_bulk_replies; + if (this.multi_bulk_nested_length === this.multi_bulk_nested_replies.length) { + this.emit("reply", this.multi_bulk_nested_replies); + this.multi_bulk_nested_length = 0; + this.multi_bulk_nested_replies = null; + } + } else { + this.emit("reply", this.multi_bulk_replies); + this.multi_bulk_length = 0; + this.multi_bulk_replies = null; + } +}; +