From 837cec36b642cf6ca5eace93bb59378d1c3e477e Mon Sep 17 00:00:00 2001 From: Tom Leach Date: Mon, 31 Dec 2012 14:43:48 -0500 Subject: [PATCH] Detect is an incoming "reply" is in fact a pubsub message. If so, do not pop the command queue. This fixes an issue where the command queue gets popped prematurely by pubsub messages, leading to callbacks for those commands not being invoked. Close #360. Signed-off-by: DTrejo --- index.js | 14 ++++++++++++-- test.js | 38 +++++++++++++++++++++++++++++++++++++- 2 files changed, 49 insertions(+), 3 deletions(-) diff --git a/index.js b/index.js index b6a0181952..bb224a5d31 100644 --- a/index.js +++ b/index.js @@ -562,8 +562,18 @@ function reply_to_strings(reply) { RedisClient.prototype.return_reply = function (reply) { var command_obj, len, type, timestamp, argindex, args, queue_len; - command_obj = this.command_queue.shift(), - queue_len = this.command_queue.getLength(); + // If the "reply" here is actually a message received asynchronously due to a + // pubsub subscription, don't pop the command queue as we'll only be consuming + // the head command prematurely. + if (Array.isArray(reply) && reply.length > 0 && reply[0]) { + type = reply[0].toString(); + } + + if (type !== 'message' && type !== 'pmessage') { + command_obj = this.command_queue.shift(); + } + + queue_len = this.command_queue.getLength(); if (this.pub_sub_mode === false && queue_len === 0) { this.emit("idle"); diff --git a/test.js b/test.js index ba801b56c7..5967ed329a 100644 --- a/test.js +++ b/test.js @@ -16,6 +16,7 @@ var redis = require("./index"), ended = false, next, cur_start, run_next_test, all_tests, all_start, test_count; + // Set this to truthy to see the wire protocol and other debugging info redis.debug_mode = process.argv[2]; @@ -94,6 +95,18 @@ function last(name, fn) { }; } +// Wraps the given callback in a timeout. If the returned function +// is not called within the timeout period, we fail the named test. +function with_timeout(name, cb, millis) { + var timeoutId = setTimeout(function() { + assert.fail("Callback timed out!", name); + }, millis); + return function() { + clearTimeout(timeoutId); + cb.apply(this, arguments); + }; +} + next = function next(name) { console.log(" \x1b[33m" + (Date.now() - cur_start) + "\x1b[0m ms"); run_next_test(); @@ -720,10 +733,33 @@ tests.SUB_UNSUB_SUB = function () { client3.on('message', function (channel, message) { assert.strictEqual(channel, 'chan3'); assert.strictEqual(message, 'foo'); + client3.removeAllListeners(); next(name); }); }; +tests.SUB_UNSUB_MSG_SUB = function () { + var name = "SUB_UNSUB_MSG_SUB"; + client3.subscribe('chan8'); + client3.subscribe('chan9'); + client3.unsubscribe('chan9'); + client2.publish('chan8', 'something'); + client3.subscribe('chan9', with_timeout(name, function (err, results) { + next(name); + }, 2000)); +}; + +tests.PSUB_UNSUB_PMSG_SUB = function () { + var name = "PSUB_UNSUB_PMSG_SUB"; + client3.psubscribe('abc*'); + client3.subscribe('xyz'); + client3.unsubscribe('xyz'); + client2.publish('abcd', 'something'); + client3.subscribe('xyz', with_timeout(name, function (err, results) { + next(name); + }, 2000)); +}; + tests.SUBSCRIBE_QUIT = function () { var name = "SUBSCRIBE_QUIT"; client3.on("end", function () { @@ -764,7 +800,7 @@ tests.SUBSCRIBE_CLOSE_RESUBSCRIBE = function () { c2.quit(); assert.fail("test failed"); } - }) + }); c1.subscribe("chan1", "chan2");