diff --git a/index.js b/index.js index 99438d4b9b..9ee001e610 100644 --- a/index.js +++ b/index.js @@ -148,6 +148,7 @@ function RedisClient (options, stream) { this.old_state = null; this.fire_strings = true; // Determine if strings or buffers should be written to the stream this.pipeline = false; + this.sub_commands_left = 0; this.times_connected = 0; this.options = options; this.buffers = options.return_buffers || options.detect_buffers; @@ -645,6 +646,11 @@ RedisClient.prototype.return_error = function (err) { } } + // Count down pub sub mode if in entering modus + if (this.pub_sub_mode > 1) { + this.pub_sub_mode--; + } + var match = err.message.match(utils.err_code); // LUA script could return user errors that don't behave like all other errors! if (match) { @@ -677,50 +683,39 @@ function normal_reply (self, reply) { } } -function set_subscribe (self, type, subscribe, channel) { - // Every channel has to be saved / removed one after the other and the type has to be the same too, - // to make sure partly subscribe / unsubscribe works well together - if (subscribe) { - self.subscription_set[type + '_' + channel] = channel; - } else { - type = type === 'unsubscribe' ? 'subscribe' : 'psubscribe'; // Make types consistent - delete self.subscription_set[type + '_' + channel]; - } -} - -function subscribe_unsubscribe (self, reply, type, subscribe) { +function subscribe_unsubscribe (self, reply, type) { // Subscribe commands take an optional callback and also emit an event, but only the _last_ response is included in the callback // The pub sub commands return each argument in a separate return value and have to be handled that way var command_obj = self.command_queue.get(0); var buffer = self.options.return_buffers || self.options.detect_buffers && command_obj.buffer_args; var channel = (buffer || reply[1] === null) ? reply[1] : reply[1].toString(); var count = +reply[2]; // Return the channel counter as number no matter if `string_numbers` is activated or not - debug('Subscribe / unsubscribe command'); + debug(type, channel); // Emit first, then return the callback if (channel !== null) { // Do not emit or "unsubscribe" something if there was no channel to unsubscribe from self.emit(type, channel, count); - set_subscribe(self, type, subscribe, channel); - } - if (command_obj.sub_commands_left <= 1) { - if (count !== 0) { - if (!subscribe && command_obj.args.length === 0) { // Unsubscribe from all channels - command_obj.sub_commands_left = count; - return; - } + if (type === 'subscribe' || type === 'psubscribe') { + self.subscription_set[type + '_' + channel] = channel; } else { + type = type === 'unsubscribe' ? 'subscribe' : 'psubscribe'; // Make types consistent + delete self.subscription_set[type + '_' + channel]; + } + } + + if (command_obj.args.length === 1 || self.sub_commands_left === 1 || command_obj.args.length === 0 && (count === 0 || channel === null)) { + if (count === 0) { // unsubscribed from all channels var running_command; var i = 1; + self.pub_sub_mode = 0; // Deactivating pub sub mode // This should be a rare case and therefore handling it this way should be good performance wise for the general case while (running_command = self.command_queue.get(i)) { if (SUBSCRIBE_COMMANDS[running_command.command]) { - self.command_queue.shift(); - self.pub_sub_mode = i; - return; + self.pub_sub_mode = i; // Entering pub sub mode again + break; } i++; } - self.pub_sub_mode = 0; } self.command_queue.shift(); if (typeof command_obj.callback === 'function') { @@ -728,8 +723,13 @@ function subscribe_unsubscribe (self, reply, type, subscribe) { // Evaluate to change this in v.3 to return all subscribed / unsubscribed channels in an array including the number of channels subscribed too command_obj.callback(null, channel); } + self.sub_commands_left = 0; } else { - command_obj.sub_commands_left--; + if (self.sub_commands_left !== 0) { + self.sub_commands_left--; + } else { + self.sub_commands_left = command_obj.args.length ? command_obj.args.length - 1 : count; + } } } @@ -751,12 +751,8 @@ function return_pub_sub (self, reply) { } else { self.emit('pmessage', reply[1], reply[2], reply[3]); } - } else if (type === 'subscribe' || type === 'psubscribe') { - subscribe_unsubscribe(self, reply, type, true); - } else if (type === 'unsubscribe' || type === 'punsubscribe') { - subscribe_unsubscribe(self, reply, type, false); } else { - normal_reply(self, reply); + subscribe_unsubscribe(self, reply, type); } } @@ -787,10 +783,12 @@ RedisClient.prototype.return_reply = function (reply) { } else if (this.pub_sub_mode !== 1) { this.pub_sub_mode--; normal_reply(this, reply); - } else if (reply instanceof Array && reply.length > 2 && reply[0]) { - return_pub_sub(this, reply); - } else { + } else if (!(reply instanceof Array) || reply.length <= 2) { + // Only PING and QUIT are allowed in this context besides the pub sub commands + // Ping replies with ['pong', null|value] and quit with 'OK' normal_reply(this, reply); + } else { + return_pub_sub(this, reply); } }; diff --git a/lib/command.js b/lib/command.js index ee1181ea7e..e63d338b5e 100644 --- a/lib/command.js +++ b/lib/command.js @@ -4,10 +4,9 @@ // a named constructor helps it show up meaningfully in the V8 CPU profiler and in heap snapshots. function Command (command, args, buffer_args, callback) { this.command = command; - this.args = args; // We only need the args for the offline commands => move them into another class. We need the number of args though for pub sub + this.args = args; this.buffer_args = buffer_args; this.callback = callback; - this.sub_commands_left = args.length; } function OfflineCommand (command, args, callback, call_on_write) { diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index f9401f63ad..ffce23f442 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -79,6 +79,7 @@ describe('publish/subscribe', function () { it('does not fire subscribe events after reconnecting', function (done) { var i = 0; + var end = helper.callFuncAfter(done, 2); sub.on('subscribe', function (chnl, count) { assert.strictEqual(typeof count, 'number'); assert.strictEqual(++i, count); @@ -91,9 +92,10 @@ describe('publish/subscribe', function () { sub.unsubscribe(function (err, res) { // Do not pass a channel here! assert.strictEqual(sub.pub_sub_mode, 2); assert.deepEqual(sub.subscription_set, {}); + end(); }); sub.set('foo', 'bar', helper.isString('OK')); - sub.subscribe(channel2, done); + sub.subscribe(channel2, end); }); }); @@ -181,25 +183,19 @@ describe('publish/subscribe', function () { sub.subscribe('chan9'); sub.unsubscribe('chan9'); pub.publish('chan8', 'something'); - sub.subscribe('chan9', function () { - return done(); - }); + sub.subscribe('chan9', done); }); it('handles SUB_UNSUB_MSG_SUB 2', function (done) { - sub.psubscribe('abc*'); + sub.psubscribe('abc*', helper.isString('abc*')); sub.subscribe('xyz'); sub.unsubscribe('xyz'); pub.publish('abcd', 'something'); - sub.subscribe('xyz', function () { - return done(); - }); + sub.subscribe('xyz', done); }); it('emits end event if quit is called from within subscribe', function (done) { - sub.on('end', function () { - return done(); - }); + sub.on('end', done); sub.on('subscribe', function (chnl, count) { sub.quit(); }); @@ -236,6 +232,10 @@ describe('publish/subscribe', function () { var end = helper.callFuncAfter(done, 2); sub.select(3); sub.set('foo', 'bar'); + sub.set('failure', helper.isError()); // Triggering a warning while subscribing should work + sub.mget('foo', 'bar', 'baz', 'hello', 'world', function (err, res) { + assert.deepEqual(res, ['bar', null, null, null, null]); + }); sub.subscribe('somechannel', 'another channel', function (err, res) { end(); sub.stream.destroy(); @@ -280,7 +280,7 @@ describe('publish/subscribe', function () { it('should only resubscribe to channels not unsubscribed earlier on a reconnect', function (done) { sub.subscribe('/foo', '/bar'); - sub.unsubscribe('/bar', function () { + sub.batch().unsubscribe(['/bar'], function () { pub.pubsub('channels', function (err, res) { assert.deepEqual(res, ['/foo']); sub.stream.destroy(); @@ -291,7 +291,7 @@ describe('publish/subscribe', function () { }); }); }); - }); + }).exec(); }); it('unsubscribes, subscribes, unsubscribes... single and multiple entries mixed. Withouth callbacks', function (done) { @@ -490,7 +490,7 @@ describe('publish/subscribe', function () { return_buffers: true }); sub2.on('ready', function () { - sub2.psubscribe('*'); + sub2.batch().psubscribe('*', helper.isString('*')).exec(); sub2.subscribe('/foo'); sub2.on('pmessage', function (pattern, channel, message) { assert.strictEqual(pattern.inspect(), new Buffer('*').inspect()); @@ -501,32 +501,58 @@ describe('publish/subscribe', function () { pub.pubsub('numsub', '/foo', function (err, res) { assert.deepEqual(res, ['/foo', 2]); }); + // sub2 is counted twice as it subscribed with psubscribe and subscribe pub.publish('/foo', 'hello world', helper.isNumber(3)); }); }); it('allows to listen to pmessageBuffer and pmessage', function (done) { var batch = sub.batch(); + var end = helper.callFuncAfter(done, 6); + assert.strictEqual(sub.message_buffers, false); batch.psubscribe('*'); batch.subscribe('/foo'); batch.unsubscribe('/foo'); - batch.unsubscribe(); - batch.subscribe(['/foo']); + batch.unsubscribe(helper.isNull()); + batch.subscribe(['/foo'], helper.isString('/foo')); batch.exec(); assert.strictEqual(sub.shouldBuffer, false); sub.on('pmessageBuffer', function (pattern, channel, message) { assert.strictEqual(pattern.inspect(), new Buffer('*').inspect()); assert.strictEqual(channel.inspect(), new Buffer('/foo').inspect()); - sub.quit(done); + sub.quit(end); }); + // Either message_buffers or buffers has to be true, but not both at the same time + assert.notStrictEqual(sub.message_buffers, sub.buffers); sub.on('pmessage', function (pattern, channel, message) { assert.strictEqual(pattern, '*'); assert.strictEqual(channel, '/foo'); + assert.strictEqual(message, 'hello world'); + end(); }); - pub.pubsub('numsub', '/foo', function (err, res) { - assert.deepEqual(res, ['/foo', 1]); + sub.on('message', function (channel, message) { + assert.strictEqual(channel, '/foo'); + assert.strictEqual(message, 'hello world'); + end(); }); - pub.publish('/foo', 'hello world', helper.isNumber(2)); + setTimeout(function () { + pub.pubsub('numsub', '/foo', function (err, res) { + // There's one subscriber to this channel + assert.deepEqual(res, ['/foo', 1]); + end(); + }); + pub.pubsub('channels', function (err, res) { + // There's exactly one channel that is listened too + assert.deepEqual(res, ['/foo']); + end(); + }); + pub.pubsub('numpat', function (err, res) { + // One pattern is active + assert.strictEqual(res, 1); + end(); + }); + pub.publish('/foo', 'hello world', helper.isNumber(2)); + }, 50); }); }); @@ -536,10 +562,7 @@ describe('publish/subscribe', function () { }); it('executes callback when punsubscribe is called and there are no subscriptions', function (done) { - pub.punsubscribe(function (err, results) { - assert.strictEqual(null, results); - done(err); - }); + pub.batch().punsubscribe(helper.isNull()).exec(done); }); });