1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-10 11:43:01 +03:00

Handle MULTI and various error permutations.

This commit is contained in:
Matt Ranney
2010-09-14 16:43:28 -07:00
parent 06173ff6be
commit 075824960f

259
redis.js
View File

@@ -21,6 +21,7 @@ RedisReplyParser.prototype.execute = function (incoming_buf) {
var pos = 0; var pos = 0;
while (pos < incoming_buf.length) { while (pos < incoming_buf.length) {
// console.log("execute " + this.state + " " + pos + " " + String.fromCharCode(incoming_buf[pos]));
switch (this.state) { switch (this.state) {
case "type": case "type":
this.type = incoming_buf[pos]; this.type = incoming_buf[pos];
@@ -53,7 +54,7 @@ RedisReplyParser.prototype.execute = function (incoming_buf) {
break; break;
case "integer line": case "integer line":
if (incoming_buf[pos] === 13) { if (incoming_buf[pos] === 13) {
this.emit("integer reply", this.return_buffer.slice(0, this.return_buffer.end)); this.send_reply(parseInt(this.return_buffer.slice(0, this.return_buffer.end)),10);
this.state = "final lf"; this.state = "final lf";
} else { } else {
this.return_buffer[this.return_buffer.end] = incoming_buf[pos]; this.return_buffer[this.return_buffer.end] = incoming_buf[pos];
@@ -64,7 +65,7 @@ RedisReplyParser.prototype.execute = function (incoming_buf) {
break; break;
case "error line": case "error line":
if (incoming_buf[pos] === 13) { if (incoming_buf[pos] === 13) {
this.emit("error reply", new Error(this.return_buffer.slice(0, this.return_buffer.end))); this.send_error(this.return_buffer.toString("utf8", 0, this.return_buffer.end));
this.state = "final lf"; this.state = "final lf";
} else { } else {
this.return_buffer[this.return_buffer.end] = incoming_buf[pos]; this.return_buffer[this.return_buffer.end] = incoming_buf[pos];
@@ -74,7 +75,7 @@ RedisReplyParser.prototype.execute = function (incoming_buf) {
break; break;
case "single line": case "single line":
if (incoming_buf[pos] === 13) { if (incoming_buf[pos] === 13) {
this.emit("single line reply", this.return_buffer.slice(0, this.return_buffer.end)); this.send_reply(this.return_buffer.toString("utf8", 0, this.return_buffer.end));
this.state = "final lf"; this.state = "final lf";
} else { } else {
this.return_buffer[this.return_buffer.end] = incoming_buf[pos]; this.return_buffer[this.return_buffer.end] = incoming_buf[pos];
@@ -95,7 +96,7 @@ RedisReplyParser.prototype.execute = function (incoming_buf) {
case "multi bulk count lf": case "multi bulk count lf":
if (incoming_buf[pos] === 10) { // \n if (incoming_buf[pos] === 10) { // \n
this.multi_bulk_length = parseInt(this.tmp_buffer.toString("utf8", 0, this.tmp_buffer.end), 10); this.multi_bulk_length = parseInt(this.tmp_buffer.toString("utf8", 0, this.tmp_buffer.end), 10);
this.multi_bulk_responses = []; this.multi_bulk_replies = [];
this.state = "type"; this.state = "type";
} else { } else {
this.emit("error", new Error("didn't see LF after NL reading multi bulk count")); this.emit("error", new Error("didn't see LF after NL reading multi bulk count"));
@@ -117,11 +118,7 @@ RedisReplyParser.prototype.execute = function (incoming_buf) {
if (incoming_buf[pos] === 10) { // \n if (incoming_buf[pos] === 10) { // \n
this.bulk_length = parseInt(this.tmp_buffer.toString("utf8", 0, this.tmp_buffer.end), 10); this.bulk_length = parseInt(this.tmp_buffer.toString("utf8", 0, this.tmp_buffer.end), 10);
if (this.bulk_length === -1) { if (this.bulk_length === -1) {
if (this.multi_bulk_length > 0) { this.send_reply(null);
this.add_multi_bulk_response(null);
} else {
this.emit("null reply");
}
this.state = "type"; this.state = "type";
} else { } else {
this.state = "bulk data"; this.state = "bulk data";
@@ -143,13 +140,10 @@ RedisReplyParser.prototype.execute = function (incoming_buf) {
this.return_buffer.end += 1; this.return_buffer.end += 1;
pos += 1; pos += 1;
if (this.return_buffer.end === this.bulk_length) { if (this.return_buffer.end === this.bulk_length) {
if (this.multi_bulk_length > 0) { // this ugilness could go away if we gave the user a volatile buffer, but that seems dangerous
var mb_tmp = new Buffer(this.bulk_length); var bd_tmp = new Buffer(this.bulk_length);
this.return_buffer.copy(mb_tmp, 0, 0, this.bulk_length); this.return_buffer.copy(bd_tmp, 0, 0, this.bulk_length);
this.add_multi_bulk_response(mb_tmp); this.send_reply(bd_tmp);
} else {
this.emit("bulk reply", this.return_buffer.slice(0, this.bulk_length));
}
this.state = "final cr"; this.state = "final cr";
} }
break; break;
@@ -179,12 +173,29 @@ RedisReplyParser.prototype.execute = function (incoming_buf) {
} }
}; };
RedisReplyParser.prototype.add_multi_bulk_response = function (response) { RedisReplyParser.prototype.send_error = function (reply) {
this.multi_bulk_responses.push(response); if (this.multi_bulk_length > 0) {
if (this.multi_bulk_responses.length === this.multi_bulk_length) { // TODO - can this happen? Seems like maybe not.
this.emit("multibulk reply", this.multi_bulk_responses); this.add_multi_bulk_reply(reply);
} else {
this.emit("reply error", reply);
}
}
RedisReplyParser.prototype.send_reply = function (reply) {
if (this.multi_bulk_length > 0) {
this.add_multi_bulk_reply(reply);
} else {
this.emit("reply", reply);
}
};
RedisReplyParser.prototype.add_multi_bulk_reply = function (reply) {
this.multi_bulk_replies.push(reply);
if (this.multi_bulk_replies.length === this.multi_bulk_length) {
this.emit("reply", this.multi_bulk_replies);
this.multi_bulk_length = 0; this.multi_bulk_length = 0;
this.multi_bulk_responses = null; this.multi_bulk_replies = null;
} }
}; };
@@ -194,69 +205,99 @@ function RedisClient(stream) {
this.stream = stream; this.stream = stream;
this.connected = false; this.connected = false;
this.connections = 0; this.connections = 0;
this.commands_sent = 0; this.attempts = 1;
this.commands_in_flight = 0;
this.replies_received = 0;
this.command_queue = []; this.command_queue = [];
this.commands_sent = 0;
this.retry_delay = 250;
this.retry_backoff = 1.7;
var self = this; var self = this;
stream.on("connect", function () { this.stream.on("connect", function () {
self.on_connect(); self.connected = true;
self.connections += 1;
self.command_queue = [];
self.reply_parser = new RedisReplyParser();
self.reply_parser.on("reply error", function (reply) {
self.return_error(reply);
});
self.reply_parser.on("reply", function (reply) {
self.return_reply(reply);
});
self.reply_parser.on("error", function (err) {
console.log("Redis reply parser error: " + err.stack);
});
self.retry_delay = 250;
self.stream.setNoDelay();
self.stream.setTimeout(0);
self.emit("connect");
}); });
stream.on("data", function (buffer_from_socket) { this.stream.on("data", function (buffer_from_socket) {
self.on_data(buffer_from_socket); self.on_data(buffer_from_socket);
}); });
stream.on("error", function () { this.stream.on("error", function (msg) {
console.log("Error connecting to redis server."); if (exports.debug_mode) {
console.warn("Connecting to redis server: " + msg);
}
self.connected = false;
self.emit("error", msg);
}); });
stream.on("close", function () {
console.log("Close on redis connection."); this.stream.on("close", function () {
self.connection_gone();
}); });
stream.on("end", function () {
console.log("End on redis connection."); this.stream.on("end", function () {
self.connection_gone();
}); });
events.EventEmitter.call(this); events.EventEmitter.call(this);
} }
sys.inherits(RedisClient, events.EventEmitter); sys.inherits(RedisClient, events.EventEmitter);
RedisClient.prototype.on_connect = function () { RedisClient.prototype.connection_gone = function () {
console.log("Got connection.");
this.connected = true;
this.connections += 1;
this.reply_parser = new RedisReplyParser();
var self = this; var self = this;
this.reply_parser.on("error reply", function (err) {
self.return_error(err); if (self.retry_timer) {
return;
}
if (exports.debug_mode) {
console.warn("Redis connection is gone.");
}
self.connected = false;
self.emit("close");
self.command_queue.forEach(function (args) {
if (typeof args[2] === "function") {
args[2]("Server connection closed");
}
}); });
this.reply_parser.on("null reply", function () { if (exports.debug_mode) {
self.return_reply(null); console.log("Retry conneciton in " + self.retry_delay + " ms");
}); }
this.reply_parser.on("integer reply", function (response_buffer) { self.attempts += 1;
self.return_reply(parseInt(response_buffer.toString(), 10)); self.emit("reconnecting", "delay " + self.retry_delay + ", attempt " + self.attempts);
}); self.retry_timer = setTimeout(function () {
this.reply_parser.on("bulk reply", function (response_buffer) { if (exports.debug_mode) {
self.return_reply(response_buffer); console.log("Retrying conneciton...");
}); }
this.reply_parser.on("multibulk reply", function (response_list) { self.retry_timer = null;
self.return_reply(response_list); self.retry_delay = self.retry_delay * self.retry_backoff;
}); self.stream.destroy();
this.reply_parser.on("single line reply", function (response_buffer) { self.stream.connect(self.port, self.host);
self.return_reply(response_buffer.toString()); }, self.retry_delay);
}); }
this.reply_parser.on("error", function (err) {
console.log("Redis parser had an error: " + err.stack);
});
this.emit("connect");
};
RedisClient.prototype.on_data = function (data) { RedisClient.prototype.on_data = function (data) {
console.log("on_data: " + data.toString()); if (exports.debug_mode) {
console.log("on_data: " + data.toString());
}
try { try {
this.reply_parser.execute(data); this.reply_parser.execute(data);
} catch (err) { } catch (err) {
@@ -267,14 +308,25 @@ RedisClient.prototype.on_data = function (data) {
RedisClient.prototype.return_error = function (err) { RedisClient.prototype.return_error = function (err) {
var command_obj = this.command_queue.shift(); var command_obj = this.command_queue.shift();
console.log("Error on " + command_obj.command + " " + command_obj.args + ": " + err); if (command_obj && typeof command_obj.callback === "function") {
command_obj.callback(err); command_obj.callback(err);
} else {
console.log("no callback to send error: " + err.stack);
// this will probably not make it anywhere useful, but we might as well try
throw err;
}
} }
RedisClient.prototype.return_reply = function (response_buffer) { RedisClient.prototype.return_reply = function (reply_buffer) {
var command_obj = this.command_queue.shift(); var command_obj = this.command_queue.shift();
command_obj.callback(null, response_buffer); if (command_obj && typeof command_obj.callback === "function") {
command_obj.callback(null, reply_buffer);
} else {
if (this.debug_mode) {
console.log("no callback for reply: " + reply_buffer.toString());
}
}
}; };
RedisClient.prototype.send_command = function (command, args, callback) { RedisClient.prototype.send_command = function (command, args, callback) {
@@ -288,8 +340,8 @@ RedisClient.prototype.send_command = function (command, args, callback) {
return; return;
} }
if (typeof callback !== "function") { if (callback !== undefined && typeof callback !== "function") {
throw new Error("Third argument of send_command must a results callback function"); throw new Error("Third argument of send_command must a results callback function, or omitted");
return; return;
} }
@@ -303,6 +355,7 @@ RedisClient.prototype.send_command = function (command, args, callback) {
args: args, args: args,
callback: callback callback: callback
}); });
this.commands_sent += 1;
var elem_count = 1, stream = this.stream, buffer_args = false, command_str = ""; var elem_count = 1, stream = this.stream, buffer_args = false, command_str = "";
@@ -311,7 +364,7 @@ RedisClient.prototype.send_command = function (command, args, callback) {
return arg instanceof Buffer; return arg instanceof Buffer;
}); });
// Always use "Multi bulk commands", but if passed Buffer args, then do multiple writes for the args // Always use "Multi bulk commands", but if passed any Buffer args, then do multiple writes, one for each arg
command_str = "*" + elem_count + "\r\n$" + command.length + "\r\n" + command + "\r\n"; command_str = "*" + elem_count + "\r\n$" + command.length + "\r\n" + command + "\r\n";
@@ -322,15 +375,16 @@ RedisClient.prototype.send_command = function (command, args, callback) {
} }
command_str += "$" + arg.length + "\r\n" + arg + "\r\n"; command_str += "$" + arg.length + "\r\n" + arg + "\r\n";
}); });
// console.log("non-buffer full command: " + command_str); if (exports.debug_mode) {
if (stream.write(command_str) === false) { console.log("send command: " + command_str);
console.log("Buffered write 0");
} }
stream.write(command_str);
} else { } else {
console.log("buffer command str: " + command_str); if (exports.debug_mode) {
if (stream.write(command_str) === false) { console.log("send command: " + command_str);
console.log("Buffered write 1"); console.log("send command has Buffer arguments");
} }
stream.write(command_str);
args.forEach(function (arg) { args.forEach(function (arg) {
if (arg.length === undefined) { if (arg.length === undefined) {
@@ -367,10 +421,6 @@ exports.commands = [
"HSET", "HGET", "HMGET", "HMSET", "HINCRBY", "HEXISTS", "HDEL", "HLEN", "HKEYS", "HVALS", "HGETALL", "HSET", "HGET", "HMGET", "HMSET", "HINCRBY", "HEXISTS", "HDEL", "HLEN", "HKEYS", "HVALS", "HGETALL",
// Sorting // Sorting
"SORT", "SORT",
// Transactions
"MULTI", "EXEC", "DISCARD", "WATCH", "UNWATCH",
// Publish/Subscribe
"SUBSCRIBE", "UNSUBSCRIBE", "PUBLISH",
// Persistence control commands // Persistence control commands
"SAVE", "BGSAVE", "LASTSAVE", "SHUTDOWN", "BGREWRITEAOF", "SAVE", "BGSAVE", "LASTSAVE", "SHUTDOWN", "BGREWRITEAOF",
// Remote server control commands // Remote server control commands
@@ -386,6 +436,48 @@ exports.commands.forEach(function (command) {
}; };
}); });
// Transactions
// "MULTI", "EXEC", "DISCARD", "WATCH", "UNWATCH",
// Publish/Subscribe
// "SUBSCRIBE", "UNSUBSCRIBE", "PUBLISH",
RedisClient.prototype.multi = function (commands) {
var self = this;
try {
this.send_command("MULTI", [], function (err, reply) {
if (err) {
console.warn("Error starting MULTI request: " + err.stack);
}
});
commands.forEach(function (args, command_num) {
self.send_command(args[0], args[1], function (err, reply) {
if (err) {
args[2](err);
commands.splice(command_num, 1); // what if this runs before all commands are sent?
} else {
if (reply !== "QUEUED") {
console.warn("Unexpected MULTI reply: " + reply + " instead of 'QUEUED'");
}
}
});
});
this.send_command("EXEC", [], function (err, replies) {
replies.forEach(function (reply, reply_num) {
if (typeof commands[reply_num][2] === "function") {
commands[reply_num][2](null, reply)
} else {
if (exports.debug_mode) {
console.log("no callback for multi response " + reply_num + ", skipping.");
}
}
});
});
} catch (err) {
console.log("Caught exception in MULTI: " + err.stack);
}
};
exports.createClient = function (port_arg, host_arg, options) { exports.createClient = function (port_arg, host_arg, options) {
var port = port_arg || default_port, var port = port_arg || default_port,
host = host || default_host, host = host || default_host,
@@ -401,3 +493,10 @@ exports.createClient = function (port_arg, host_arg, options) {
return red_client; return red_client;
}; };
exports.print = function (err, reply) {
if (err) {
console.log("Error: " + err);
} else {
console.log("Reply: " + reply);
}
};