1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-04 15:02:09 +03:00

Implement message_buffer and pmessage_buffer events

This commit is contained in:
Ruben Bridgewater
2016-04-13 04:18:39 +02:00
parent a9d565b8f4
commit cd58e1fd89
3 changed files with 67 additions and 27 deletions

View File

@@ -410,6 +410,16 @@ Client will emit `pmessage` for every message received that matches an active su
Listeners are passed the original pattern used with `PSUBSCRIBE` as `pattern`, the sending channel Listeners are passed the original pattern used with `PSUBSCRIBE` as `pattern`, the sending channel
name as `channel`, and the message as `message`. name as `channel`, and the message as `message`.
### "message_buffer" (channel, message)
This is the same as the `message` event with the exception, that it is always going to emit a buffer.
If you listen to the `message` event at the same time as the `message_buffer`, it is always going to emit a string.
### "pmessage_buffer" (pattern, channel, message)
This is the same as the `pmessage` event with the exception, that it is always going to emit a buffer.
If you listen to the `pmessage` event at the same time as the `pmessage_buffer`, it is always going to emit a string.
### "subscribe" (channel, count) ### "subscribe" (channel, count)
Client will emit `subscribe` in response to a `SUBSCRIBE` command. Listeners are passed the Client will emit `subscribe` in response to a `SUBSCRIBE` command. Listeners are passed the

View File

