From ee93d1b91b40d84fffddb41ac4712a59cfdf0d4b Mon Sep 17 00:00:00 2001 From: Matt Ranney Date: Wed, 16 Feb 2011 15:46:27 -1000 Subject: [PATCH] Add probe for server readiness. When a Redis server starts up, it might take a while to load the dataset into memory. During this time, the server will accept connections, but will return errors for all non-INFO commands. Now node_redis will send an INFO command whenever it connects to a server. If the info command indicates that the server is not ready, the client will keep trying until the server is ready. Once it is ready, the client will emit a "ready" event as well as the "connect" event. The client will queue up all commands sent before the server is ready, just like it did before. When the server is ready, all offline/non-ready commands will be replayed. This should be backward compatible with previous versions. To disable this ready check behavior, set `options.no_ready_check` when creating the client. As a side effect of this change, the key/val params from the info command are available as `client.server_options`. Further, the version string is decomposed into individual elements in `client.server_options.versions`. --- changelog.md | 19 +++++++++ index.js | 111 ++++++++++++++++++++++++++++++++++++++++--------- multi_bench.js | 1 - package.json | 2 +- test.js | 37 ++++------------- 5 files changed, 119 insertions(+), 51 deletions(-) diff --git a/changelog.md b/changelog.md index 22eb803437..b9d0ff6313 100644 --- a/changelog.md +++ b/changelog.md @@ -1,6 +1,25 @@ Changelog ========= +## v0.5.5 - February 16, 2011 + +Add probe for server readiness. + +When a Redis server starts up, it might take a while to load the dataset into memory. +During this time, the server will accept connections, but will return errors for all non-INFO +commands. Now node_redis will send an INFO command whenever it connects to a server. +If the info command indicates that the server is not ready, the client will keep trying until +the server is ready. Once it is ready, the client will emit a "ready" event as well as the +"connect" event. The client will queue up all commands sent before the server is ready, just +like it did before. When the server is ready, all offline/non-ready commands will be replayed. +This should be backward compatible with previous versions. + +To disable this ready check behavior, set `options.no_ready_check` when creating the client. + +As a side effect of this change, the key/val params from the info command are available as +`client.server_options`. Further, the version string is decomposed into individual elements +in `client.server_options.versions`. + ## v0.5.4 - February 11, 2011 Fix excess memory consumption from Queue backing store. diff --git a/index.js b/index.js index 532a855411..f9358fe48d 100644 --- a/index.js +++ b/index.js @@ -39,6 +39,7 @@ function RedisClient(stream, options) { this.options = options || {}; this.connected = false; + this.ready = false; this.connections = 0; this.attempts = 1; this.command_queue = new Queue(); // holds sent commands to de-pipeline them @@ -48,6 +49,7 @@ function RedisClient(stream, options) { this.retry_backoff = 1.7; this.subscriptions = false; this.closing = false; + this.server_info = {}; var parser_module, self = this; @@ -76,22 +78,23 @@ function RedisClient(stream, options) { }); // "reply error" is an error sent back by redis - self.reply_parser.on("reply error", function (reply) { + this.reply_parser.on("reply error", function (reply) { self.return_error(new Error(reply)); }); - self.reply_parser.on("reply", function (reply) { + this.reply_parser.on("reply", function (reply) { self.return_reply(reply); }); // "error" is bad. Somehow the parser got confused. It'll try to reset and continue. - self.reply_parser.on("error", function (err) { + this.reply_parser.on("error", function (err) { self.emit("error", new Error("Redis reply parser error: " + err.stack)); }); this.stream.on("connect", function () { if (exports.debug_mode) { - console.log("Stream connected"); + console.log("Stream connected fd " + self.stream.fd); } self.connected = true; + self.ready = false; self.connections += 1; self.command_queue = new Queue(); self.emitted_end = false; @@ -101,19 +104,14 @@ function RedisClient(stream, options) { self.stream.setNoDelay(); self.stream.setTimeout(0); - // give connect listeners a chance to run first in case they need to auth self.emit("connect"); - var command_obj; - while (self.offline_queue.length > 0) { - command_obj = self.offline_queue.shift(); - if (exports.debug_mode) { - console.log("Sending offline command: " + command_obj.command); - } - self.send_command(command_obj.command, command_obj.args, command_obj.callback); + if (self.options.no_ready_check) { + self.ready = true; + self.send_offline_queue(); + } else { + self.ready_check(); } - self.offline_queue = new Queue(); - // Even though items were shifted off, Queue backing store still uses memory until next add }); this.stream.on("data", function (buffer_from_socket) { @@ -145,6 +143,7 @@ function RedisClient(stream, options) { self.command_queue = new Queue(); self.connected = false; + self.ready = false; self.emit("error", new Error(message)); }); @@ -165,6 +164,76 @@ function RedisClient(stream, options) { util.inherits(RedisClient, events.EventEmitter); exports.RedisClient = RedisClient; +RedisClient.prototype.ready_check = function () { + var self = this; + + function send_info_cmd() { + if (exports.debug_mode) { + console.log("checking server ready state..."); + } + + self.send_anyway = true; // secret flag to send_command to send something even if not "ready" + self.info(function (err, res) { + if (err) { + self.emit("error", "Ready check failed: " + err); + return; + } + + var lines = res.split("\r\n"), obj = {}, retry_time; + + lines.forEach(function (line) { + var parts = line.split(':'); + if (parts[1]) { + obj[parts[0]] = parts[1]; + } + }); + + obj.versions = []; + obj.redis_version.split('.').forEach(function (num) { + obj.versions.push(+num); + }); + + // expose info key/vals to users + self.server_info = obj; + + if (!obj["loading"] || (obj["loading"] && obj["loading"] == 0)) { + if (exports.debug_mode) { + console.log("Redis server ready."); + } + self.ready = true; + + self.send_offline_queue(); + self.emit("ready"); + } else { + retry_time = obj.loading_eta_seconds * 1000; + if (retry_time > 1000) { + retry_time = 1000; + } + if (exports.debug_mode) { + console.log("Redis server still loading, trying again in " + retry_time); + } + setTimeout(send_info_cmd, retry_time); + } + }); + self.send_anyway = false; + } + + send_info_cmd(); +}; + +RedisClient.prototype.send_offline_queue = function () { + var command_obj; + while (this.offline_queue.length > 0) { + command_obj = this.offline_queue.shift(); + if (exports.debug_mode) { + console.log("Sending offline command: " + command_obj.command); + } + this.send_command(command_obj.command, command_obj.args, command_obj.callback); + } + this.offline_queue = new Queue(); + // Even though items were shifted off, Queue backing store still uses memory until next add +}; + RedisClient.prototype.connection_gone = function (why) { var self = this; @@ -180,6 +249,7 @@ RedisClient.prototype.connection_gone = function (why) { console.warn("Redis connection is gone from " + why + " event."); } self.connected = false; + self.ready = false; self.subscriptions = false; // since we are collapsing end and close, users don't expect to be called twice @@ -357,7 +427,7 @@ RedisClient.prototype.send_command = function () { sub_command: false }; - if (! this.connected) { + if (!this.ready && !this.send_anyway) { if (exports.debug_mode) { console.log("Queueing " + command + " for next server connection."); } @@ -447,6 +517,7 @@ RedisClient.prototype.send_command = function () { RedisClient.prototype.end = function () { this.stream._events = {}; this.connected = false; + this.ready = false; return this.stream.end(); }; @@ -616,16 +687,16 @@ RedisClient.prototype.MULTI = function (args) { exports.createClient = function (port_arg, host_arg, options) { var port = port_arg || default_port, host = host_arg || default_host, - red_client, net_client; + redis_client, net_client; net_client = net.createConnection(port, host); - red_client = new RedisClient(net_client, options); + redis_client = new RedisClient(net_client, options); - red_client.port = port; - red_client.host = host; + redis_client.port = port; + redis_client.host = host; - return red_client; + return redis_client; }; exports.print = function (err, reply) { diff --git a/multi_bench.js b/multi_bench.js index 20e46cccf8..35b20db477 100644 --- a/multi_bench.js +++ b/multi_bench.js @@ -8,7 +8,6 @@ var redis = require("./index"), tests = [], test_start, client_options = { - parser: "javascript", return_buffers: false }; diff --git a/package.json b/package.json index 89771956cd..1849030bca 100644 --- a/package.json +++ b/package.json @@ -1,5 +1,5 @@ { "name" : "redis", - "version" : "0.5.4", + "version" : "0.5.5", "description" : "Redis client library", "author": "Matt Ranney ", "contributors": [ diff --git a/test.js b/test.js index 039b06afe9..b6bb734bd9 100644 --- a/test.js +++ b/test.js @@ -1,8 +1,6 @@ /*global require console setTimeout process Buffer */ var redis = require("./index"), - client = redis.createClient(6379, "127.0.0.1", { - parser: "javascript" - }), + client = redis.createClient(), client2 = redis.createClient(), client3 = redis.createClient(), assert = require("assert"), @@ -10,11 +8,10 @@ var redis = require("./index"), test_db_num = 15, // this DB will be flushed and used for testing tests = {}, connected = false, - ended = false, - server_info; + ended = false; -// Uncomment this to see the wire protocol and other debugging info -redis.debug_mode = false; +// Set this to truthy to see the wire protocol and other debugging info +redis.debug_mode = process.argv[2]; function buffers_to_strings(arr) { return arr.map(function (val) { @@ -231,7 +228,7 @@ tests.MULTI_6 = function () { tests.WATCH_MULTI = function () { var name = 'WATCH_MULTI'; - if (server_info.versions[0] >= 2 && server_info.versions[1] >= 1) { + if (client.server_info.versions[0] >= 2 && client.server_info.versions[1] >= 1) { client.watch(name); client.incr(name); var multi = client.multi(); @@ -1057,28 +1054,10 @@ function run_next_test() { console.log("Using reply parser " + client.reply_parser.name); -client.on("connect", function start_tests() { - // remove listener so we don't restart all tests on reconnect - client.removeListener("connect", start_tests); +client.once("ready", function start_tests() { + console.log("Connected to " + client.host + ":" + client.port + ", Redis server version " + client.server_info.redis_version + "\n"); - // Fetch and stash info results in case anybody needs info on the server we are using. - client.info(function (err, reply) { - var obj = {}; - reply.toString().split('\n').forEach(function (line) { - var parts = line.split(':'); - if (parts[1]) { - obj[parts[0]] = parts[1]; - } - }); - obj.versions = []; - obj.redis_version.split('.').forEach(function (num) { - obj.versions.push(+num); - }); - server_info = obj; - console.log("Connected to " + client.host + ":" + client.port + ", Redis server version " + obj.redis_version + "\n"); - - run_next_test(); - }); + run_next_test(); connected = true; });