You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-06 02:15:48 +03:00
Improved reconnect logic. Initial version of reconnect thresholds.
This commit is contained in:
215
index.js
215
index.js
@@ -67,36 +67,7 @@ function RedisClient(stream, options) {
|
|||||||
});
|
});
|
||||||
|
|
||||||
this.stream.on("error", function (msg) {
|
this.stream.on("error", function (msg) {
|
||||||
if (self.closing) {
|
self.on_error(msg.message);
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
var message = "Redis connection to " + self.host + ":" + self.port + " failed - " + msg.message;
|
|
||||||
|
|
||||||
if (exports.debug_mode) {
|
|
||||||
console.warn(message);
|
|
||||||
}
|
|
||||||
self.offline_queue.forEach(function (args) {
|
|
||||||
if (typeof args[2] === "function") {
|
|
||||||
args[2](message);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
self.offline_queue = new Queue();
|
|
||||||
|
|
||||||
self.command_queue.forEach(function (args) {
|
|
||||||
if (typeof args[2] === "function") {
|
|
||||||
args[2](message);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
self.command_queue = new Queue();
|
|
||||||
|
|
||||||
self.connected = false;
|
|
||||||
self.ready = false;
|
|
||||||
|
|
||||||
self.emit("error", new Error(message));
|
|
||||||
// "error" events get turned into exceptions if they aren't listened for. If the user handled this error
|
|
||||||
// then we should try to reconnect.
|
|
||||||
self.connection_gone("error");
|
|
||||||
});
|
});
|
||||||
|
|
||||||
this.stream.on("close", function () {
|
this.stream.on("close", function () {
|
||||||
@@ -117,6 +88,46 @@ function RedisClient(stream, options) {
|
|||||||
util.inherits(RedisClient, events.EventEmitter);
|
util.inherits(RedisClient, events.EventEmitter);
|
||||||
exports.RedisClient = RedisClient;
|
exports.RedisClient = RedisClient;
|
||||||
|
|
||||||
|
RedisClient.prototype.on_error = function (msg) {
|
||||||
|
var message = "Redis connection to " + this.host + ":" + this.port + " failed - " + msg,
|
||||||
|
self = this, command_obj;
|
||||||
|
|
||||||
|
if (this.closing) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (exports.debug_mode) {
|
||||||
|
console.warn(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
// send errors to any commands in the offline queue, then reset
|
||||||
|
while (this.offline_queue.length > 0) {
|
||||||
|
command_obj = this.offline_queue.shift();
|
||||||
|
console.dir(command_obj);
|
||||||
|
if (typeof command_obj.callback === "function") {
|
||||||
|
command_obj.callback(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.offline_queue = new Queue();
|
||||||
|
|
||||||
|
// send errors to any commands in the command queue, then reset
|
||||||
|
while (this.command_queue.length > 0) {
|
||||||
|
command_obj = this.command_queue.shift();
|
||||||
|
if (typeof command_obj.callback === "function") {
|
||||||
|
command_obj.callback(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.command_queue = new Queue();
|
||||||
|
|
||||||
|
this.connected = false;
|
||||||
|
this.ready = false;
|
||||||
|
|
||||||
|
this.emit("error", new Error(message));
|
||||||
|
// "error" events get turned into exceptions if they aren't listened for. If the user handled this error
|
||||||
|
// then we should try to reconnect.
|
||||||
|
this.connection_gone("error");
|
||||||
|
};
|
||||||
|
|
||||||
RedisClient.prototype.do_auth = function () {
|
RedisClient.prototype.do_auth = function () {
|
||||||
var self = this;
|
var self = this;
|
||||||
|
|
||||||
@@ -175,7 +186,7 @@ RedisClient.prototype.on_connect = function () {
|
|||||||
this.max_attempts = 0;
|
this.max_attempts = 0;
|
||||||
this.retry_totaltime = 0;
|
this.retry_totaltime = 0;
|
||||||
this.retry_timer = null;
|
this.retry_timer = null;
|
||||||
this.current_retry_delay = this.retry_time;
|
this.current_retry_delay = this.retry_delay;
|
||||||
this.stream.setNoDelay();
|
this.stream.setNoDelay();
|
||||||
this.stream.setTimeout(0);
|
this.stream.setTimeout(0);
|
||||||
|
|
||||||
@@ -235,60 +246,64 @@ RedisClient.prototype.init_parser = function () {
|
|||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
RedisClient.prototype.on_info_cmd = function (err, res) {
|
||||||
|
var self = this, obj = {}, lines, retry_time;
|
||||||
|
|
||||||
|
if (err) {
|
||||||
|
return self.emit("error", "Ready check failed: " + err);
|
||||||
|
}
|
||||||
|
|
||||||
|
lines = res.toString().split("\r\n");
|
||||||
|
|
||||||
|
lines.forEach(function (line) {
|
||||||
|
var parts = line.split(':');
|
||||||
|
if (parts[1]) {
|
||||||
|
obj[parts[0]] = parts[1];
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
obj.versions = [];
|
||||||
|
obj.redis_version.split('.').forEach(function (num) {
|
||||||
|
obj.versions.push(+num);
|
||||||
|
});
|
||||||
|
|
||||||
|
// expose info key/vals to users
|
||||||
|
this.server_info = obj;
|
||||||
|
|
||||||
|
if (!obj.loading || (obj.loading && obj.loading === "0")) {
|
||||||
|
if (exports.debug_mode) {
|
||||||
|
console.log("Redis server ready.");
|
||||||
|
}
|
||||||
|
this.ready = true;
|
||||||
|
|
||||||
|
this.send_offline_queue();
|
||||||
|
this.emit("ready");
|
||||||
|
} else {
|
||||||
|
retry_time = obj.loading_eta_seconds * 1000;
|
||||||
|
if (retry_time > 1000) {
|
||||||
|
retry_time = 1000;
|
||||||
|
}
|
||||||
|
if (exports.debug_mode) {
|
||||||
|
console.log("Redis server still loading, trying again in " + retry_time);
|
||||||
|
}
|
||||||
|
setTimeout(function () {
|
||||||
|
self.ready_check();
|
||||||
|
}, retry_time);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
RedisClient.prototype.ready_check = function () {
|
RedisClient.prototype.ready_check = function () {
|
||||||
var self = this;
|
var self = this;
|
||||||
|
|
||||||
function send_info_cmd() {
|
if (exports.debug_mode) {
|
||||||
if (exports.debug_mode) {
|
console.log("checking server ready state...");
|
||||||
console.log("checking server ready state...");
|
|
||||||
}
|
|
||||||
|
|
||||||
self.send_anyway = true; // secret flag to send_command to send something even if not "ready"
|
|
||||||
self.info(function (err, res) {
|
|
||||||
if (err) {
|
|
||||||
return self.emit("error", "Ready check failed: " + err);
|
|
||||||
}
|
|
||||||
|
|
||||||
var lines = res.toString().split("\r\n"), obj = {}, retry_time;
|
|
||||||
|
|
||||||
lines.forEach(function (line) {
|
|
||||||
var parts = line.split(':');
|
|
||||||
if (parts[1]) {
|
|
||||||
obj[parts[0]] = parts[1];
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
obj.versions = [];
|
|
||||||
obj.redis_version.split('.').forEach(function (num) {
|
|
||||||
obj.versions.push(+num);
|
|
||||||
});
|
|
||||||
|
|
||||||
// expose info key/vals to users
|
|
||||||
self.server_info = obj;
|
|
||||||
|
|
||||||
if (!obj.loading || (obj.loading && obj.loading === "0")) {
|
|
||||||
if (exports.debug_mode) {
|
|
||||||
console.log("Redis server ready.");
|
|
||||||
}
|
|
||||||
self.ready = true;
|
|
||||||
|
|
||||||
self.send_offline_queue();
|
|
||||||
self.emit("ready");
|
|
||||||
} else {
|
|
||||||
retry_time = obj.loading_eta_seconds * 1000;
|
|
||||||
if (retry_time > 1000) {
|
|
||||||
retry_time = 1000;
|
|
||||||
}
|
|
||||||
if (exports.debug_mode) {
|
|
||||||
console.log("Redis server still loading, trying again in " + retry_time);
|
|
||||||
}
|
|
||||||
setTimeout(send_info_cmd, retry_time);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
self.send_anyway = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
send_info_cmd();
|
this.send_anyway = true; // secret flag to send_command to send something even if not "ready"
|
||||||
|
this.info(function (err, res) {
|
||||||
|
self.on_info_cmd(err, res);
|
||||||
|
});
|
||||||
|
this.send_anyway = false;
|
||||||
};
|
};
|
||||||
|
|
||||||
RedisClient.prototype.send_offline_queue = function () {
|
RedisClient.prototype.send_offline_queue = function () {
|
||||||
@@ -310,7 +325,7 @@ RedisClient.prototype.send_offline_queue = function () {
|
|||||||
};
|
};
|
||||||
|
|
||||||
RedisClient.prototype.connection_gone = function (why) {
|
RedisClient.prototype.connection_gone = function (why) {
|
||||||
var self = this;
|
var self = this, message;
|
||||||
|
|
||||||
// If a retry is already in progress, just let that happen
|
// If a retry is already in progress, just let that happen
|
||||||
if (this.retry_timer) {
|
if (this.retry_timer) {
|
||||||
@@ -335,8 +350,8 @@ RedisClient.prototype.connection_gone = function (why) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
this.command_queue.forEach(function (args) {
|
this.command_queue.forEach(function (args) {
|
||||||
if (typeof args[2] === "function") {
|
if (typeof args.callback === "function") {
|
||||||
args[2]("Server connection closed");
|
args.callback("Server connection closed");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
this.command_queue = new Queue();
|
this.command_queue = new Queue();
|
||||||
@@ -347,17 +362,16 @@ RedisClient.prototype.connection_gone = function (why) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.current_retry_delay = this.retry_delay * this.retry_backoff;
|
this.current_retry_delay = this.current_retry_delay * this.retry_backoff;
|
||||||
|
|
||||||
if (exports.debug_mode) {
|
if (exports.debug_mode) {
|
||||||
console.log("Retry connection in " + this.current_retry_delay + " ms");
|
console.log("Retry connection in " + this.current_retry_delay + " ms");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (self.max_attempts && self.attempts >= self.max_attempts) {
|
if (this.max_attempts && this.attempts >= this.max_attempts) {
|
||||||
self.retry_timer = null;
|
this.retry_timer = null;
|
||||||
if (exports.debug_mode) {
|
// TODO - engage Redis is Broken mode for future commands
|
||||||
console.log("Aborting connection attempt: Max attempts " + self.max_attempts + " failed.");
|
this.emit("not_reconnecting", "Aborting connection attempt: Max attempts " + this.max_attempts + " failed.");
|
||||||
}
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -371,14 +385,12 @@ RedisClient.prototype.connection_gone = function (why) {
|
|||||||
console.log("Retrying connection...");
|
console.log("Retrying connection...");
|
||||||
}
|
}
|
||||||
|
|
||||||
self.retry_delay = self.retry_delay * self.retry_backoff;
|
self.retry_totaltime += self.current_retry_delay;
|
||||||
self.retry_totaltime += self.retry_delay;
|
|
||||||
|
|
||||||
if (self.connect_timeout && self.retry_totaltime >= self.connect_timeout) {
|
if (self.connect_timeout && self.retry_totaltime >= self.connect_timeout) {
|
||||||
self.retry_timer = null;
|
self.retry_timer = null;
|
||||||
if (exports.debug_mode) {
|
// TODO - engage Redis is Broken mode for future commands
|
||||||
console.log("Aborting connection attempt: Total timeout of " + self.connect_timeout + "ms exceeded.");
|
self.emit("not_reconnecting", "Aborting connection attempt: Total timeout of " + self.connect_timeout + "ms exceeded.");
|
||||||
}
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -435,7 +447,7 @@ RedisClient.prototype.return_error = function (err) {
|
|||||||
|
|
||||||
RedisClient.prototype.return_reply = function (reply) {
|
RedisClient.prototype.return_reply = function (reply) {
|
||||||
var command_obj = this.command_queue.shift(),
|
var command_obj = this.command_queue.shift(),
|
||||||
obj, i, len, key, val, type, timestamp, args, queue_len = this.command_queue.getLength();
|
obj, i, len, key, val, type, timestamp, argindex, args, queue_len = this.command_queue.getLength();
|
||||||
|
|
||||||
if (this.subscriptions === false && queue_len === 0) {
|
if (this.subscriptions === false && queue_len === 0) {
|
||||||
this.emit("idle");
|
this.emit("idle");
|
||||||
@@ -495,9 +507,9 @@ RedisClient.prototype.return_reply = function (reply) {
|
|||||||
} else if (this.monitoring) {
|
} else if (this.monitoring) {
|
||||||
len = reply.indexOf(" ");
|
len = reply.indexOf(" ");
|
||||||
timestamp = reply.slice(0, len);
|
timestamp = reply.slice(0, len);
|
||||||
// TODO - this de-quoting doesn't work correctly if you put JSON strings in your values.
|
argindex = reply.indexOf('"');
|
||||||
args = reply.slice(len + 1).match(/"[^"]+"/g).map(function (elem) {
|
args = reply.slice(argindex + 1, -1).split('" "').map(function (elem) {
|
||||||
return elem.replace(/"/g, "");
|
return elem.replace(/\\"/g, '"');
|
||||||
});
|
});
|
||||||
this.emit("monitor", timestamp, args);
|
this.emit("monitor", timestamp, args);
|
||||||
} else {
|
} else {
|
||||||
@@ -505,7 +517,8 @@ RedisClient.prototype.return_reply = function (reply) {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// This Command constructor is ever so slightly faster than using an object literal
|
// This Command constructor is ever so slightly faster than using an object literal, but more importantly, using
|
||||||
|
// a named constructor helps it show up meaningfully in the V8 CPU profiler and in heap snapshots.
|
||||||
function Command(command, args, sub_command, callback) {
|
function Command(command, args, sub_command, callback) {
|
||||||
this.command = command;
|
this.command = command;
|
||||||
this.args = args;
|
this.args = args;
|
||||||
|
4
test.js
4
test.js
@@ -348,7 +348,7 @@ tests.reconnect = function () {
|
|||||||
// For orderly shutdown in normal programs, do client.quit()
|
// For orderly shutdown in normal programs, do client.quit()
|
||||||
client.stream.destroy();
|
client.stream.destroy();
|
||||||
});
|
});
|
||||||
|
|
||||||
client.on("reconnecting", function on_recon(params) {
|
client.on("reconnecting", function on_recon(params) {
|
||||||
client.on("connect", function on_connect() {
|
client.on("connect", function on_connect() {
|
||||||
client.select(test_db_num, require_string("OK", name));
|
client.select(test_db_num, require_string("OK", name));
|
||||||
@@ -1233,7 +1233,7 @@ client3.on("error", function (err) {
|
|||||||
});
|
});
|
||||||
|
|
||||||
client.on("reconnecting", function (params) {
|
client.on("reconnecting", function (params) {
|
||||||
// console.log("reconnecting: " + util.inspect(params));
|
console.log("reconnecting: " + util.inspect(params));
|
||||||
});
|
});
|
||||||
|
|
||||||
process.on('uncaughtException', function (err) {
|
process.on('uncaughtException', function (err) {
|
||||||
|
@@ -1,4 +1,6 @@
|
|||||||
var redis = require("redis").createClient();
|
var redis = require("../index").createClient(null, null, {
|
||||||
|
max_attempts: 2
|
||||||
|
});
|
||||||
|
|
||||||
redis.on("error", function (err) {
|
redis.on("error", function (err) {
|
||||||
console.log("Redis says: " + err);
|
console.log("Redis says: " + err);
|
||||||
@@ -11,6 +13,9 @@ redis.on("ready", function () {
|
|||||||
redis.on("reconnecting", function (arg) {
|
redis.on("reconnecting", function (arg) {
|
||||||
console.log("Redis reconnecting: " + JSON.stringify(arg));
|
console.log("Redis reconnecting: " + JSON.stringify(arg));
|
||||||
});
|
});
|
||||||
|
redis.on("not_reconnecting", function (arg) {
|
||||||
|
console.log("Redis NOT reconnecting: " + arg);
|
||||||
|
});
|
||||||
redis.on("connect", function () {
|
redis.on("connect", function () {
|
||||||
console.log("Redis connected.");
|
console.log("Redis connected.");
|
||||||
});
|
});
|
||||||
@@ -24,4 +29,4 @@ setInterval(function () {
|
|||||||
console.log(now + " Redis reply: " + res);
|
console.log(now + " Redis reply: " + res);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}, 200);
|
}, 100);
|
||||||
|
Reference in New Issue
Block a user