1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

Lots of bugs fixed.

*  connection error did not properly trigger reconnection logic [GH-85]
*  client.hmget(key, [val1, val2]) was not expanding properly [GH-66]
*  client.quit() while in pub/sub mode would throw an error [GH-87]
*  client.multi(['hmset', 'key', {foo: 'bar'}]) fails [GH-92]
This commit is contained in:
Matt Ranney
2011-04-21 16:48:14 -10:00
parent f624fa6234
commit f10ff9e916
11 changed files with 161 additions and 59 deletions

View File

@@ -3,6 +3,7 @@
var net = require("net"),
util = require("./lib/util").util,
Queue = require("./lib/queue").Queue,
to_array = require("./lib/to_array"),
events = require("events"),
parsers = [],
default_port = 6379,
@@ -23,17 +24,6 @@ try {
parsers.push(require("./lib/parser/javascript"));
function to_array(args) {
var len = args.length,
arr = new Array(len), i;
for (i = 0; i < len; i += 1) {
arr[i] = args[i];
}
return arr;
}
function RedisClient(stream, options) {
this.stream = stream;
this.options = options || {};
@@ -45,8 +35,9 @@ function RedisClient(stream, options) {
this.command_queue = new Queue(); // holds sent commands to de-pipeline them
this.offline_queue = new Queue(); // holds commands issued but not able to be sent
this.commands_sent = 0;
this.retry_delay = 250;
this.retry_backoff = 1.7;
this.retry_delay = 250; // inital reconnection delay
this.current_retry_delay = this.retry_delay;
this.retry_backoff = 1.7; // each retry waits current delay * retry_backoff
this.subscriptions = false;
this.monitoring = false;
this.closing = false;
@@ -79,7 +70,7 @@ function RedisClient(stream, options) {
return_buffers: self.options.return_buffers || false
});
// "reply error" is an error sent back by redis
// "reply error" is an error sent back by Redis
this.reply_parser.on("reply error", function (reply) {
self.return_error(new Error(reply));
});
@@ -103,7 +94,7 @@ function RedisClient(stream, options) {
if (this.closing) {
return;
}
var message = "Redis connection to " + self.host + ":" + self.port + " failed - " + msg.message;
if (exports.debug_mode) {
@@ -122,10 +113,14 @@ function RedisClient(stream, options) {
}
});
self.command_queue = new Queue();
self.connected = false;
self.ready = false;
self.emit("error", new Error(message));
// "error" events get turned into exceptions if they aren't listened for. If the user handled this error
// then we should try to reconnect.
self.connection_gone("error");
});
this.stream.on("close", function () {
@@ -153,11 +148,12 @@ RedisClient.prototype.on_connect = function () {
this.connected = true;
this.ready = false;
this.attempts = 0;
this.connections += 1;
this.command_queue = new Queue();
this.emitted_end = false;
this.retry_timer = null;
this.retry_delay = 250;
this.current_retry_delay = this.retry_time;
this.stream.setNoDelay();
this.stream.setTimeout(0);
@@ -225,7 +221,7 @@ RedisClient.prototype.ready_check = function () {
// expose info key/vals to users
self.server_info = obj;
if (!obj["loading"] || (obj["loading"] && obj["loading"] == 0)) {
if (!obj.loading || (obj.loading && obj.loading === "0")) {
if (exports.debug_mode) {
console.log("Redis server ready.");
}
@@ -267,56 +263,57 @@ RedisClient.prototype.connection_gone = function (why) {
var self = this;
// If a retry is already in progress, just let that happen
if (self.retry_timer) {
if (this.retry_timer) {
return;
}
// Note that this may trigger another "close" or "end" event
self.stream.destroy();
this.stream.destroy();
if (exports.debug_mode) {
console.warn("Redis connection is gone from " + why + " event.");
}
self.connected = false;
self.ready = false;
self.subscriptions = false;
self.monitoring = false;
this.connected = false;
this.ready = false;
this.subscriptions = false;
this.monitoring = false;
// since we are collapsing end and close, users don't expect to be called twice
if (! self.emitted_end) {
self.emit("end");
self.emitted_end = true;
if (! this.emitted_end) {
this.emit("end");
this.emitted_end = true;
}
self.command_queue.forEach(function (args) {
this.command_queue.forEach(function (args) {
if (typeof args[2] === "function") {
args[2]("Server connection closed");
}
});
self.command_queue = new Queue();
this.command_queue = new Queue();
// If this is a requested shutdown, then don't retry
if (self.closing) {
self.retry_timer = null;
if (this.closing) {
this.retry_timer = null;
return;
}
this.current_retry_delay = this.retry_delay * this.retry_backoff;
if (exports.debug_mode) {
console.log("Retry connection in " + self.retry_delay + " ms");
console.log("Retry connection in " + this.current_retry_delay + " ms");
}
self.attempts += 1;
self.emit("reconnecting", {
delay: self.retry_delay,
attempt: self.attempts
this.attempts += 1;
this.emit("reconnecting", {
delay: this.current_retry_delay,
attempt: this.attempts
});
self.retry_timer = setTimeout(function () {
this.retry_timer = setTimeout(function () {
if (exports.debug_mode) {
console.log("Retrying connection...");
}
self.retry_delay = self.retry_delay * self.retry_backoff;
self.stream.connect(self.port, self.host);
self.retry_timer = null;
}, self.retry_delay);
}, this.current_retry_delay);
};
RedisClient.prototype.on_data = function (data) {
@@ -369,7 +366,7 @@ RedisClient.prototype.return_reply = function (reply) {
this.emit("idle");
this.command_queue = new Queue();
}
if (command_obj && !command_obj.sub_command) {
if (typeof command_obj.callback === "function") {
// HGETALL special case replies with keyed Buffers
@@ -394,7 +391,7 @@ RedisClient.prototype.return_reply = function (reply) {
} else if (exports.debug_mode) {
console.log("no callback for reply: " + (reply && reply.toString && reply.toString()));
}
} else if (this.subscriptions) {
} else if (this.subscriptions || command_obj.sub_command) {
if (Array.isArray(reply)) {
type = reply[0].toString();
@@ -413,7 +410,7 @@ RedisClient.prototype.return_reply = function (reply) {
} else {
throw new Error("subscriptions are active but got unknown reply type " + type);
}
} else {
} else if (! this.closing) {
throw new Error("subscriptions are active but got an invalid reply: " + reply);
}
} else if (this.monitoring) {
@@ -457,6 +454,10 @@ RedisClient.prototype.send_command = function () {
}
}
if (args.length === 2 && Array.isArray(args[1])) {
args = [args[0]].concat(args[1]);
}
command_obj = {
command: command,
args: args,
@@ -573,7 +574,8 @@ function Multi(client, args) {
// string commands
"get", "set", "setnx", "setex", "append", "substr", "strlen", "del", "exists", "incr", "decr", "mget",
// list commands
"rpush", "lpush", "rpushx", "lpushx", "linsert", "rpop", "lpop", "brpop", "blpop", "llen", "lindex", "lset", "lrange", "ltrim", "lrem", "rpoplpush",
"rpush", "lpush", "rpushx", "lpushx", "linsert", "rpop", "lpop", "brpop", "blpop", "brpoplpush", "llen", "lindex", "lset", "lrange",
"ltrim", "lrem", "rpoplpush",
// set commands
"sadd", "srem", "smove", "sismember", "scard", "spop", "srandmember", "sinter", "sinterstore", "sunion", "sunionstore", "sdiff", "sdiffstore", "smembers",
// sorted set commands
@@ -668,7 +670,7 @@ Multi.prototype.exec = function (callback) {
// drain queue, callback will catch "QUEUED" or error
// Can't use a for loop here, as we need closure around the index.
this.queue.forEach(function (args, index) {
var command = args[0];
var command = args[0], obj;
if (typeof args[args.length - 1] === "function") {
args = args.slice(1, -1);
} else {
@@ -677,6 +679,13 @@ Multi.prototype.exec = function (callback) {
if (args.length === 1 && Array.isArray(args[0])) {
args = args[0];
}
if (command === 'hmset' && typeof args[1] === 'object') {
obj = args.pop();
Object.keys(obj).forEach(function (key) {
args.push(key);
args.push(obj[key]);
});
}
this.client.send_command(command, args, function (err, reply) {
if (err) {
var cur = self.queue[index];