You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-07 13:22:56 +03:00
Save and clear client state. Restore it after the connection is ready.
This change stores the connection state regarding subscriptions, selected db and monitoring. When the connection to Redis drops, the state is reestablished after a succesful reconnect. Fixes #241. Fixes #210. Signed-off-by: DTrejo <david.trejo@voxer.com>
This commit is contained in:
33
index.js
33
index.js
@@ -66,6 +66,8 @@ function RedisClient(stream, options) {
|
|||||||
this.parser_module = null;
|
this.parser_module = null;
|
||||||
this.selected_db = null; // save the selected db here, used when reconnecting
|
this.selected_db = null; // save the selected db here, used when reconnecting
|
||||||
|
|
||||||
|
this.old_state = null;
|
||||||
|
|
||||||
var self = this;
|
var self = this;
|
||||||
|
|
||||||
this.stream.on("connect", function () {
|
this.stream.on("connect", function () {
|
||||||
@@ -272,18 +274,35 @@ RedisClient.prototype.on_ready = function () {
|
|||||||
|
|
||||||
this.ready = true;
|
this.ready = true;
|
||||||
|
|
||||||
|
if (this.old_state !== null) {
|
||||||
|
this.monitoring = this.old_state.monitoring;
|
||||||
|
this.pub_sub_mode = this.old_state.pub_sub_mode;
|
||||||
|
this.selected_db = this.old_state.selected_db;
|
||||||
|
this.old_state = null;
|
||||||
|
}
|
||||||
|
|
||||||
// magically restore any modal commands from a previous connection
|
// magically restore any modal commands from a previous connection
|
||||||
if (this.selected_db !== null) {
|
if (this.selected_db !== null) {
|
||||||
this.send_command('select', [this.selected_db]);
|
this.send_command('select', [this.selected_db]);
|
||||||
}
|
}
|
||||||
if (this.pub_sub_mode === true) {
|
if (this.pub_sub_mode === true) {
|
||||||
|
// only emit "ready" when all subscriptions were made again
|
||||||
|
var callback_count = 0;
|
||||||
|
var callback = function() {
|
||||||
|
callback_count--;
|
||||||
|
if (callback_count == 0) {
|
||||||
|
self.emit("ready");
|
||||||
|
}
|
||||||
|
}
|
||||||
Object.keys(this.subscription_set).forEach(function (key) {
|
Object.keys(this.subscription_set).forEach(function (key) {
|
||||||
var parts = key.split(" ");
|
var parts = key.split(" ");
|
||||||
if (exports.debug_mode) {
|
if (exports.debug_mode) {
|
||||||
console.warn("sending pub/sub on_ready " + parts[0] + ", " + parts[1]);
|
console.warn("sending pub/sub on_ready " + parts[0] + ", " + parts[1]);
|
||||||
}
|
}
|
||||||
self.send_command(parts[0], [parts[1]]);
|
callback_count++;
|
||||||
|
self.send_command(parts[0] + "scribe", [parts[1]], callback);
|
||||||
});
|
});
|
||||||
|
return;
|
||||||
} else if (this.monitoring) {
|
} else if (this.monitoring) {
|
||||||
this.send_command("monitor");
|
this.send_command("monitor");
|
||||||
} else {
|
} else {
|
||||||
@@ -382,6 +401,18 @@ RedisClient.prototype.connection_gone = function (why) {
|
|||||||
this.connected = false;
|
this.connected = false;
|
||||||
this.ready = false;
|
this.ready = false;
|
||||||
|
|
||||||
|
if (this.old_state === null) {
|
||||||
|
var state = {
|
||||||
|
monitoring: this.monitoring,
|
||||||
|
pub_sub_mode: this.pub_sub_mode,
|
||||||
|
selected_db: this.selected_db
|
||||||
|
};
|
||||||
|
this.old_state = state;
|
||||||
|
this.monitoring = false;
|
||||||
|
this.pub_sub_mode = false;
|
||||||
|
this.selected_db = null;
|
||||||
|
}
|
||||||
|
|
||||||
// since we are collapsing end and close, users don't expect to be called twice
|
// since we are collapsing end and close, users don't expect to be called twice
|
||||||
if (! this.emitted_end) {
|
if (! this.emitted_end) {
|
||||||
this.emit("end");
|
this.emit("end");
|
||||||
|
60
test.js
60
test.js
@@ -619,6 +619,66 @@ tests.SUBSCRIBE_QUIT = function () {
|
|||||||
client3.subscribe("chan3");
|
client3.subscribe("chan3");
|
||||||
};
|
};
|
||||||
|
|
||||||
|
tests.SUBSCRIBE_CLOSE_RESUBSCRIBE = function () {
|
||||||
|
var name = "SUBSCRIBE_CLOSE_RESUBSCRIBE";
|
||||||
|
var c1 = redis.createClient();
|
||||||
|
var c2 = redis.createClient();
|
||||||
|
var count = 0;
|
||||||
|
|
||||||
|
/* Create two clients. c1 subscribes to two channels, c2 will publish to them.
|
||||||
|
c2 publishes the first message.
|
||||||
|
c1 gets the message and drops its connection. It must resubscribe itself.
|
||||||
|
When it resubscribes, c2 publishes the second message, on the same channel
|
||||||
|
c1 gets the message and drops its connection. It must resubscribe itself, again.
|
||||||
|
When it resubscribes, c2 publishes the third message, on the second channel
|
||||||
|
c1 gets the message and drops its connection. When it reconnects, the test ends.
|
||||||
|
*/
|
||||||
|
|
||||||
|
c1.on("message", function(channel, message) {
|
||||||
|
if (channel === "chan1") {
|
||||||
|
assert.strictEqual(message, "hi on channel 1");
|
||||||
|
c1.stream.end();
|
||||||
|
|
||||||
|
} else if (channel === "chan2") {
|
||||||
|
assert.strictEqual(message, "hi on channel 2");
|
||||||
|
c1.stream.end();
|
||||||
|
|
||||||
|
} else {
|
||||||
|
c1.quit();
|
||||||
|
c2.quit();
|
||||||
|
assert.fail("test failed");
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
c1.subscribe("chan1", "chan2");
|
||||||
|
|
||||||
|
c2.once("ready", function() {
|
||||||
|
console.log("c2 is ready");
|
||||||
|
c1.on("ready", function(err, results) {
|
||||||
|
console.log("c1 is ready", count);
|
||||||
|
|
||||||
|
count++;
|
||||||
|
if (count == 1) {
|
||||||
|
c2.publish("chan1", "hi on channel 1");
|
||||||
|
return;
|
||||||
|
|
||||||
|
} else if (count == 2) {
|
||||||
|
c2.publish("chan2", "hi on channel 2");
|
||||||
|
|
||||||
|
} else {
|
||||||
|
c1.quit(function() {
|
||||||
|
c2.quit(function() {
|
||||||
|
next(name);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
c2.publish("chan1", "hi on channel 1");
|
||||||
|
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
tests.EXISTS = function () {
|
tests.EXISTS = function () {
|
||||||
var name = "EXISTS";
|
var name = "EXISTS";
|
||||||
client.del("foo", "foo2", require_number_any(name));
|
client.del("foo", "foo2", require_number_any(name));
|
||||||
|
Reference in New Issue
Block a user