diff --git a/changelog.md b/changelog.md index 22eb803437..b9d0ff6313 100644 --- a/changelog.md +++ b/changelog.md @@ -1,6 +1,25 @@ Changelog ========= +## v0.5.5 - February 16, 2011 + +Add probe for server readiness. + +When a Redis server starts up, it might take a while to load the dataset into memory. +During this time, the server will accept connections, but will return errors for all non-INFO +commands. Now node_redis will send an INFO command whenever it connects to a server. +If the info command indicates that the server is not ready, the client will keep trying until +the server is ready. Once it is ready, the client will emit a "ready" event as well as the +"connect" event. The client will queue up all commands sent before the server is ready, just +like it did before. When the server is ready, all offline/non-ready commands will be replayed. +This should be backward compatible with previous versions. + +To disable this ready check behavior, set `options.no_ready_check` when creating the client. + +As a side effect of this change, the key/val params from the info command are available as +`client.server_options`. Further, the version string is decomposed into individual elements +in `client.server_options.versions`. + ## v0.5.4 - February 11, 2011 Fix excess memory consumption from Queue backing store. diff --git a/index.js b/index.js index 532a855411..f9358fe48d 100644 --- a/index.js +++ b/index.js @@ -39,6 +39,7 @@ function RedisClient(stream, options) { this.options = options || {}; this.connected = false; + this.ready = false; this.connections = 0; this.attempts = 1; this.command_queue = new Queue(); // holds sent commands to de-pipeline them @@ -48,6 +49,7 @@ function RedisClient(stream, options) { this.retry_backoff = 1.7; this.subscriptions = false; this.closing = false; + this.server_info = {}; var parser_module, self = this; @@ -76,22 +78,23 @@ function RedisClient(stream, options) { }); // "reply error" is an error sent back by redis - self.reply_parser.on("reply error", function (reply) { + this.reply_parser.on("reply error", function (reply) { self.return_error(new Error(reply)); }); - self.reply_parser.on("reply", function (reply) { + this.reply_parser.on("reply", function (reply) { self.return_reply(reply); }); // "error" is bad. Somehow the parser got confused. It'll try to reset and continue. - self.reply_parser.on("error", function (err) { + this.reply_parser.on("error", function (err) { self.emit("error", new Error("Redis reply parser error: " + err.stack)); }); this.stream.on("connect", function () { if (exports.debug_mode) { - console.log("Stream connected"); + console.log("Stream connected fd " + self.stream.fd); } self.connected = true; + self.ready = false; self.connections += 1; self.command_queue = new Queue(); self.emitted_end = false; @@ -101,19 +104,14 @@ function RedisClient(stream, options) { self.stream.setNoDelay(); self.stream.setTimeout(0); - // give connect listeners a chance to run first in case they need to auth self.emit("connect"); - var command_obj; - while (self.offline_queue.length > 0) { - command_obj = self.offline_queue.shift(); - if (exports.debug_mode) { - console.log("Sending offline command: " + command_obj.command); - } - self.send_command(command_obj.command, command_obj.args, command_obj.callback); + if (self.options.no_ready_check) { + self.ready = true; + self.send_offline_queue(); + } else { + self.ready_check(); } - self.offline_queue = new Queue(); - // Even though items were shifted off, Queue backing store still uses memory until next add }); this.stream.on("data", function (buffer_from_socket) { @@ -145,6 +143,7 @@ function RedisClient(stream, options) { self.command_queue = new Queue(); self.connected = false; + self.ready = false; self.emit("error", new Error(message)); }); @@ -165,6 +164,76 @@ function RedisClient(stream, options) { util.inherits(RedisClient, events.EventEmitter); exports.RedisClient = RedisClient; +RedisClient.prototype.ready_check = function () { + var self = this; + + function send_info_cmd() { + if (exports.debug_mode) { + console.log("checking server ready state..."); + } + + self.send_anyway = true; // secret flag to send_command to send something even if not "ready" + self.info(function (err, res) { + if (err) { + self.emit("error", "Ready check failed: " + err); + return; + } + + var lines = res.split("\r\n"), obj = {}, retry_time; + + lines.forEach(function (line) { + var parts = line.split(':'); + if (parts[1]) { + obj[parts[0]] = parts[1]; + } + }); + + obj.versions = []; + obj.redis_version.split('.').forEach(function (num) { + obj.versions.push(+num); + }); + + // expose info key/vals to users + self.server_info = obj; + + if (!obj["loading"] || (obj["loading"] && obj["loading"] == 0)) { + if (exports.debug_mode) { + console.log("Redis server ready."); + } + self.ready = true; + + self.send_offline_queue(); + self.emit("ready"); + } else { + retry_time = obj.loading_eta_seconds * 1000; + if (retry_time > 1000) { + retry_time = 1000; + } + if (exports.debug_mode) { + console.log("Redis server still loading, trying again in " + retry_time); + } + setTimeout(send_info_cmd, retry_time); + } + }); + self.send_anyway = false; + } + + send_info_cmd(); +}; + +RedisClient.prototype.send_offline_queue = function () { + var command_obj; + while (this.offline_queue.length > 0) { + command_obj = this.offline_queue.shift(); + if (exports.debug_mode) { + console.log("Sending offline command: " + command_obj.command); + } + this.send_command(command_obj.command, command_obj.args, command_obj.callback); + } + this.offline_queue = new Queue(); + // Even though items were shifted off, Queue backing store still uses memory until next add +}; + RedisClient.prototype.connection_gone = function (why) { var self = this; @@ -180,6 +249,7 @@ RedisClient.prototype.connection_gone = function (why) { console.warn("Redis connection is gone from " + why + " event."); } self.connected = false; + self.ready = false; self.subscriptions = false; // since we are collapsing end and close, users don't expect to be called twice @@ -357,7 +427,7 @@ RedisClient.prototype.send_command = function () { sub_command: false }; - if (! this.connected) { + if (!this.ready && !this.send_anyway) { if (exports.debug_mode) { console.log("Queueing " + command + " for next server connection."); } @@ -447,6 +517,7 @@ RedisClient.prototype.send_command = function () { RedisClient.prototype.end = function () { this.stream._events = {}; this.connected = false; + this.ready = false; return this.stream.end(); }; @@ -616,16 +687,16 @@ RedisClient.prototype.MULTI = function (args) { exports.createClient = function (port_arg, host_arg, options) { var port = port_arg || default_port, host = host_arg || default_host, - red_client, net_client; + redis_client, net_client; net_client = net.createConnection(port, host); - red_client = new RedisClient(net_client, options); + redis_client = new RedisClient(net_client, options); - red_client.port = port; - red_client.host = host; + redis_client.port = port; + redis_client.host = host; - return red_client; + return redis_client; }; exports.print = function (err, reply) { diff --git a/multi_bench.js b/multi_bench.js index 20e46cccf8..35b20db477 100644 --- a/multi_bench.js +++ b/multi_bench.js @@ -8,7 +8,6 @@ var redis = require("./index"), tests = [], test_start, client_options = { - parser: "javascript", return_buffers: false }; diff --git a/package.json b/package.json index 89771956cd..1849030bca 100644 --- a/package.json +++ b/package.json @@ -1,5 +1,5 @@ { "name" : "redis", - "version" : "0.5.4", + "version" : "0.5.5", "description" : "Redis client library", "author": "Matt Ranney ", "contributors": [ diff --git a/test.js b/test.js index 039b06afe9..b6bb734bd9 100644 --- a/test.js +++ b/test.js @@ -1,8 +1,6 @@ /*global require console setTimeout process Buffer */ var redis = require("./index"), - client = redis.createClient(6379, "127.0.0.1", { - parser: "javascript" - }), + client = redis.createClient(), client2 = redis.createClient(), client3 = redis.createClient(), assert = require("assert"), @@ -10,11 +8,10 @@ var redis = require("./index"), test_db_num = 15, // this DB will be flushed and used for testing tests = {}, connected = false, - ended = false, - server_info; + ended = false; -// Uncomment this to see the wire protocol and other debugging info -redis.debug_mode = false; +// Set this to truthy to see the wire protocol and other debugging info +redis.debug_mode = process.argv[2]; function buffers_to_strings(arr) { return arr.map(function (val) { @@ -231,7 +228,7 @@ tests.MULTI_6 = function () { tests.WATCH_MULTI = function () { var name = 'WATCH_MULTI'; - if (server_info.versions[0] >= 2 && server_info.versions[1] >= 1) { + if (client.server_info.versions[0] >= 2 && client.server_info.versions[1] >= 1) { client.watch(name); client.incr(name); var multi = client.multi(); @@ -1057,28 +1054,10 @@ function run_next_test() { console.log("Using reply parser " + client.reply_parser.name); -client.on("connect", function start_tests() { - // remove listener so we don't restart all tests on reconnect - client.removeListener("connect", start_tests); +client.once("ready", function start_tests() { + console.log("Connected to " + client.host + ":" + client.port + ", Redis server version " + client.server_info.redis_version + "\n"); - // Fetch and stash info results in case anybody needs info on the server we are using. - client.info(function (err, reply) { - var obj = {}; - reply.toString().split('\n').forEach(function (line) { - var parts = line.split(':'); - if (parts[1]) { - obj[parts[0]] = parts[1]; - } - }); - obj.versions = []; - obj.redis_version.split('.').forEach(function (num) { - obj.versions.push(+num); - }); - server_info = obj; - console.log("Connected to " + client.host + ":" + client.port + ", Redis server version " + obj.redis_version + "\n"); - - run_next_test(); - }); + run_next_test(); connected = true; });