1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

Fix monitoring mode

This commit is contained in:
Ruben Bridgewater
2016-03-25 15:26:22 +01:00
parent bf568b6df7
commit 344291a98a
6 changed files with 150 additions and 81 deletions

View File

@@ -545,22 +545,20 @@ If you fire many commands at once this is going to **boost the execution speed b
Redis supports the `MONITOR` command, which lets you see all commands received by the Redis server Redis supports the `MONITOR` command, which lets you see all commands received by the Redis server
across all client connections, including from other client libraries and other computers. across all client connections, including from other client libraries and other computers.
After you send the `MONITOR` command, no other commands are valid on that connection. `node_redis` A `monitor` event is going to be emitted for every command fired from any client connected to the server including the monitoring client itself.
will emit a `monitor` event for every new monitor message that comes across. The callback for the The callback for the `monitor` event takes a timestamp from the Redis server, an array of command arguments and the raw monitoring string.
`monitor` event takes a timestamp from the Redis server and an array of command arguments.
Here is a simple example: Here is a simple example:
```js ```js
var client = require("redis").createClient(), var client = require("redis").createClient();
util = require("util");
client.monitor(function (err, res) { client.monitor(function (err, res) {
console.log("Entering monitoring mode."); console.log("Entering monitoring mode.");
}); });
client.set('foo', 'bar');
client.on("monitor", function (time, args) { client.on("monitor", function (time, args, raw_reply) {
console.log(time + ": " + util.inspect(args)); console.log(time + ": " + args); // 1458910076.446514:['set', 'foo', 'bar']
}); });
``` ```

View File

@@ -1,6 +1,20 @@
Changelog Changelog
========= =========
## v.2.6.0 - XX Mar, 2016
Features
- Monitor now works together with the offline queue
- All commands that were send after a connection loss are now going to be send after reconnecting
- Activating monitor mode does now work together with arbitrary commands including pub sub mode
Bugfixes
- Fixed calling monitor command while other commands are still running
- Fixed monitor and pub sub mode not working together
- Fixed monitor mode not working in combination with the offline queue
## v.2.5.3 - 21 Mar, 2016 ## v.2.5.3 - 21 Mar, 2016
Bugfixes Bugfixes

View File

