diff --git a/index.js b/index.js index bd8c16a2ef..77bda9779d 100644 --- a/index.js +++ b/index.js @@ -66,6 +66,8 @@ function RedisClient(stream, options) { this.parser_module = null; this.selected_db = null; // save the selected db here, used when reconnecting + this.old_state = null; + var self = this; this.stream.on("connect", function () { @@ -272,18 +274,35 @@ RedisClient.prototype.on_ready = function () { this.ready = true; + if (this.old_state !== null) { + this.monitoring = this.old_state.monitoring; + this.pub_sub_mode = this.old_state.pub_sub_mode; + this.selected_db = this.old_state.selected_db; + this.old_state = null; + } + // magically restore any modal commands from a previous connection if (this.selected_db !== null) { this.send_command('select', [this.selected_db]); } if (this.pub_sub_mode === true) { + // only emit "ready" when all subscriptions were made again + var callback_count = 0; + var callback = function() { + callback_count--; + if (callback_count == 0) { + self.emit("ready"); + } + } Object.keys(this.subscription_set).forEach(function (key) { var parts = key.split(" "); if (exports.debug_mode) { console.warn("sending pub/sub on_ready " + parts[0] + ", " + parts[1]); } - self.send_command(parts[0], [parts[1]]); + callback_count++; + self.send_command(parts[0] + "scribe", [parts[1]], callback); }); + return; } else if (this.monitoring) { this.send_command("monitor"); } else { @@ -382,6 +401,18 @@ RedisClient.prototype.connection_gone = function (why) { this.connected = false; this.ready = false; + if (this.old_state === null) { + var state = { + monitoring: this.monitoring, + pub_sub_mode: this.pub_sub_mode, + selected_db: this.selected_db + }; + this.old_state = state; + this.monitoring = false; + this.pub_sub_mode = false; + this.selected_db = null; + } + // since we are collapsing end and close, users don't expect to be called twice if (! this.emitted_end) { this.emit("end"); diff --git a/test.js b/test.js index f6f7f0e3be..f85050cb6b 100644 --- a/test.js +++ b/test.js @@ -619,6 +619,66 @@ tests.SUBSCRIBE_QUIT = function () { client3.subscribe("chan3"); }; +tests.SUBSCRIBE_CLOSE_RESUBSCRIBE = function () { + var name = "SUBSCRIBE_CLOSE_RESUBSCRIBE"; + var c1 = redis.createClient(); + var c2 = redis.createClient(); + var count = 0; + + /* Create two clients. c1 subscribes to two channels, c2 will publish to them. + c2 publishes the first message. + c1 gets the message and drops its connection. It must resubscribe itself. + When it resubscribes, c2 publishes the second message, on the same channel + c1 gets the message and drops its connection. It must resubscribe itself, again. + When it resubscribes, c2 publishes the third message, on the second channel + c1 gets the message and drops its connection. When it reconnects, the test ends. + */ + + c1.on("message", function(channel, message) { + if (channel === "chan1") { + assert.strictEqual(message, "hi on channel 1"); + c1.stream.end(); + + } else if (channel === "chan2") { + assert.strictEqual(message, "hi on channel 2"); + c1.stream.end(); + + } else { + c1.quit(); + c2.quit(); + assert.fail("test failed"); + } + }) + + c1.subscribe("chan1", "chan2"); + + c2.once("ready", function() { + console.log("c2 is ready"); + c1.on("ready", function(err, results) { + console.log("c1 is ready", count); + + count++; + if (count == 1) { + c2.publish("chan1", "hi on channel 1"); + return; + + } else if (count == 2) { + c2.publish("chan2", "hi on channel 2"); + + } else { + c1.quit(function() { + c2.quit(function() { + next(name); + }); + }); + } + }); + + c2.publish("chan1", "hi on channel 1"); + + }); +}; + tests.EXISTS = function () { var name = "EXISTS"; client.del("foo", "foo2", require_number_any(name));