@@ -27,7 +27,7 @@ if (typeof EventEmitter !== 'function') {
function noop () {} function noop () {}
function handle_detect_buffers_reply (reply, command, buffer_args) { function handle_detect_buffers_reply (reply, command, buffer_args) {
if (buffer_args === false) { if (buffer_args === false || this.message_buffers) {
// If detect_buffers option was specified, then the reply from the parser will be a buffer. // If detect_buffers option was specified, then the reply from the parser will be a buffer.
// If this command did not use Buffer arguments, then convert the reply to Strings here. // If this command did not use Buffer arguments, then convert the reply to Strings here.
reply = utils.reply_to_strings(reply); reply = utils.reply_to_strings(reply);
@@ -138,6 +138,7 @@ function RedisClient (options, stream) {
this.pub_sub_mode = 0; this.pub_sub_mode = 0;
this.subscription_set = {}; this.subscription_set = {};
this.monitoring = false; this.monitoring = false;
this.message_buffers = false;
this.closing = false; this.closing = false;
this.server_info = {}; this.server_info = {};
this.auth_pass = options.auth_pass || options.password; this.auth_pass = options.auth_pass || options.password;
@@ -149,23 +150,7 @@ function RedisClient (options, stream) {
this.options = options; this.options = options;
this.buffers = options.return_buffers || options.detect_buffers; this.buffers = options.return_buffers || options.detect_buffers;
// Init parser // Init parser
this.reply_parser = Parser({ this.reply_parser = create_parser(this, options);
returnReply: function (data) {
self.return_reply(data);
},
returnError: function (err) {
self.return_error(err);
},
returnFatalError: function (err) {
// Error out all fired commands. Otherwise they might rely on faulty data. We have to reconnect to get in a working state again
self.flush_and_error(err, ['command_queue']);
self.stream.destroy();
self.return_error(err);
},
returnBuffers: this.buffers,
name: options.parser,
stringNumbers: options.string_numbers
});
this.create_stream(); this.create_stream();
// The listeners will not be attached right away, so let's print the deprecation message while the listener is attached // The listeners will not be attached right away, so let's print the deprecation message while the listener is attached
this.on('newListener', function (event) { this.on('newListener', function (event) {
@@ -179,6 +164,10 @@ function RedisClient (options, stream) {
'The drain event listener is deprecated and will be removed in v.3.0.0.\n' + 'The drain event listener is deprecated and will be removed in v.3.0.0.\n' +
'If you want to keep on listening to this event please listen to the stream drain event directly.' 'If you want to keep on listening to this event please listen to the stream drain event directly.'
); );
} else if (event === 'message_buffer' || event === 'pmessage_buffer' || event === 'messageBuffer' || event === 'pmessageBuffer' && !this.buffers) {
this.message_buffers = true;
this.handle_reply = handle_detect_buffers_reply;
this.reply_parser = create_parser(this);
} }
}); });
} }
@@ -186,6 +175,26 @@ util.inherits(RedisClient, EventEmitter);
RedisClient.connection_id = 0; RedisClient.connection_id = 0;
function create_parser (self) {
return Parser({
returnReply: function (data) {
self.return_reply(data);
},
returnError: function (err) {
self.return_error(err);
},
returnFatalError: function (err) {
// Error out all fired commands. Otherwise they might rely on faulty data. We have to reconnect to get in a working state again
self.flush_and_error(err, ['command_queue']);
self.stream.destroy();
self.return_error(err);
},
returnBuffers: self.buffers || self.message_buffers,
name: self.options.parser,
stringNumbers: self.options.string_numbers
});
}
/****************************************************************************** /******************************************************************************
All functions in here are internal besides the RedisClient constructor All functions in here are internal besides the RedisClient constructor
@@ -696,21 +705,18 @@ function subscribe_unsubscribe (self, reply, type, subscribe) {
function return_pub_sub (self, reply) { function return_pub_sub (self, reply) {
var type = reply[0].toString(); var type = reply[0].toString();
if (type === 'message') { // channel, message if (type === 'message') { // channel, message
// TODO: Implement message_buffer if (!self.options.return_buffers || self.message_buffers) { // backwards compatible. Refactor this in v.3 to always return a string on the normal emitter
// if (self.buffers) {
// self.emit('message_buffer', reply[1], reply[2]);
// }
if (!self.options.return_buffers) { // backwards compatible. Refactor this in v.3 to always return a string on the normal emitter
self.emit('message', reply[1].toString(), reply[2].toString()); self.emit('message', reply[1].toString(), reply[2].toString());
self.emit('message_buffer', reply[1], reply[2]);
self.emit('messageBuffer', reply[1], reply[2]);
} else { } else {
self.emit('message', reply[1], reply[2]); self.emit('message', reply[1], reply[2]);
} }
} else if (type === 'pmessage') { // pattern, channel, message } else if (type === 'pmessage') { // pattern, channel, message
// if (self.buffers) { if (!self.options.return_buffers || self.message_buffers) { // backwards compatible. Refactor this in v.3 to always return a string on the normal emitter
// self.emit('pmessage_buffer', reply[1], reply[2], reply[3]);
// }
if (!self.options.return_buffers) { // backwards compatible. Refactor this in v.3 to always return a string on the normal emitter
self.emit('pmessage', reply[1].toString(), reply[2].toString(), reply[3].toString()); self.emit('pmessage', reply[1].toString(), reply[2].toString(), reply[3].toString());
self.emit('pmessage_buffer', reply[1], reply[2], reply[3]);
self.emit('pmessageBuffer', reply[1], reply[2], reply[3]);
} else { } else {
self.emit('pmessage', reply[1], reply[2], reply[3]); self.emit('pmessage', reply[1], reply[2], reply[3]);
} }

View File

@@ -504,6 +504,30 @@ describe('publish/subscribe', function () {
pub.publish('/foo', 'hello world', helper.isNumber(3)); pub.publish('/foo', 'hello world', helper.isNumber(3));
}); });
}); });
it('allows to listen to pmessageBuffer and pmessage', function (done) {
var batch = sub.batch();
batch.psubscribe('*');
batch.subscribe('/foo');
batch.unsubscribe('/foo');
batch.unsubscribe();
batch.subscribe(['/foo']);
batch.exec();
assert.strictEqual(sub.shouldBuffer, false);
sub.on('pmessageBuffer', function (pattern, channel, message) {
assert.strictEqual(pattern.inspect(), new Buffer('*').inspect());
assert.strictEqual(channel.inspect(), new Buffer('/foo').inspect());
sub.quit(done);
});
sub.on('pmessage', function (pattern, channel, message) {
assert.strictEqual(pattern, '*');
assert.strictEqual(channel, '/foo');
});
pub.pubsub('numsub', '/foo', function (err, res) {
assert.deepEqual(res, ['/foo', 1]);
});
pub.publish('/foo', 'hello world', helper.isNumber(2));
});
}); });
describe('punsubscribe', function () { describe('punsubscribe', function () {