1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

Fix flush+error bug on server disconnect.

Also assign a client ID to each client because stream.fd isn't available in node 0.6.
This commit is contained in:
Matt Ranney
2011-11-13 18:11:28 -10:00
parent f2a6a20a74
commit b633587b49

View File

@@ -6,6 +6,7 @@ var net = require("net"),
to_array = require("./lib/to_array"),
events = require("events"),
parsers = [], commands,
connection_id = 0,
default_port = 6379,
default_host = "127.0.0.1";
@@ -18,7 +19,7 @@ try {
parsers.push(require("./lib/parser/hiredis"));
} catch (err) {
if (exports.debug_mode) {
console.log("hiredis parser not installed.");
console.warn("hiredis parser not installed.");
}
}
@@ -28,6 +29,7 @@ function RedisClient(stream, options) {
this.stream = stream;
this.options = options = options || {};
this.connection_id = ++connection_id;
this.connected = false;
this.ready = false;
this.connections = 0;
@@ -88,6 +90,26 @@ function RedisClient(stream, options) {
util.inherits(RedisClient, events.EventEmitter);
exports.RedisClient = RedisClient;
// flush offline_queue and command_queue, erroring any items with a callback first
RedisClient.prototype.flush_and_error = function (message) {
var command_obj;
while (this.offline_queue.length > 0) {
command_obj = this.offline_queue.shift();
if (typeof command_obj.callback === "function") {
command_obj.callback(message);
}
}
this.offline_queue = new Queue();
while (this.command_queue.length > 0) {
command_obj = this.command_queue.shift();
if (typeof command_obj.callback === "function") {
command_obj.callback(message);
}
}
this.command_queue = new Queue();
};
RedisClient.prototype.on_error = function (msg) {
var message = "Redis connection to " + this.host + ":" + this.port + " failed - " + msg,
self = this, command_obj;
@@ -100,24 +122,7 @@ RedisClient.prototype.on_error = function (msg) {
console.warn(message);
}
// send errors to any commands in the offline queue, then reset
while (this.offline_queue.length > 0) {
command_obj = this.offline_queue.shift();
console.dir(command_obj);
if (typeof command_obj.callback === "function") {
command_obj.callback(message);
}
}
this.offline_queue = new Queue();
// send errors to any commands in the command queue, then reset
while (this.command_queue.length > 0) {
command_obj = this.command_queue.shift();
if (typeof command_obj.callback === "function") {
command_obj.callback(message);
}
}
this.command_queue = new Queue();
this.flush_and_error(message);
this.connected = false;
this.ready = false;
@@ -132,7 +137,7 @@ RedisClient.prototype.do_auth = function () {
var self = this;
if (exports.debug_mode) {
console.log("Sending auth to " + self.host + ":" + self.port + " fd " + self.stream.fd);
console.log("Sending auth to " + self.host + ":" + self.port + " id " + self.connection_id);
}
self.send_anyway = true;
self.send_command("auth", [this.auth_pass], function (err, res) {
@@ -152,7 +157,7 @@ RedisClient.prototype.do_auth = function () {
return self.emit("error", "Auth failed: " + res.toString());
}
if (exports.debug_mode) {
console.log("Auth succeeded " + self.host + ":" + self.port + " fd " + self.stream.fd);
console.log("Auth succeeded " + self.host + ":" + self.port + " id " + self.connection_id);
}
if (self.auth_callback) {
self.auth_callback(err, res);
@@ -173,7 +178,7 @@ RedisClient.prototype.do_auth = function () {
RedisClient.prototype.on_connect = function () {
if (exports.debug_mode) {
console.log("Stream connected " + this.host + ":" + this.port + " fd " + this.stream.fd);
console.log("Stream connected " + this.host + ":" + this.port + " id " + this.connection_id);
}
var self = this;
@@ -332,9 +337,6 @@ RedisClient.prototype.connection_gone = function (why) {
return;
}
// Note that this may trigger another "close" or "end" event
this.stream.destroy();
if (exports.debug_mode) {
console.warn("Redis connection is gone from " + why + " event.");
}
@@ -349,16 +351,14 @@ RedisClient.prototype.connection_gone = function (why) {
this.emitted_end = true;
}
this.command_queue.forEach(function (args) {
if (typeof args.callback === "function") {
args.callback("Server connection closed");
}
});
this.command_queue = new Queue();
this.flush_and_error("Redis connection gone from " + why + " event.");
// If this is a requested shutdown, then don't retry
if (this.closing) {
this.retry_timer = null;
if (exports.debug_mode) {
console.warn("connection ended from quit command, not retrying.");
}
return;
}
@@ -401,7 +401,7 @@ RedisClient.prototype.connection_gone = function (why) {
RedisClient.prototype.on_data = function (data) {
if (exports.debug_mode) {
console.log("net read " + this.host + ":" + this.port + " fd " + this.stream.fd + ": " + data.toString());
console.log("net read " + this.host + ":" + this.port + " id " + this.connection_id + ": " + data.toString());
}
try {
@@ -621,7 +621,7 @@ RedisClient.prototype.send_command = function (command, args, callback) {
command_str += "$" + Buffer.byteLength(arg) + "\r\n" + arg + "\r\n";
}
if (exports.debug_mode) {
console.log("send " + this.host + ":" + this.port + " fd " + this.stream.fd + ": " + command_str);
console.log("send " + this.host + ":" + this.port + " id " + this.connection_id + ": " + command_str);
}
buffered_writes += !stream.write(command_str);
} else {