diff --git a/index.js b/index.js index b6a0181952..bb224a5d31 100644 --- a/index.js +++ b/index.js @@ -562,8 +562,18 @@ function reply_to_strings(reply) { RedisClient.prototype.return_reply = function (reply) { var command_obj, len, type, timestamp, argindex, args, queue_len; - command_obj = this.command_queue.shift(), - queue_len = this.command_queue.getLength(); + // If the "reply" here is actually a message received asynchronously due to a + // pubsub subscription, don't pop the command queue as we'll only be consuming + // the head command prematurely. + if (Array.isArray(reply) && reply.length > 0 && reply[0]) { + type = reply[0].toString(); + } + + if (type !== 'message' && type !== 'pmessage') { + command_obj = this.command_queue.shift(); + } + + queue_len = this.command_queue.getLength(); if (this.pub_sub_mode === false && queue_len === 0) { this.emit("idle"); diff --git a/test.js b/test.js index ba801b56c7..5967ed329a 100644 --- a/test.js +++ b/test.js @@ -16,6 +16,7 @@ var redis = require("./index"), ended = false, next, cur_start, run_next_test, all_tests, all_start, test_count; + // Set this to truthy to see the wire protocol and other debugging info redis.debug_mode = process.argv[2]; @@ -94,6 +95,18 @@ function last(name, fn) { }; } +// Wraps the given callback in a timeout. If the returned function +// is not called within the timeout period, we fail the named test. +function with_timeout(name, cb, millis) { + var timeoutId = setTimeout(function() { + assert.fail("Callback timed out!", name); + }, millis); + return function() { + clearTimeout(timeoutId); + cb.apply(this, arguments); + }; +} + next = function next(name) { console.log(" \x1b[33m" + (Date.now() - cur_start) + "\x1b[0m ms"); run_next_test(); @@ -720,10 +733,33 @@ tests.SUB_UNSUB_SUB = function () { client3.on('message', function (channel, message) { assert.strictEqual(channel, 'chan3'); assert.strictEqual(message, 'foo'); + client3.removeAllListeners(); next(name); }); }; +tests.SUB_UNSUB_MSG_SUB = function () { + var name = "SUB_UNSUB_MSG_SUB"; + client3.subscribe('chan8'); + client3.subscribe('chan9'); + client3.unsubscribe('chan9'); + client2.publish('chan8', 'something'); + client3.subscribe('chan9', with_timeout(name, function (err, results) { + next(name); + }, 2000)); +}; + +tests.PSUB_UNSUB_PMSG_SUB = function () { + var name = "PSUB_UNSUB_PMSG_SUB"; + client3.psubscribe('abc*'); + client3.subscribe('xyz'); + client3.unsubscribe('xyz'); + client2.publish('abcd', 'something'); + client3.subscribe('xyz', with_timeout(name, function (err, results) { + next(name); + }, 2000)); +}; + tests.SUBSCRIBE_QUIT = function () { var name = "SUBSCRIBE_QUIT"; client3.on("end", function () { @@ -764,7 +800,7 @@ tests.SUBSCRIBE_CLOSE_RESUBSCRIBE = function () { c2.quit(); assert.fail("test failed"); } - }) + }); c1.subscribe("chan1", "chan2");