From f10ff9e916e535e4997ac150dad7f951c5278bfc Mon Sep 17 00:00:00 2001 From: Matt Ranney Date: Thu, 21 Apr 2011 16:48:14 -1000 Subject: [PATCH] Lots of bugs fixed. * connection error did not properly trigger reconnection logic [GH-85] * client.hmget(key, [val1, val2]) was not expanding properly [GH-66] * client.quit() while in pub/sub mode would throw an error [GH-87] * client.multi(['hmset', 'key', {foo: 'bar'}]) fails [GH-92] --- README.md | 1 + changelog.md | 9 ++++ examples/mget.js | 5 +++ examples/subquery.js | 19 ++++++++ index.js | 97 ++++++++++++++++++++++------------------- lib/queue.js | 15 ++----- lib/to_array.js | 13 ++++++ package.json | 5 ++- test.js | 11 ++++- tests/reconnect_test.js | 27 ++++++++++++ tests/sub_quit_test.js | 18 ++++++++ 11 files changed, 161 insertions(+), 59 deletions(-) create mode 100644 examples/mget.js create mode 100644 examples/subquery.js create mode 100644 lib/to_array.js create mode 100644 tests/reconnect_test.js create mode 100644 tests/sub_quit_test.js diff --git a/README.md b/README.md index f448d0e8ef..4cafdf0a5f 100644 --- a/README.md +++ b/README.md @@ -537,6 +537,7 @@ In order of first contribution, they are: * [Aivo Paas](http://github.com/aivopaas) * [Paul Carey](https://github.com/paulcarey) * [Pieter Noordhuis](https://github.com/pietern) +* [Vladimir Dronnikov](https://github.com/dvv) Thanks. diff --git a/changelog.md b/changelog.md index 697727faa2..2a1707e2e9 100644 --- a/changelog.md +++ b/changelog.md @@ -1,6 +1,15 @@ Changelog ========= +## v0.6.0 - April 21, 2011 + +Lots of bugs fixed. + +* connection error did not properly trigger reconnection logic [GH-85] +* client.hmget(key, [val1, val2]) was not expanding properly [GH-66] +* client.quit() while in pub/sub mode would throw an error [GH-87] +* client.multi(['hmset', 'key', {foo: 'bar'}]) fails [GH-92] + ## v0.5.11 - April 7, 2011 Added DISCARD diff --git a/examples/mget.js b/examples/mget.js new file mode 100644 index 0000000000..936740d32f --- /dev/null +++ b/examples/mget.js @@ -0,0 +1,5 @@ +var client = require("redis").createClient(); + +client.mget(["sessions started", "sessions started", "foo"], function (err, res) { + console.dir(res); +}); \ No newline at end of file diff --git a/examples/subquery.js b/examples/subquery.js new file mode 100644 index 0000000000..861657e1f3 --- /dev/null +++ b/examples/subquery.js @@ -0,0 +1,19 @@ +var client = require("redis").createClient(); + +function print_results(obj) { + console.dir(obj); +} + +// build a map of all keys and their types +client.keys("*", function (err, all_keys) { + var key_types = {}; + + all_keys.forEach(function (key, pos) { // use second arg of forEach to get pos + client.type(key, function (err, type) { + key_types[key] = type; + if (pos === all_keys.length - 1) { // callbacks all run in order + print_results(key_types); + } + }); + }); +}); diff --git a/index.js b/index.js index 65cb02584b..c4f8611c22 100644 --- a/index.js +++ b/index.js @@ -3,6 +3,7 @@ var net = require("net"), util = require("./lib/util").util, Queue = require("./lib/queue").Queue, + to_array = require("./lib/to_array"), events = require("events"), parsers = [], default_port = 6379, @@ -23,17 +24,6 @@ try { parsers.push(require("./lib/parser/javascript")); -function to_array(args) { - var len = args.length, - arr = new Array(len), i; - - for (i = 0; i < len; i += 1) { - arr[i] = args[i]; - } - - return arr; -} - function RedisClient(stream, options) { this.stream = stream; this.options = options || {}; @@ -45,8 +35,9 @@ function RedisClient(stream, options) { this.command_queue = new Queue(); // holds sent commands to de-pipeline them this.offline_queue = new Queue(); // holds commands issued but not able to be sent this.commands_sent = 0; - this.retry_delay = 250; - this.retry_backoff = 1.7; + this.retry_delay = 250; // inital reconnection delay + this.current_retry_delay = this.retry_delay; + this.retry_backoff = 1.7; // each retry waits current delay * retry_backoff this.subscriptions = false; this.monitoring = false; this.closing = false; @@ -79,7 +70,7 @@ function RedisClient(stream, options) { return_buffers: self.options.return_buffers || false }); - // "reply error" is an error sent back by redis + // "reply error" is an error sent back by Redis this.reply_parser.on("reply error", function (reply) { self.return_error(new Error(reply)); }); @@ -103,7 +94,7 @@ function RedisClient(stream, options) { if (this.closing) { return; } - + var message = "Redis connection to " + self.host + ":" + self.port + " failed - " + msg.message; if (exports.debug_mode) { @@ -122,10 +113,14 @@ function RedisClient(stream, options) { } }); self.command_queue = new Queue(); - + self.connected = false; self.ready = false; + self.emit("error", new Error(message)); + // "error" events get turned into exceptions if they aren't listened for. If the user handled this error + // then we should try to reconnect. + self.connection_gone("error"); }); this.stream.on("close", function () { @@ -153,11 +148,12 @@ RedisClient.prototype.on_connect = function () { this.connected = true; this.ready = false; + this.attempts = 0; this.connections += 1; this.command_queue = new Queue(); this.emitted_end = false; this.retry_timer = null; - this.retry_delay = 250; + this.current_retry_delay = this.retry_time; this.stream.setNoDelay(); this.stream.setTimeout(0); @@ -225,7 +221,7 @@ RedisClient.prototype.ready_check = function () { // expose info key/vals to users self.server_info = obj; - if (!obj["loading"] || (obj["loading"] && obj["loading"] == 0)) { + if (!obj.loading || (obj.loading && obj.loading === "0")) { if (exports.debug_mode) { console.log("Redis server ready."); } @@ -267,56 +263,57 @@ RedisClient.prototype.connection_gone = function (why) { var self = this; // If a retry is already in progress, just let that happen - if (self.retry_timer) { + if (this.retry_timer) { return; } // Note that this may trigger another "close" or "end" event - self.stream.destroy(); + this.stream.destroy(); if (exports.debug_mode) { console.warn("Redis connection is gone from " + why + " event."); } - self.connected = false; - self.ready = false; - self.subscriptions = false; - self.monitoring = false; + this.connected = false; + this.ready = false; + this.subscriptions = false; + this.monitoring = false; // since we are collapsing end and close, users don't expect to be called twice - if (! self.emitted_end) { - self.emit("end"); - self.emitted_end = true; + if (! this.emitted_end) { + this.emit("end"); + this.emitted_end = true; } - self.command_queue.forEach(function (args) { + this.command_queue.forEach(function (args) { if (typeof args[2] === "function") { args[2]("Server connection closed"); } }); - self.command_queue = new Queue(); + this.command_queue = new Queue(); // If this is a requested shutdown, then don't retry - if (self.closing) { - self.retry_timer = null; + if (this.closing) { + this.retry_timer = null; return; } + + this.current_retry_delay = this.retry_delay * this.retry_backoff; if (exports.debug_mode) { - console.log("Retry connection in " + self.retry_delay + " ms"); + console.log("Retry connection in " + this.current_retry_delay + " ms"); } - self.attempts += 1; - self.emit("reconnecting", { - delay: self.retry_delay, - attempt: self.attempts + this.attempts += 1; + this.emit("reconnecting", { + delay: this.current_retry_delay, + attempt: this.attempts }); - self.retry_timer = setTimeout(function () { + this.retry_timer = setTimeout(function () { if (exports.debug_mode) { console.log("Retrying connection..."); } - self.retry_delay = self.retry_delay * self.retry_backoff; self.stream.connect(self.port, self.host); self.retry_timer = null; - }, self.retry_delay); + }, this.current_retry_delay); }; RedisClient.prototype.on_data = function (data) { @@ -369,7 +366,7 @@ RedisClient.prototype.return_reply = function (reply) { this.emit("idle"); this.command_queue = new Queue(); } - + if (command_obj && !command_obj.sub_command) { if (typeof command_obj.callback === "function") { // HGETALL special case replies with keyed Buffers @@ -394,7 +391,7 @@ RedisClient.prototype.return_reply = function (reply) { } else if (exports.debug_mode) { console.log("no callback for reply: " + (reply && reply.toString && reply.toString())); } - } else if (this.subscriptions) { + } else if (this.subscriptions || command_obj.sub_command) { if (Array.isArray(reply)) { type = reply[0].toString(); @@ -413,7 +410,7 @@ RedisClient.prototype.return_reply = function (reply) { } else { throw new Error("subscriptions are active but got unknown reply type " + type); } - } else { + } else if (! this.closing) { throw new Error("subscriptions are active but got an invalid reply: " + reply); } } else if (this.monitoring) { @@ -457,6 +454,10 @@ RedisClient.prototype.send_command = function () { } } + if (args.length === 2 && Array.isArray(args[1])) { + args = [args[0]].concat(args[1]); + } + command_obj = { command: command, args: args, @@ -573,7 +574,8 @@ function Multi(client, args) { // string commands "get", "set", "setnx", "setex", "append", "substr", "strlen", "del", "exists", "incr", "decr", "mget", // list commands - "rpush", "lpush", "rpushx", "lpushx", "linsert", "rpop", "lpop", "brpop", "blpop", "llen", "lindex", "lset", "lrange", "ltrim", "lrem", "rpoplpush", + "rpush", "lpush", "rpushx", "lpushx", "linsert", "rpop", "lpop", "brpop", "blpop", "brpoplpush", "llen", "lindex", "lset", "lrange", + "ltrim", "lrem", "rpoplpush", // set commands "sadd", "srem", "smove", "sismember", "scard", "spop", "srandmember", "sinter", "sinterstore", "sunion", "sunionstore", "sdiff", "sdiffstore", "smembers", // sorted set commands @@ -668,7 +670,7 @@ Multi.prototype.exec = function (callback) { // drain queue, callback will catch "QUEUED" or error // Can't use a for loop here, as we need closure around the index. this.queue.forEach(function (args, index) { - var command = args[0]; + var command = args[0], obj; if (typeof args[args.length - 1] === "function") { args = args.slice(1, -1); } else { @@ -677,6 +679,13 @@ Multi.prototype.exec = function (callback) { if (args.length === 1 && Array.isArray(args[0])) { args = args[0]; } + if (command === 'hmset' && typeof args[1] === 'object') { + obj = args.pop(); + Object.keys(obj).forEach(function (key) { + args.push(key); + args.push(obj[key]); + }); + } this.client.send_command(command, args, function (err, reply) { if (err) { var cur = self.queue[index]; diff --git a/lib/queue.js b/lib/queue.js index 148513dc19..50679de085 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -1,22 +1,13 @@ -function to_array(args) { - var len = args.length, - arr = new Array(len), i; - - for (i = 0; i < len; i += 1) { - arr[i] = args[i]; - } - - return arr; -} +var to_array = require("./to_array"); // Queue class adapted from Tim Caswell's pattern library // http://github.com/creationix/pattern/blob/master/lib/pattern/queue.js -var Queue = function () { +function Queue() { this.tail = []; this.head = to_array(arguments); this.offset = 0; -}; +} Queue.prototype.shift = function () { if (this.offset === this.head.length) { diff --git a/lib/to_array.js b/lib/to_array.js new file mode 100644 index 0000000000..31e36bda2d --- /dev/null +++ b/lib/to_array.js @@ -0,0 +1,13 @@ +// the "new Array(len)" syntax is legal and optimized by V8, but JSHint is utterly confused by it. +function to_array(args) { + var len = args.length, + arr = new Array(len), i; + + for (i = 0; i < len; i += 1) { + arr[i] = args[i]; + } + + return arr; +}; + +module.exports = to_array; diff --git a/package.json b/package.json index 7544be8fde..0260e0692f 100644 --- a/package.json +++ b/package.json @@ -1,5 +1,5 @@ { "name" : "redis", - "version" : "0.5.11", + "version" : "0.6.0", "description" : "Redis client library", "author": "Matt Ranney ", "contributors": [ @@ -11,7 +11,8 @@ "Aivo Paas", "Paul Carey", "Pieter Noordhuis", - "Andy Ray" + "Andy Ray", + "Vladimir Dronnikov" ], "main": "./index.js", "scripts": { diff --git a/test.js b/test.js index 83192e44f5..3201a190d1 100644 --- a/test.js +++ b/test.js @@ -318,7 +318,16 @@ tests.HMGET = function () { assert.strictEqual("abcdefghij", reply[0].toString(), name); assert.strictEqual("a type of value", reply[1].toString(), name); }); - + + client.HMGET(key1, ["0123456789"], function (err, reply) { + assert.strictEqual("abcdefghij", reply[0], name); + }); + + client.HMGET(key1, ["0123456789", "some manner of key"], function (err, reply) { + assert.strictEqual("abcdefghij", reply[0], name); + assert.strictEqual("a type of value", reply[1], name); + }); + client.HMGET(key1, "missing thing", "another missing thing", function (err, reply) { assert.strictEqual(null, reply[0], name); assert.strictEqual(null, reply[1], name); diff --git a/tests/reconnect_test.js b/tests/reconnect_test.js new file mode 100644 index 0000000000..08a6ca685d --- /dev/null +++ b/tests/reconnect_test.js @@ -0,0 +1,27 @@ +var redis = require("redis").createClient(); + +redis.on("error", function (err) { + console.log("Redis says: " + err); +}); + +redis.on("ready", function () { + console.log("Redis ready."); +}); + +redis.on("reconnecting", function (arg) { + console.log("Redis reconnecting: " + JSON.stringify(arg)); +}); +redis.on("connect", function () { + console.log("Redis connected."); +}); + +setInterval(function () { + var now = Date.now(); + redis.set("now", now, function (err, res) { + if (err) { + console.log(now + " Redis reply error: " + err); + } else { + console.log(now + " Redis reply: " + res); + } + }); +}, 200); diff --git a/tests/sub_quit_test.js b/tests/sub_quit_test.js new file mode 100644 index 0000000000..ad1f413228 --- /dev/null +++ b/tests/sub_quit_test.js @@ -0,0 +1,18 @@ +var client = require("redis").createClient(), + client2 = require("redis").createClient(); + +client.subscribe("something"); +client.on("subscribe", function (channel, count) { + console.log("Got sub: " + channel); + client.unsubscribe("something"); +}); + +client.on("unsubscribe", function (channel, count) { + console.log("Got unsub: " + channel + ", quitting"); + client.quit(); +}); + +// exercise unsub before sub +client2.unsubscribe("something"); +client2.subscribe("another thing"); +client2.quit();