From cd58e1fd8961ffabafe2bfea960229df5a0bcc0c Mon Sep 17 00:00:00 2001 From: Ruben Bridgewater Date: Wed, 13 Apr 2016 04:18:39 +0200 Subject: [PATCH] Implement message_buffer and pmessage_buffer events --- README.md | 10 ++++++++ index.js | 60 +++++++++++++++++++++++++-------------------- test/pubsub.spec.js | 24 ++++++++++++++++++ 3 files changed, 67 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index 5aa7804d1c..3a4abf9eb9 100644 --- a/README.md +++ b/README.md @@ -410,6 +410,16 @@ Client will emit `pmessage` for every message received that matches an active su Listeners are passed the original pattern used with `PSUBSCRIBE` as `pattern`, the sending channel name as `channel`, and the message as `message`. +### "message_buffer" (channel, message) + +This is the same as the `message` event with the exception, that it is always going to emit a buffer. +If you listen to the `message` event at the same time as the `message_buffer`, it is always going to emit a string. + +### "pmessage_buffer" (pattern, channel, message) + +This is the same as the `pmessage` event with the exception, that it is always going to emit a buffer. +If you listen to the `pmessage` event at the same time as the `pmessage_buffer`, it is always going to emit a string. + ### "subscribe" (channel, count) Client will emit `subscribe` in response to a `SUBSCRIBE` command. Listeners are passed the diff --git a/index.js b/index.js index b5c62f3c65..5848279e00 100644 --- a/index.js +++ b/index.js @@ -27,7 +27,7 @@ if (typeof EventEmitter !== 'function') { function noop () {} function handle_detect_buffers_reply (reply, command, buffer_args) { - if (buffer_args === false) { + if (buffer_args === false || this.message_buffers) { // If detect_buffers option was specified, then the reply from the parser will be a buffer. // If this command did not use Buffer arguments, then convert the reply to Strings here. reply = utils.reply_to_strings(reply); @@ -138,6 +138,7 @@ function RedisClient (options, stream) { this.pub_sub_mode = 0; this.subscription_set = {}; this.monitoring = false; + this.message_buffers = false; this.closing = false; this.server_info = {}; this.auth_pass = options.auth_pass || options.password; @@ -149,23 +150,7 @@ function RedisClient (options, stream) { this.options = options; this.buffers = options.return_buffers || options.detect_buffers; // Init parser - this.reply_parser = Parser({ - returnReply: function (data) { - self.return_reply(data); - }, - returnError: function (err) { - self.return_error(err); - }, - returnFatalError: function (err) { - // Error out all fired commands. Otherwise they might rely on faulty data. We have to reconnect to get in a working state again - self.flush_and_error(err, ['command_queue']); - self.stream.destroy(); - self.return_error(err); - }, - returnBuffers: this.buffers, - name: options.parser, - stringNumbers: options.string_numbers - }); + this.reply_parser = create_parser(this, options); this.create_stream(); // The listeners will not be attached right away, so let's print the deprecation message while the listener is attached this.on('newListener', function (event) { @@ -179,6 +164,10 @@ function RedisClient (options, stream) { 'The drain event listener is deprecated and will be removed in v.3.0.0.\n' + 'If you want to keep on listening to this event please listen to the stream drain event directly.' ); + } else if (event === 'message_buffer' || event === 'pmessage_buffer' || event === 'messageBuffer' || event === 'pmessageBuffer' && !this.buffers) { + this.message_buffers = true; + this.handle_reply = handle_detect_buffers_reply; + this.reply_parser = create_parser(this); } }); } @@ -186,6 +175,26 @@ util.inherits(RedisClient, EventEmitter); RedisClient.connection_id = 0; +function create_parser (self) { + return Parser({ + returnReply: function (data) { + self.return_reply(data); + }, + returnError: function (err) { + self.return_error(err); + }, + returnFatalError: function (err) { + // Error out all fired commands. Otherwise they might rely on faulty data. We have to reconnect to get in a working state again + self.flush_and_error(err, ['command_queue']); + self.stream.destroy(); + self.return_error(err); + }, + returnBuffers: self.buffers || self.message_buffers, + name: self.options.parser, + stringNumbers: self.options.string_numbers + }); +} + /****************************************************************************** All functions in here are internal besides the RedisClient constructor @@ -696,21 +705,18 @@ function subscribe_unsubscribe (self, reply, type, subscribe) { function return_pub_sub (self, reply) { var type = reply[0].toString(); if (type === 'message') { // channel, message - // TODO: Implement message_buffer - // if (self.buffers) { - // self.emit('message_buffer', reply[1], reply[2]); - // } - if (!self.options.return_buffers) { // backwards compatible. Refactor this in v.3 to always return a string on the normal emitter + if (!self.options.return_buffers || self.message_buffers) { // backwards compatible. Refactor this in v.3 to always return a string on the normal emitter self.emit('message', reply[1].toString(), reply[2].toString()); + self.emit('message_buffer', reply[1], reply[2]); + self.emit('messageBuffer', reply[1], reply[2]); } else { self.emit('message', reply[1], reply[2]); } } else if (type === 'pmessage') { // pattern, channel, message - // if (self.buffers) { - // self.emit('pmessage_buffer', reply[1], reply[2], reply[3]); - // } - if (!self.options.return_buffers) { // backwards compatible. Refactor this in v.3 to always return a string on the normal emitter + if (!self.options.return_buffers || self.message_buffers) { // backwards compatible. Refactor this in v.3 to always return a string on the normal emitter self.emit('pmessage', reply[1].toString(), reply[2].toString(), reply[3].toString()); + self.emit('pmessage_buffer', reply[1], reply[2], reply[3]); + self.emit('pmessageBuffer', reply[1], reply[2], reply[3]); } else { self.emit('pmessage', reply[1], reply[2], reply[3]); } diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index 9cb388b03e..f9401f63ad 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -504,6 +504,30 @@ describe('publish/subscribe', function () { pub.publish('/foo', 'hello world', helper.isNumber(3)); }); }); + + it('allows to listen to pmessageBuffer and pmessage', function (done) { + var batch = sub.batch(); + batch.psubscribe('*'); + batch.subscribe('/foo'); + batch.unsubscribe('/foo'); + batch.unsubscribe(); + batch.subscribe(['/foo']); + batch.exec(); + assert.strictEqual(sub.shouldBuffer, false); + sub.on('pmessageBuffer', function (pattern, channel, message) { + assert.strictEqual(pattern.inspect(), new Buffer('*').inspect()); + assert.strictEqual(channel.inspect(), new Buffer('/foo').inspect()); + sub.quit(done); + }); + sub.on('pmessage', function (pattern, channel, message) { + assert.strictEqual(pattern, '*'); + assert.strictEqual(channel, '/foo'); + }); + pub.pubsub('numsub', '/foo', function (err, res) { + assert.deepEqual(res, ['/foo', 1]); + }); + pub.publish('/foo', 'hello world', helper.isNumber(2)); + }); }); describe('punsubscribe', function () {