diff --git a/index.js b/index.js index 54033ea58e..5c7799a8f8 100644 --- a/index.js +++ b/index.js @@ -51,7 +51,8 @@ function RedisClient(stream, options) { this.connect_timeout = +options.connect_timeout; } this.initialize_retry_vars(); - this.subscriptions = false; + this.pub_sub_mode = false; + this.subscription_set = {}; this.monitoring = false; this.closing = false; this.server_info = {}; @@ -94,7 +95,7 @@ exports.RedisClient = RedisClient; RedisClient.prototype.initialize_retry_vars = function () { this.retry_timer = null; this.retry_totaltime = 0; - this.retry_delay = 250; + this.retry_delay = 150; this.retry_backoff = 1.7; this.attempts = 1; }; @@ -173,16 +174,10 @@ RedisClient.prototype.do_auth = function () { self.auth_callback = null; } - // restore the selected db if needed - if (self.selected_db !== null) { - self.send_command('select', [self.selected_db]); - } - // now we are really connected self.emit("connect"); if (self.options.no_ready_check) { - self.ready = true; - self.send_offline_queue(); + self.on_ready(); } else { self.ready_check(); } @@ -213,16 +208,10 @@ RedisClient.prototype.on_connect = function () { if (this.auth_pass) { this.do_auth(); } else { - // restore the selected db if needed - if (this.selected_db !== null) { - this.send_command('select', [this.selected_db]); - } - this.emit("connect"); if (this.options.no_ready_check) { - this.ready = true; - this.send_offline_queue(); + this.on_ready(); } else { this.ready_check(); } @@ -272,6 +261,31 @@ RedisClient.prototype.init_parser = function () { }); }; +RedisClient.prototype.on_ready = function () { + var self = this; + + this.ready = true; + + // magically restore any modal commands from a previous connection + if (this.selected_db !== null) { + this.send_command('select', [this.selected_db]); + } + if (this.pub_sub_mode === true) { + Object.keys(this.subscription_set).forEach(function (key) { + var parts = key.split(" "); + if (exports.debug_mode) { + console.warn("sending pub/sub on_ready " + parts[0] + ", " + parts[1]); + } + self.send_command(parts[0], [parts[1]]); + }); + } else if (this.monitoring) { + this.send_command("monitor"); + } else { + this.send_offline_queue(); + } + this.emit("ready"); +}; + RedisClient.prototype.on_info_cmd = function (err, res) { var self = this, obj = {}, lines, retry_time; @@ -300,10 +314,7 @@ RedisClient.prototype.on_info_cmd = function (err, res) { if (exports.debug_mode) { console.log("Redis server ready."); } - this.ready = true; - - this.send_offline_queue(); - this.emit("ready"); + this.on_ready(); } else { retry_time = obj.loading_eta_seconds * 1000; if (retry_time > 1000) { @@ -334,6 +345,7 @@ RedisClient.prototype.ready_check = function () { RedisClient.prototype.send_offline_queue = function () { var command_obj, buffered_writes = 0; + while (this.offline_queue.length > 0) { command_obj = this.offline_queue.shift(); if (exports.debug_mode) { @@ -363,8 +375,6 @@ RedisClient.prototype.connection_gone = function (why) { } this.connected = false; this.ready = false; - this.subscriptions = false; - this.monitoring = false; // since we are collapsing end and close, users don't expect to be called twice if (! this.emitted_end) { @@ -440,7 +450,7 @@ RedisClient.prototype.on_data = function (data) { RedisClient.prototype.return_error = function (err) { var command_obj = this.command_queue.shift(), queue_len = this.command_queue.getLength(); - if (this.subscriptions === false && queue_len === 0) { + if (this.pub_sub_mode === false && queue_len === 0) { this.emit("idle"); this.command_queue = new Queue(); } @@ -518,7 +528,7 @@ RedisClient.prototype.return_reply = function (reply) { queue_len = this.command_queue.getLength(); - if (this.subscriptions === false && queue_len === 0) { + if (this.pub_sub_mode === false && queue_len === 0) { this.emit("idle"); this.command_queue = new Queue(); // explicitly reclaim storage from old Queue } @@ -546,7 +556,7 @@ RedisClient.prototype.return_reply = function (reply) { } else if (exports.debug_mode) { console.log("no callback for reply: " + (reply && reply.toString && reply.toString())); } - } else if (this.subscriptions || (command_obj && command_obj.sub_command)) { + } else if (this.pub_sub_mode || (command_obj && command_obj.sub_command)) { if (Array.isArray(reply)) { type = reply[0].toString(); @@ -556,13 +566,13 @@ RedisClient.prototype.return_reply = function (reply) { this.emit("pmessage", reply[1].toString(), reply[2].toString(), reply[3]); // pattern, channel, message } else if (type === "subscribe" || type === "unsubscribe" || type === "psubscribe" || type === "punsubscribe") { if (reply[2] === 0) { - this.subscriptions = false; + this.pub_sub_mode = false; if (this.debug_mode) { console.log("All subscriptions removed, exiting pub/sub mode"); } } // subscribe commands take an optional callback and also emit an event, but only the first response is included in the callback - // TODO - document this + // TODO - document this or fix it so it works in a more obvious way if (command_obj && typeof command_obj.callback === "function") { try_callback(command_obj.callback, reply[1].toString()); } @@ -659,16 +669,12 @@ RedisClient.prototype.send_command = function (command, args, callback) { } if (command === "subscribe" || command === "psubscribe" || command === "unsubscribe" || command === "punsubscribe") { - if (this.subscriptions === false && exports.debug_mode) { - console.log("Entering pub/sub mode from " + command); - } - command_obj.sub_command = true; - this.subscriptions = true; + this.pub_sub_command(command_obj); } else if (command === "monitor") { this.monitoring = true; } else if (command === "quit") { this.closing = true; - } else if (this.subscriptions === true) { + } else if (this.pub_sub_mode === true) { throw new Error("Connection in pub/sub mode, only pub/sub commands may be used"); } this.command_queue.push(command_obj); @@ -736,6 +742,38 @@ RedisClient.prototype.send_command = function (command, args, callback) { return !this.should_buffer; }; +RedisClient.prototype.pub_sub_command = function (command_obj) { + var i, key, command, args; + + if (this.pub_sub_mode === false && exports.debug_mode) { + console.log("Entering pub/sub mode from " + command_obj.command); + } + this.pub_sub_mode = true; + command_obj.sub_command = true; + + command = command_obj.command; + args = command_obj.args; + if (command === "subscribe" || command === "psubscribe") { + if (command === "subscribe") { + key = "sub"; + } else { + key = "psub"; + } + for (i = 0; i < args.length; i++) { + this.subscription_set[key + " " + args[i]] = true; + } + } else { + if (command === "unsubscribe") { + key = "sub"; + } else { + key = "psub"; + } + for (i = 0; i < args.length; i++) { + delete this.subscription_set[key + " " + args[i]]; + } + } +}; + RedisClient.prototype.end = function () { this.stream._events = {}; this.connected = false; diff --git a/test.js b/test.js index a146a32f3d..cc7ba88878 100644 --- a/test.js +++ b/test.js @@ -3,8 +3,6 @@ var redis = require("./index"), client = redis.createClient(), client2 = redis.createClient(), client3 = redis.createClient(), - client4 = redis.createClient(9006, "filefish.redistogo.com"), - client5 = redis.createClient(), assert = require("assert"), util = require("./lib/util"), test_db_num = 15, // this DB will be flushed and used for testing @@ -1238,9 +1236,10 @@ tests.SORT = function () { }; tests.MONITOR = function () { - var name = "MONITOR", responses = []; + var name = "MONITOR", responses = [], monitor_client; - client5.monitor(function (err, res) { + monitor_client = redis.createClient(); + monitor_client.monitor(function (err, res) { client.mget("some", "keys", "foo", "bar"); client.set("json", JSON.stringify({ foo: "123", @@ -1248,7 +1247,7 @@ tests.MONITOR = function () { another: false })); }); - client5.on("monitor", function (time, args) { + monitor_client.on("monitor", function (time, args) { responses.push(args); if (responses.length === 3) { assert.strictEqual(1, responses[0].length); @@ -1263,7 +1262,9 @@ tests.MONITOR = function () { assert.strictEqual("set", responses[2][0]); assert.strictEqual("json", responses[2][1]); assert.strictEqual('{"foo":"123","bar":"sdflkdfsjk","another":false}', responses[2][2]); - next(name); + monitor_client.quit(function (err, res) { + next(name); + }); } }); }; @@ -1332,6 +1333,30 @@ tests.OPTIONAL_CALLBACK_UNDEFINED = function () { client.get("op_cb2", last(name, require_string("y", name))); }; +// TODO - need a better way to test auth, maybe auto-config a local Redis server or something. +// Yes, this is the real password. Please be nice, thanks. +tests.auth = function () { + var name = "AUTH", client4, ready_count = 0; + + client4 = redis.createClient(9006, "filefish.redistogo.com"); + client4.auth("664b1b6aaf134e1ec281945a8de702a9", function (err, res) { + assert.strictEqual(null, err, name); + assert.strictEqual("OK", res.toString(), name); + }); + + // test auth, then kill the connection so it'll auto-reconnect and auto-re-auth + client4.on("ready", function () { + ready_count++; + if (ready_count === 1) { + client4.stream.destroy(); + } else { + client4.quit(function (err, res) { + next(name); + }); + } + }); +}; + all_tests = Object.keys(tests); all_start = new Date(); test_count = 0; @@ -1347,8 +1372,6 @@ run_next_test = function run_next_test() { console.log('\n completed \x1b[32m%d\x1b[0m tests in \x1b[33m%d\x1b[0m ms\n', test_count, new Date() - all_start); client.quit(); client2.quit(); - client4.quit(); - client5.quit(); } }; @@ -1365,17 +1388,6 @@ client.on('end', function () { ended = true; }); -// TODO - need a better way to test auth, maybe auto-config a local Redis server? Sounds hard. -// Yes, this is the real password. Please be nice, thanks. -client4.auth("664b1b6aaf134e1ec281945a8de702a9", function (err, res) { - var name = "AUTH_4"; - - if (err) { - assert.fail(err, name); - } - assert.strictEqual("OK", res.toString(), "auth"); -}); - // Exit immediately on connection failure, which triggers "exit", below, which fails the test client.on("error", function (err) { console.error("client: " + err.stack); @@ -1389,11 +1401,6 @@ client3.on("error", function (err) { console.error("client3: " + err.stack); process.exit(); }); -client5.on("error", function (err) { - console.error("client5: " + err.stack); - process.exit(); -}); - client.on("reconnecting", function (params) { console.log("reconnecting: " + util.inspect(params)); });