1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

Fix for [GH-93] - restore subscriptions, monitor, db, and auth on reconnect.

Also fixes bug with re-selecting db when auth is required.
Still needs a test for pub/sub reconnect and monitor reconnect.
This commit is contained in:
Matt Ranney
2011-11-16 17:27:26 -10:00
parent 642df49924
commit 3a7c6388de
2 changed files with 102 additions and 57 deletions

104
index.js
View File

@@ -51,7 +51,8 @@ function RedisClient(stream, options) {
this.connect_timeout = +options.connect_timeout;
}
this.initialize_retry_vars();
this.subscriptions = false;
this.pub_sub_mode = false;
this.subscription_set = {};
this.monitoring = false;
this.closing = false;
this.server_info = {};
@@ -94,7 +95,7 @@ exports.RedisClient = RedisClient;
RedisClient.prototype.initialize_retry_vars = function () {
this.retry_timer = null;
this.retry_totaltime = 0;
this.retry_delay = 250;
this.retry_delay = 150;
this.retry_backoff = 1.7;
this.attempts = 1;
};
@@ -173,16 +174,10 @@ RedisClient.prototype.do_auth = function () {
self.auth_callback = null;
}
// restore the selected db if needed
if (self.selected_db !== null) {
self.send_command('select', [self.selected_db]);
}
// now we are really connected
self.emit("connect");
if (self.options.no_ready_check) {
self.ready = true;
self.send_offline_queue();
self.on_ready();
} else {
self.ready_check();
}
@@ -213,16 +208,10 @@ RedisClient.prototype.on_connect = function () {
if (this.auth_pass) {
this.do_auth();
} else {
// restore the selected db if needed
if (this.selected_db !== null) {
this.send_command('select', [this.selected_db]);
}
this.emit("connect");
if (this.options.no_ready_check) {
this.ready = true;
this.send_offline_queue();
this.on_ready();
} else {
this.ready_check();
}
@@ -272,6 +261,31 @@ RedisClient.prototype.init_parser = function () {
});
};
RedisClient.prototype.on_ready = function () {
var self = this;
this.ready = true;
// magically restore any modal commands from a previous connection
if (this.selected_db !== null) {
this.send_command('select', [this.selected_db]);
}
if (this.pub_sub_mode === true) {
Object.keys(this.subscription_set).forEach(function (key) {
var parts = key.split(" ");
if (exports.debug_mode) {
console.warn("sending pub/sub on_ready " + parts[0] + ", " + parts[1]);
}
self.send_command(parts[0], [parts[1]]);
});
} else if (this.monitoring) {
this.send_command("monitor");
} else {
this.send_offline_queue();
}
this.emit("ready");
};
RedisClient.prototype.on_info_cmd = function (err, res) {
var self = this, obj = {}, lines, retry_time;
@@ -300,10 +314,7 @@ RedisClient.prototype.on_info_cmd = function (err, res) {
if (exports.debug_mode) {
console.log("Redis server ready.");
}
this.ready = true;
this.send_offline_queue();
this.emit("ready");
this.on_ready();
} else {
retry_time = obj.loading_eta_seconds * 1000;
if (retry_time > 1000) {
@@ -334,6 +345,7 @@ RedisClient.prototype.ready_check = function () {
RedisClient.prototype.send_offline_queue = function () {
var command_obj, buffered_writes = 0;
while (this.offline_queue.length > 0) {
command_obj = this.offline_queue.shift();
if (exports.debug_mode) {
@@ -363,8 +375,6 @@ RedisClient.prototype.connection_gone = function (why) {
}
this.connected = false;
this.ready = false;
this.subscriptions = false;
this.monitoring = false;
// since we are collapsing end and close, users don't expect to be called twice
if (! this.emitted_end) {
@@ -440,7 +450,7 @@ RedisClient.prototype.on_data = function (data) {
RedisClient.prototype.return_error = function (err) {
var command_obj = this.command_queue.shift(), queue_len = this.command_queue.getLength();
if (this.subscriptions === false && queue_len === 0) {
if (this.pub_sub_mode === false && queue_len === 0) {
this.emit("idle");
this.command_queue = new Queue();
}
@@ -518,7 +528,7 @@ RedisClient.prototype.return_reply = function (reply) {
queue_len = this.command_queue.getLength();
if (this.subscriptions === false && queue_len === 0) {
if (this.pub_sub_mode === false && queue_len === 0) {
this.emit("idle");
this.command_queue = new Queue(); // explicitly reclaim storage from old Queue
}
@@ -546,7 +556,7 @@ RedisClient.prototype.return_reply = function (reply) {
} else if (exports.debug_mode) {
console.log("no callback for reply: " + (reply && reply.toString && reply.toString()));
}
} else if (this.subscriptions || (command_obj && command_obj.sub_command)) {
} else if (this.pub_sub_mode || (command_obj && command_obj.sub_command)) {
if (Array.isArray(reply)) {
type = reply[0].toString();
@@ -556,13 +566,13 @@ RedisClient.prototype.return_reply = function (reply) {
this.emit("pmessage", reply[1].toString(), reply[2].toString(), reply[3]); // pattern, channel, message
} else if (type === "subscribe" || type === "unsubscribe" || type === "psubscribe" || type === "punsubscribe") {
if (reply[2] === 0) {
this.subscriptions = false;
this.pub_sub_mode = false;
if (this.debug_mode) {
console.log("All subscriptions removed, exiting pub/sub mode");
}
}
// subscribe commands take an optional callback and also emit an event, but only the first response is included in the callback
// TODO - document this
// TODO - document this or fix it so it works in a more obvious way
if (command_obj && typeof command_obj.callback === "function") {
try_callback(command_obj.callback, reply[1].toString());
}
@@ -659,16 +669,12 @@ RedisClient.prototype.send_command = function (command, args, callback) {
}
if (command === "subscribe" || command === "psubscribe" || command === "unsubscribe" || command === "punsubscribe") {
if (this.subscriptions === false && exports.debug_mode) {
console.log("Entering pub/sub mode from " + command);
}
command_obj.sub_command = true;
this.subscriptions = true;
this.pub_sub_command(command_obj);
} else if (command === "monitor") {
this.monitoring = true;
} else if (command === "quit") {
this.closing = true;
} else if (this.subscriptions === true) {
} else if (this.pub_sub_mode === true) {
throw new Error("Connection in pub/sub mode, only pub/sub commands may be used");
}
this.command_queue.push(command_obj);
@@ -736,6 +742,38 @@ RedisClient.prototype.send_command = function (command, args, callback) {
return !this.should_buffer;
};
RedisClient.prototype.pub_sub_command = function (command_obj) {
var i, key, command, args;
if (this.pub_sub_mode === false && exports.debug_mode) {
console.log("Entering pub/sub mode from " + command_obj.command);
}
this.pub_sub_mode = true;
command_obj.sub_command = true;
command = command_obj.command;
args = command_obj.args;
if (command === "subscribe" || command === "psubscribe") {
if (command === "subscribe") {
key = "sub";
} else {
key = "psub";
}
for (i = 0; i < args.length; i++) {
this.subscription_set[key + " " + args[i]] = true;
}
} else {
if (command === "unsubscribe") {
key = "sub";
} else {
key = "psub";
}
for (i = 0; i < args.length; i++) {
delete this.subscription_set[key + " " + args[i]];
}
}
};
RedisClient.prototype.end = function () {
this.stream._events = {};
this.connected = false;