@@ -140,6 +140,7 @@ function RedisClient (options, stream) {
this.pipeline = 0; this.pipeline = 0;
this.times_connected = 0; this.times_connected = 0;
this.options = options; this.options = options;
this.buffers = options.return_buffers || options.detect_buffers;
// Init parser // Init parser
this.reply_parser = Parser({ this.reply_parser = Parser({
returnReply: function (data) { returnReply: function (data) {
@@ -154,7 +155,7 @@ function RedisClient (options, stream) {
self.stream.destroy(); self.stream.destroy();
self.return_error(err); self.return_error(err);
}, },
returnBuffers: options.return_buffers || options.detect_buffers, returnBuffers: this.buffers,
name: options.parser name: options.parser
}); });
this.create_stream(); this.create_stream();
@@ -329,9 +330,7 @@ RedisClient.prototype.on_error = function (err) {
} }
err.message = 'Redis connection to ' + this.address + ' failed - ' + err.message; err.message = 'Redis connection to ' + this.address + ' failed - ' + err.message;
debug(err.message); debug(err.message);
this.connected = false; this.connected = false;
this.ready = false; this.ready = false;
@@ -369,12 +368,6 @@ RedisClient.prototype.on_ready = function () {
debug('on_ready called ' + this.address + ' id ' + this.connection_id); debug('on_ready called ' + this.address + ' id ' + this.connection_id);
this.ready = true; this.ready = true;
if (this.old_state !== null) {
this.monitoring = this.old_state.monitoring;
this.pub_sub_mode = this.old_state.pub_sub_mode;
this.old_state = null;
}
var cork; var cork;
if (!this.stream.cork) { if (!this.stream.cork) {
cork = function (len) { cork = function (len) {
@@ -393,16 +386,15 @@ RedisClient.prototype.on_ready = function () {
} }
this.cork = cork; this.cork = cork;
// restore modal commands from previous connection // restore modal commands from previous connection. The order of the commands is important
if (this.selected_db !== undefined) { if (this.selected_db !== undefined) {
// this trick works if and only if the following send_command
// never goes into the offline queue
var pub_sub_mode = this.pub_sub_mode;
this.pub_sub_mode = false;
this.send_command('select', [this.selected_db]); this.send_command('select', [this.selected_db]);
this.pub_sub_mode = pub_sub_mode;
} }
if (this.pub_sub_mode === true) { if (this.old_state !== null) {
this.monitoring = this.old_state.monitoring;
this.pub_sub_mode = this.old_state.pub_sub_mode;
}
if (this.pub_sub_mode) {
// only emit 'ready' when all subscriptions were made again // only emit 'ready' when all subscriptions were made again
var callback_count = 0; var callback_count = 0;
var callback = function () { var callback = function () {
@@ -424,12 +416,10 @@ RedisClient.prototype.on_ready = function () {
}); });
return; return;
} }
if (this.monitoring) { if (this.monitoring) {
this.send_command('monitor', []); this.send_command('monitor', []);
} else {
this.send_offline_queue();
} }
this.send_offline_queue();
this.emit('ready'); this.emit('ready');
}; };
@@ -525,7 +515,6 @@ RedisClient.prototype.connection_gone = function (why, error) {
this.cork = noop; this.cork = noop;
this.pipeline = 0; this.pipeline = 0;
if (this.old_state === null) {
var state = { var state = {
monitoring: this.monitoring, monitoring: this.monitoring,
pub_sub_mode: this.pub_sub_mode pub_sub_mode: this.pub_sub_mode
@@ -533,7 +522,6 @@ RedisClient.prototype.connection_gone = function (why, error) {
this.old_state = state; this.old_state = state;
this.monitoring = false; this.monitoring = false;
this.pub_sub_mode = false; this.pub_sub_mode = false;
}
// since we are collapsing end and close, users don't expect to be called twice // since we are collapsing end and close, users don't expect to be called twice
if (!this.emitted_end) { if (!this.emitted_end) {
@@ -604,9 +592,7 @@ RedisClient.prototype.connection_gone = function (why, error) {
}; };
RedisClient.prototype.return_error = function (err) { RedisClient.prototype.return_error = function (err) {
var command_obj = this.command_queue.shift(), var command_obj = this.command_queue.shift();
queue_len = this.command_queue.length;
if (command_obj && command_obj.command && command_obj.command.toUpperCase) { if (command_obj && command_obj.command && command_obj.command.toUpperCase) {
err.command = command_obj.command.toUpperCase(); err.command = command_obj.command.toUpperCase();
} }
@@ -617,8 +603,7 @@ RedisClient.prototype.return_error = function (err) {
err.code = match[1]; err.code = match[1];
} }
this.emit_idle(queue_len); this.emit_idle();
utils.callback_or_emit(this, command_obj && command_obj.callback, err); utils.callback_or_emit(this, command_obj && command_obj.callback, err);
}; };
@@ -627,8 +612,8 @@ RedisClient.prototype.drain = function () {
this.should_buffer = false; this.should_buffer = false;
}; };
RedisClient.prototype.emit_idle = function (queue_len) { RedisClient.prototype.emit_idle = function () {
if (queue_len === 0 && this.pub_sub_mode === false) { if (this.command_queue.length === 0 && this.pub_sub_mode === false) {
this.emit('idle'); this.emit('idle');
} }
}; };
@@ -640,20 +625,6 @@ function queue_state_error (self, command_obj) {
self.emit('error', err); self.emit('error', err);
} }
function monitor (self, reply) {
if (typeof reply !== 'string') {
reply = reply.toString();
}
// If in monitoring mode only two commands are valid ones: AUTH and MONITOR wich reply with OK
var len = reply.indexOf(' ');
var timestamp = reply.slice(0, len);
var argindex = reply.indexOf('"');
var args = reply.slice(argindex + 1, -1).split('" "').map(function (elem) {
return elem.replace(/\\"/g, '"');
});
self.emit('monitor', timestamp, args);
}
function normal_reply (self, reply, command_obj) { function normal_reply (self, reply, command_obj) {
if (typeof command_obj.callback === 'function') { if (typeof command_obj.callback === 'function') {
if ('exec' !== command_obj.command) { if ('exec' !== command_obj.command) {
@@ -716,7 +687,7 @@ RedisClient.prototype.return_reply = function (reply) {
queue_len = this.command_queue.length; queue_len = this.command_queue.length;
this.emit_idle(queue_len); this.emit_idle();
if (command_obj && !command_obj.sub_command) { if (command_obj && !command_obj.sub_command) {
normal_reply(this, reply, command_obj); normal_reply(this, reply, command_obj);
@@ -724,9 +695,7 @@ RedisClient.prototype.return_reply = function (reply) {
return_pub_sub(this, reply, command_obj); return_pub_sub(this, reply, command_obj);
} }
/* istanbul ignore else: this is a safety check that we should not be able to trigger */ /* istanbul ignore else: this is a safety check that we should not be able to trigger */
else if (this.monitoring) { else if (!this.monitoring) {
monitor(this, reply);
} else {
queue_state_error(this, command_obj); queue_state_error(this, command_obj);
} }
}; };
@@ -837,8 +806,6 @@ RedisClient.prototype.send_command = function (command, args, callback) {
if (command === 'subscribe' || command === 'psubscribe' || command === 'unsubscribe' || command === 'punsubscribe') { if (command === 'subscribe' || command === 'psubscribe' || command === 'unsubscribe' || command === 'punsubscribe') {
this.pub_sub_command(command_obj); // TODO: This has to be moved to the result handler this.pub_sub_command(command_obj); // TODO: This has to be moved to the result handler
} else if (command === 'monitor') {
this.monitoring = true;
} else if (command === 'quit') { } else if (command === 'quit') {
this.closing = true; this.closing = true;
} }

View File

@@ -33,6 +33,41 @@ RedisClient.prototype.select = RedisClient.prototype.SELECT = function select (d
}); });
}; };
RedisClient.prototype.monitor = RedisClient.prototype.MONITOR = function (callback) {
// Use a individual command, as this is a special case that does not has to be checked for any other command
var self = this;
return this.send_command('monitor', [], function (err, res) {
if (err === null) {
self.reply_parser.returnReply = function (reply) {
// If in monitor mode, all normal commands are still working and we only want to emit the streamlined commands
// As this is not the average use case and monitor is expensive anyway, let's change the code here, to improve
// the average performance of all other commands in case of no monitor mode
if (self.monitoring) {
var replyStr;
if (self.buffers && Buffer.isBuffer(reply)) {
replyStr = reply.toString();
} else {
replyStr = reply;
}
// While reconnecting the redis server does not recognize the client as in monitor mode anymore
// Therefor the monitor command has to finish before it catches further commands
if (typeof replyStr === 'string' && utils.monitor_regex.test(replyStr)) {
var timestamp = replyStr.slice(0, replyStr.indexOf(' '));
var args = replyStr.slice(replyStr.indexOf('"') + 1, -1).split('" "').map(function (elem) {
return elem.replace(/\\"/g, '"');
});
self.emit('monitor', timestamp, args, replyStr);
return;
}
}
self.return_reply(reply);
};
self.monitoring = true;
}
utils.callback_or_emit(self, callback, err, res);
});
};
// Store info in this.server_info after each call // Store info in this.server_info after each call
RedisClient.prototype.info = RedisClient.prototype.INFO = function info (section, callback) { RedisClient.prototype.info = RedisClient.prototype.INFO = function info (section, callback) {
var self = this; var self = this;

View File

@@ -39,8 +39,6 @@ function print (err, reply) {
} }
} }
var redisErrCode = /^([A-Z]+)\s+(.+)$/;
// Deep clone arbitrary objects with arrays. Can't handle cyclic structures (results in a range error) // Deep clone arbitrary objects with arrays. Can't handle cyclic structures (results in a range error)
// Any attribute with a non primitive value besides object and array will be passed by reference (e.g. Buffers, Maps, Functions) // Any attribute with a non primitive value besides object and array will be passed by reference (e.g. Buffers, Maps, Functions)
function clone (obj) { function clone (obj) {
@@ -102,7 +100,8 @@ module.exports = {
reply_to_strings: replyToStrings, reply_to_strings: replyToStrings,
reply_to_object: replyToObject, reply_to_object: replyToObject,
print: print, print: print,
err_code: redisErrCode, err_code: /^([A-Z]+)\s+(.+)$/,
monitor_regex: /^[0-9]{10,11}\.[0-9]+ \[[0-9]{1,3} [0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}:[0-9]{1,5}\].*/,
clone: convenienceClone, clone: convenienceClone,
callback_or_emit: callbackOrEmit, callback_or_emit: callbackOrEmit,
reply_in_order: replyInOrder reply_in_order: replyInOrder

View File

@@ -3,6 +3,7 @@
var assert = require("assert"); var assert = require("assert");
var config = require("./lib/config"); var config = require("./lib/config");
var helper = require('./helper'); var helper = require('./helper');
var utils = require('../lib/utils');
var fork = require("child_process").fork; var fork = require("child_process").fork;
var redis = config.redis; var redis = config.redis;
@@ -211,7 +212,7 @@ describe("The node_redis client", function () {
it("return an error in the callback", function (done) { it("return an error in the callback", function (done) {
if (helper.redisProcess().spawnFailed()) this.skip(); if (helper.redisProcess().spawnFailed()) this.skip();
// TODO: Investigate why this test is failing hard and killing mocha. // TODO: Investigate why this test is failing hard and killing mocha if using '/tmp/redis.sock'.
// Seems like something is wrong with nyc while passing a socket connection to create client! // Seems like something is wrong with nyc while passing a socket connection to create client!
client = redis.createClient(); client = redis.createClient();
client.quit(function() { client.quit(function() {
@@ -366,36 +367,91 @@ describe("The node_redis client", function () {
}); });
describe('monitor', function () { describe('monitor', function () {
it('monitors commands on all other redis clients', function (done) { it('monitors commands on all redis clients and works in the correct order', function (done) {
var monitorClient = redis.createClient.apply(null, args); var monitorClient = redis.createClient.apply(null, args);
var responses = []; var responses = [];
var end = helper.callFuncAfter(done, 5);
monitorClient.set('foo', 'bar');
monitorClient.flushdb();
monitorClient.monitor(function (err, res) { monitorClient.monitor(function (err, res) {
assert.strictEqual(res, 'OK');
client.mget("some", "keys", "foo", "bar"); client.mget("some", "keys", "foo", "bar");
client.set("json", JSON.stringify({ client.set("json", JSON.stringify({
foo: "123", foo: "123",
bar: "sdflkdfsjk", bar: "sdflkdfsjk",
another: false another: false
})); }));
monitorClient.get('baz', function (err, res) {
assert.strictEqual(res, null);
end(err);
});
monitorClient.set('foo', 'bar" "s are " " good!"', function (err, res) {
assert.strictEqual(res, 'OK');
end(err);
});
monitorClient.mget('foo', 'baz', function (err, res) {
assert.strictEqual(res[0], 'bar" "s are " " good!"');
assert.strictEqual(res[1], null);
end(err);
});
monitorClient.subscribe('foo', 'baz', function (err, res) {
// The return value might change in v.3
// assert.strictEqual(res, 'baz');
// TODO: Fix the return value of subscribe calls
end(err);
});
}); });
monitorClient.on("monitor", function (time, args) { monitorClient.on("monitor", function (time, args, rawOutput) {
responses.push(args); responses.push(args);
if (responses.length === 2) { assert(utils.monitor_regex.test(rawOutput), rawOutput);
assert.strictEqual(5, responses[0].length); if (responses.length === 6) {
assert.strictEqual("mget", responses[0][0]); assert.deepEqual(responses[0], ['mget', 'some', 'keys', 'foo', 'bar']);
assert.strictEqual("some", responses[0][1]); assert.deepEqual(responses[1], ['set', 'json', '{"foo":"123","bar":"sdflkdfsjk","another":false}']);
assert.strictEqual("keys", responses[0][2]); assert.deepEqual(responses[2], ['get', 'baz']);
assert.strictEqual("foo", responses[0][3]); assert.deepEqual(responses[3], ['set', 'foo', 'bar" "s are " " good!"']);
assert.strictEqual("bar", responses[0][4]); assert.deepEqual(responses[4], ['mget', 'foo', 'baz']);
assert.strictEqual(3, responses[1].length); assert.deepEqual(responses[5], ['subscribe', 'foo', 'baz']);
assert.strictEqual("set", responses[1][0]); monitorClient.quit(end);
assert.strictEqual("json", responses[1][1]);
assert.strictEqual('{"foo":"123","bar":"sdflkdfsjk","another":false}', responses[1][2]);
monitorClient.quit(done);
} }
}); });
}); });
it('monitors returns strings in the rawOutput even with return_buffers activated', function (done) {
var monitorClient = redis.createClient({
return_buffers: true
});
monitorClient.MONITOR(function (err, res) {
assert.strictEqual(res.inspect(), new Buffer('OK').inspect());
client.mget("hello", new Buffer('world'));
});
monitorClient.on("monitor", function (time, args, rawOutput) {
assert.strictEqual(typeof rawOutput, 'string');
assert(utils.monitor_regex.test(rawOutput), rawOutput);
assert.deepEqual(args, ['mget', 'hello', 'world']);
// Quit immediatly ends monitoring mode and therefor does not stream back the quit command
monitorClient.quit(done);
});
});
it('monitors reconnects properly and works with the offline queue', function (done) {
var i = 0;
client.MONITOR(helper.isString('OK'));
client.mget("hello", 'world');
client.on("monitor", function (time, args, rawOutput) {
assert(utils.monitor_regex.test(rawOutput), rawOutput);
assert.deepEqual(args, ['mget', 'hello', 'world']);
if (i++ === 2) {
// End after two reconnects
return done();
}
client.stream.destroy();
client.mget("hello", 'world');
});
});
}); });
describe('idle', function () { describe('idle', function () {