1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

Queue commands sent before a connection is available.

This commit is contained in:
Matt Ranney
2010-09-17 14:24:10 -07:00
parent b48105eeb3
commit c4e13514d9

View File

@@ -17,6 +17,7 @@ function RedisReplyParser() {
} }
sys.inherits(RedisReplyParser, events.EventEmitter); sys.inherits(RedisReplyParser, events.EventEmitter);
// Buffer.toString() is quite slow for small strings
function small_toString(buf) { function small_toString(buf) {
var tmp = "", i = 0, end = buf.end; var tmp = "", i = 0, end = buf.end;
@@ -280,7 +281,8 @@ function RedisClient(stream) {
this.connected = false; this.connected = false;
this.connections = 0; this.connections = 0;
this.attempts = 1; this.attempts = 1;
this.command_queue = new Queue; this.command_queue = new Queue(); // holds sent commands to de-pipeline them
this.offline_queue = new Queue(); // holds commands issued but not able to be sent
this.commands_sent = 0; this.commands_sent = 0;
this.retry_delay = 250; this.retry_delay = 250;
this.retry_backoff = 1.7; this.retry_backoff = 1.7;
@@ -307,6 +309,15 @@ function RedisClient(stream) {
self.stream.setNoDelay(); self.stream.setNoDelay();
self.stream.setTimeout(0); self.stream.setTimeout(0);
var command_obj;
while (self.offline_queue.length > 0) {
command_obj = self.offline_queue.shift();
if (exports.debug_mode) {
console.log("Sending offline command: " + command_obj.command);
}
self.send_command(command_obj.command, command_obj.args, command_obj.callback);
}
self.emit("connect"); self.emit("connect");
}); });
@@ -318,6 +329,12 @@ function RedisClient(stream) {
if (exports.debug_mode) { if (exports.debug_mode) {
console.warn("Connecting to redis server: " + msg); console.warn("Connecting to redis server: " + msg);
} }
self.offline_queue.forEach(function (args) {
if (typeof args[2] === "function") {
args[2]("Server connection could not be established");
}
});
self.connected = false; self.connected = false;
self.emit("error", msg); self.emit("error", msg);
}); });
@@ -404,7 +421,7 @@ RedisClient.prototype.return_reply = function (reply_buffer) {
}; };
RedisClient.prototype.send_command = function () { RedisClient.prototype.send_command = function () {
var command, callback, args, this_args; var command, callback, args, this_args, command_obj;
this_args = Array.prototype.slice.call(arguments); // convert arguments into real array this_args = Array.prototype.slice.call(arguments); // convert arguments into real array
@@ -427,17 +444,21 @@ RedisClient.prototype.send_command = function () {
throw new Error("First argument of send_command must be the command name"); throw new Error("First argument of send_command must be the command name");
} }
if (! this.connected) { command_obj = {
// TODO - queue this command and send it once we are connected.
callback(new Error("Redis client is not connected"));
return;
}
this.command_queue.push({
command: command, command: command,
args: args, args: args,
callback: callback callback: callback
}); };
if (! this.connected) {
if (exports.debug_mode) {
console.log("Queueing " + command + " for next server connection.");
}
this.offline_queue.push(command_obj);
return;
}
this.command_queue.push(command_obj);
this.commands_sent += 1; this.commands_sent += 1;
var elem_count = 1, stream = this.stream, buffer_args = false, command_str = ""; var elem_count = 1, stream = this.stream, buffer_args = false, command_str = "";
@@ -461,6 +482,7 @@ RedisClient.prototype.send_command = function () {
if (exports.debug_mode) { if (exports.debug_mode) {
console.log("send command: " + command_str); console.log("send command: " + command_str);
} }
// Need to catch "Stream is not writable" exception here and error everybody in the command queue out
stream.write(command_str); stream.write(command_str);
} else { } else {
if (exports.debug_mode) { if (exports.debug_mode) {