From 344291a98a80bdd17ece6b5247b052aa7de0e153 Mon Sep 17 00:00:00 2001 From: Ruben Bridgewater Date: Fri, 25 Mar 2016 15:26:22 +0100 Subject: [PATCH] Fix monitoring mode --- README.md | 14 +++---- changelog.md | 14 +++++++ index.js | 77 ++++++++++------------------------- lib/individualCommands.js | 35 ++++++++++++++++ lib/utils.js | 5 +-- test/node_redis.spec.js | 86 ++++++++++++++++++++++++++++++++------- 6 files changed, 150 insertions(+), 81 deletions(-) diff --git a/README.md b/README.md index 355d43b706..2c03e795cd 100644 --- a/README.md +++ b/README.md @@ -545,22 +545,20 @@ If you fire many commands at once this is going to **boost the execution speed b Redis supports the `MONITOR` command, which lets you see all commands received by the Redis server across all client connections, including from other client libraries and other computers. -After you send the `MONITOR` command, no other commands are valid on that connection. `node_redis` -will emit a `monitor` event for every new monitor message that comes across. The callback for the -`monitor` event takes a timestamp from the Redis server and an array of command arguments. +A `monitor` event is going to be emitted for every command fired from any client connected to the server including the monitoring client itself. +The callback for the `monitor` event takes a timestamp from the Redis server, an array of command arguments and the raw monitoring string. Here is a simple example: ```js -var client = require("redis").createClient(), - util = require("util"); - +var client = require("redis").createClient(); client.monitor(function (err, res) { console.log("Entering monitoring mode."); }); +client.set('foo', 'bar'); -client.on("monitor", function (time, args) { - console.log(time + ": " + util.inspect(args)); +client.on("monitor", function (time, args, raw_reply) { + console.log(time + ": " + args); // 1458910076.446514:['set', 'foo', 'bar'] }); ``` diff --git a/changelog.md b/changelog.md index 1dba602acb..992eb42216 100644 --- a/changelog.md +++ b/changelog.md @@ -1,6 +1,20 @@ Changelog ========= +## v.2.6.0 - XX Mar, 2016 + +Features + +- Monitor now works together with the offline queue + - All commands that were send after a connection loss are now going to be send after reconnecting +- Activating monitor mode does now work together with arbitrary commands including pub sub mode + +Bugfixes + +- Fixed calling monitor command while other commands are still running +- Fixed monitor and pub sub mode not working together +- Fixed monitor mode not working in combination with the offline queue + ## v.2.5.3 - 21 Mar, 2016 Bugfixes diff --git a/index.js b/index.js index b88c1ed4fb..90a56d68e7 100644 --- a/index.js +++ b/index.js @@ -140,6 +140,7 @@ function RedisClient (options, stream) { this.pipeline = 0; this.times_connected = 0; this.options = options; + this.buffers = options.return_buffers || options.detect_buffers; // Init parser this.reply_parser = Parser({ returnReply: function (data) { @@ -154,7 +155,7 @@ function RedisClient (options, stream) { self.stream.destroy(); self.return_error(err); }, - returnBuffers: options.return_buffers || options.detect_buffers, + returnBuffers: this.buffers, name: options.parser }); this.create_stream(); @@ -329,9 +330,7 @@ RedisClient.prototype.on_error = function (err) { } err.message = 'Redis connection to ' + this.address + ' failed - ' + err.message; - debug(err.message); - this.connected = false; this.ready = false; @@ -369,12 +368,6 @@ RedisClient.prototype.on_ready = function () { debug('on_ready called ' + this.address + ' id ' + this.connection_id); this.ready = true; - if (this.old_state !== null) { - this.monitoring = this.old_state.monitoring; - this.pub_sub_mode = this.old_state.pub_sub_mode; - this.old_state = null; - } - var cork; if (!this.stream.cork) { cork = function (len) { @@ -393,16 +386,15 @@ RedisClient.prototype.on_ready = function () { } this.cork = cork; - // restore modal commands from previous connection + // restore modal commands from previous connection. The order of the commands is important if (this.selected_db !== undefined) { - // this trick works if and only if the following send_command - // never goes into the offline queue - var pub_sub_mode = this.pub_sub_mode; - this.pub_sub_mode = false; this.send_command('select', [this.selected_db]); - this.pub_sub_mode = pub_sub_mode; } - if (this.pub_sub_mode === true) { + if (this.old_state !== null) { + this.monitoring = this.old_state.monitoring; + this.pub_sub_mode = this.old_state.pub_sub_mode; + } + if (this.pub_sub_mode) { // only emit 'ready' when all subscriptions were made again var callback_count = 0; var callback = function () { @@ -424,12 +416,10 @@ RedisClient.prototype.on_ready = function () { }); return; } - if (this.monitoring) { this.send_command('monitor', []); - } else { - this.send_offline_queue(); } + this.send_offline_queue(); this.emit('ready'); }; @@ -525,15 +515,13 @@ RedisClient.prototype.connection_gone = function (why, error) { this.cork = noop; this.pipeline = 0; - if (this.old_state === null) { - var state = { - monitoring: this.monitoring, - pub_sub_mode: this.pub_sub_mode - }; - this.old_state = state; - this.monitoring = false; - this.pub_sub_mode = false; - } + var state = { + monitoring: this.monitoring, + pub_sub_mode: this.pub_sub_mode + }; + this.old_state = state; + this.monitoring = false; + this.pub_sub_mode = false; // since we are collapsing end and close, users don't expect to be called twice if (!this.emitted_end) { @@ -604,9 +592,7 @@ RedisClient.prototype.connection_gone = function (why, error) { }; RedisClient.prototype.return_error = function (err) { - var command_obj = this.command_queue.shift(), - queue_len = this.command_queue.length; - + var command_obj = this.command_queue.shift(); if (command_obj && command_obj.command && command_obj.command.toUpperCase) { err.command = command_obj.command.toUpperCase(); } @@ -617,8 +603,7 @@ RedisClient.prototype.return_error = function (err) { err.code = match[1]; } - this.emit_idle(queue_len); - + this.emit_idle(); utils.callback_or_emit(this, command_obj && command_obj.callback, err); }; @@ -627,8 +612,8 @@ RedisClient.prototype.drain = function () { this.should_buffer = false; }; -RedisClient.prototype.emit_idle = function (queue_len) { - if (queue_len === 0 && this.pub_sub_mode === false) { +RedisClient.prototype.emit_idle = function () { + if (this.command_queue.length === 0 && this.pub_sub_mode === false) { this.emit('idle'); } }; @@ -640,20 +625,6 @@ function queue_state_error (self, command_obj) { self.emit('error', err); } -function monitor (self, reply) { - if (typeof reply !== 'string') { - reply = reply.toString(); - } - // If in monitoring mode only two commands are valid ones: AUTH and MONITOR wich reply with OK - var len = reply.indexOf(' '); - var timestamp = reply.slice(0, len); - var argindex = reply.indexOf('"'); - var args = reply.slice(argindex + 1, -1).split('" "').map(function (elem) { - return elem.replace(/\\"/g, '"'); - }); - self.emit('monitor', timestamp, args); -} - function normal_reply (self, reply, command_obj) { if (typeof command_obj.callback === 'function') { if ('exec' !== command_obj.command) { @@ -716,7 +687,7 @@ RedisClient.prototype.return_reply = function (reply) { queue_len = this.command_queue.length; - this.emit_idle(queue_len); + this.emit_idle(); if (command_obj && !command_obj.sub_command) { normal_reply(this, reply, command_obj); @@ -724,9 +695,7 @@ RedisClient.prototype.return_reply = function (reply) { return_pub_sub(this, reply, command_obj); } /* istanbul ignore else: this is a safety check that we should not be able to trigger */ - else if (this.monitoring) { - monitor(this, reply); - } else { + else if (!this.monitoring) { queue_state_error(this, command_obj); } }; @@ -837,8 +806,6 @@ RedisClient.prototype.send_command = function (command, args, callback) { if (command === 'subscribe' || command === 'psubscribe' || command === 'unsubscribe' || command === 'punsubscribe') { this.pub_sub_command(command_obj); // TODO: This has to be moved to the result handler - } else if (command === 'monitor') { - this.monitoring = true; } else if (command === 'quit') { this.closing = true; } diff --git a/lib/individualCommands.js b/lib/individualCommands.js index 572228fe78..f7b7341ec5 100644 --- a/lib/individualCommands.js +++ b/lib/individualCommands.js @@ -33,6 +33,41 @@ RedisClient.prototype.select = RedisClient.prototype.SELECT = function select (d }); }; +RedisClient.prototype.monitor = RedisClient.prototype.MONITOR = function (callback) { + // Use a individual command, as this is a special case that does not has to be checked for any other command + var self = this; + return this.send_command('monitor', [], function (err, res) { + if (err === null) { + self.reply_parser.returnReply = function (reply) { + // If in monitor mode, all normal commands are still working and we only want to emit the streamlined commands + // As this is not the average use case and monitor is expensive anyway, let's change the code here, to improve + // the average performance of all other commands in case of no monitor mode + if (self.monitoring) { + var replyStr; + if (self.buffers && Buffer.isBuffer(reply)) { + replyStr = reply.toString(); + } else { + replyStr = reply; + } + // While reconnecting the redis server does not recognize the client as in monitor mode anymore + // Therefor the monitor command has to finish before it catches further commands + if (typeof replyStr === 'string' && utils.monitor_regex.test(replyStr)) { + var timestamp = replyStr.slice(0, replyStr.indexOf(' ')); + var args = replyStr.slice(replyStr.indexOf('"') + 1, -1).split('" "').map(function (elem) { + return elem.replace(/\\"/g, '"'); + }); + self.emit('monitor', timestamp, args, replyStr); + return; + } + } + self.return_reply(reply); + }; + self.monitoring = true; + } + utils.callback_or_emit(self, callback, err, res); + }); +}; + // Store info in this.server_info after each call RedisClient.prototype.info = RedisClient.prototype.INFO = function info (section, callback) { var self = this; diff --git a/lib/utils.js b/lib/utils.js index 069bf30645..b1b46c7829 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -39,8 +39,6 @@ function print (err, reply) { } } -var redisErrCode = /^([A-Z]+)\s+(.+)$/; - // Deep clone arbitrary objects with arrays. Can't handle cyclic structures (results in a range error) // Any attribute with a non primitive value besides object and array will be passed by reference (e.g. Buffers, Maps, Functions) function clone (obj) { @@ -102,7 +100,8 @@ module.exports = { reply_to_strings: replyToStrings, reply_to_object: replyToObject, print: print, - err_code: redisErrCode, + err_code: /^([A-Z]+)\s+(.+)$/, + monitor_regex: /^[0-9]{10,11}\.[0-9]+ \[[0-9]{1,3} [0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}:[0-9]{1,5}\].*/, clone: convenienceClone, callback_or_emit: callbackOrEmit, reply_in_order: replyInOrder diff --git a/test/node_redis.spec.js b/test/node_redis.spec.js index 44dc40f78f..f45b95c3bd 100644 --- a/test/node_redis.spec.js +++ b/test/node_redis.spec.js @@ -3,6 +3,7 @@ var assert = require("assert"); var config = require("./lib/config"); var helper = require('./helper'); +var utils = require('../lib/utils'); var fork = require("child_process").fork; var redis = config.redis; @@ -211,7 +212,7 @@ describe("The node_redis client", function () { it("return an error in the callback", function (done) { if (helper.redisProcess().spawnFailed()) this.skip(); - // TODO: Investigate why this test is failing hard and killing mocha. + // TODO: Investigate why this test is failing hard and killing mocha if using '/tmp/redis.sock'. // Seems like something is wrong with nyc while passing a socket connection to create client! client = redis.createClient(); client.quit(function() { @@ -366,36 +367,91 @@ describe("The node_redis client", function () { }); describe('monitor', function () { - it('monitors commands on all other redis clients', function (done) { + it('monitors commands on all redis clients and works in the correct order', function (done) { var monitorClient = redis.createClient.apply(null, args); var responses = []; + var end = helper.callFuncAfter(done, 5); + monitorClient.set('foo', 'bar'); + monitorClient.flushdb(); monitorClient.monitor(function (err, res) { + assert.strictEqual(res, 'OK'); client.mget("some", "keys", "foo", "bar"); client.set("json", JSON.stringify({ foo: "123", bar: "sdflkdfsjk", another: false })); + monitorClient.get('baz', function (err, res) { + assert.strictEqual(res, null); + end(err); + }); + monitorClient.set('foo', 'bar" "s are " " good!"', function (err, res) { + assert.strictEqual(res, 'OK'); + end(err); + }); + monitorClient.mget('foo', 'baz', function (err, res) { + assert.strictEqual(res[0], 'bar" "s are " " good!"'); + assert.strictEqual(res[1], null); + end(err); + }); + monitorClient.subscribe('foo', 'baz', function (err, res) { + // The return value might change in v.3 + // assert.strictEqual(res, 'baz'); + // TODO: Fix the return value of subscribe calls + end(err); + }); }); - monitorClient.on("monitor", function (time, args) { + monitorClient.on("monitor", function (time, args, rawOutput) { responses.push(args); - if (responses.length === 2) { - assert.strictEqual(5, responses[0].length); - assert.strictEqual("mget", responses[0][0]); - assert.strictEqual("some", responses[0][1]); - assert.strictEqual("keys", responses[0][2]); - assert.strictEqual("foo", responses[0][3]); - assert.strictEqual("bar", responses[0][4]); - assert.strictEqual(3, responses[1].length); - assert.strictEqual("set", responses[1][0]); - assert.strictEqual("json", responses[1][1]); - assert.strictEqual('{"foo":"123","bar":"sdflkdfsjk","another":false}', responses[1][2]); - monitorClient.quit(done); + assert(utils.monitor_regex.test(rawOutput), rawOutput); + if (responses.length === 6) { + assert.deepEqual(responses[0], ['mget', 'some', 'keys', 'foo', 'bar']); + assert.deepEqual(responses[1], ['set', 'json', '{"foo":"123","bar":"sdflkdfsjk","another":false}']); + assert.deepEqual(responses[2], ['get', 'baz']); + assert.deepEqual(responses[3], ['set', 'foo', 'bar" "s are " " good!"']); + assert.deepEqual(responses[4], ['mget', 'foo', 'baz']); + assert.deepEqual(responses[5], ['subscribe', 'foo', 'baz']); + monitorClient.quit(end); } }); }); + + it('monitors returns strings in the rawOutput even with return_buffers activated', function (done) { + var monitorClient = redis.createClient({ + return_buffers: true + }); + + monitorClient.MONITOR(function (err, res) { + assert.strictEqual(res.inspect(), new Buffer('OK').inspect()); + client.mget("hello", new Buffer('world')); + }); + + monitorClient.on("monitor", function (time, args, rawOutput) { + assert.strictEqual(typeof rawOutput, 'string'); + assert(utils.monitor_regex.test(rawOutput), rawOutput); + assert.deepEqual(args, ['mget', 'hello', 'world']); + // Quit immediatly ends monitoring mode and therefor does not stream back the quit command + monitorClient.quit(done); + }); + }); + + it('monitors reconnects properly and works with the offline queue', function (done) { + var i = 0; + client.MONITOR(helper.isString('OK')); + client.mget("hello", 'world'); + client.on("monitor", function (time, args, rawOutput) { + assert(utils.monitor_regex.test(rawOutput), rawOutput); + assert.deepEqual(args, ['mget', 'hello', 'world']); + if (i++ === 2) { + // End after two reconnects + return done(); + } + client.stream.destroy(); + client.mget("hello", 'world'); + }); + }); }); describe('idle', function () {