1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

Detect is an incoming "reply" is in fact a pubsub message. If so, do not pop the command queue.

This fixes an issue where the command queue gets popped prematurely by pubsub
messages, leading to callbacks for those commands not being invoked.

Close #360.

Signed-off-by: DTrejo <david.daniel.trejo@gmail.com>
This commit is contained in:
Tom Leach
2012-12-31 14:43:48 -05:00
committed by DTrejo
parent f0ae6642f9
commit 837cec36b6
2 changed files with 49 additions and 3 deletions

View File

@@ -562,8 +562,18 @@ function reply_to_strings(reply) {
RedisClient.prototype.return_reply = function (reply) {
var command_obj, len, type, timestamp, argindex, args, queue_len;
command_obj = this.command_queue.shift(),
queue_len = this.command_queue.getLength();
// If the "reply" here is actually a message received asynchronously due to a
// pubsub subscription, don't pop the command queue as we'll only be consuming
// the head command prematurely.
if (Array.isArray(reply) && reply.length > 0 && reply[0]) {
type = reply[0].toString();
}
if (type !== 'message' && type !== 'pmessage') {
command_obj = this.command_queue.shift();
}
queue_len = this.command_queue.getLength();
if (this.pub_sub_mode === false && queue_len === 0) {
this.emit("idle");

38
test.js
View File

@@ -16,6 +16,7 @@ var redis = require("./index"),
ended = false,
next, cur_start, run_next_test, all_tests, all_start, test_count;
// Set this to truthy to see the wire protocol and other debugging info
redis.debug_mode = process.argv[2];
@@ -94,6 +95,18 @@ function last(name, fn) {
};
}
// Wraps the given callback in a timeout. If the returned function
// is not called within the timeout period, we fail the named test.
function with_timeout(name, cb, millis) {
var timeoutId = setTimeout(function() {
assert.fail("Callback timed out!", name);
}, millis);
return function() {
clearTimeout(timeoutId);
cb.apply(this, arguments);
};
}
next = function next(name) {
console.log(" \x1b[33m" + (Date.now() - cur_start) + "\x1b[0m ms");
run_next_test();
@@ -720,10 +733,33 @@ tests.SUB_UNSUB_SUB = function () {
client3.on('message', function (channel, message) {
assert.strictEqual(channel, 'chan3');
assert.strictEqual(message, 'foo');
client3.removeAllListeners();
next(name);
});
};
tests.SUB_UNSUB_MSG_SUB = function () {
var name = "SUB_UNSUB_MSG_SUB";
client3.subscribe('chan8');
client3.subscribe('chan9');
client3.unsubscribe('chan9');
client2.publish('chan8', 'something');
client3.subscribe('chan9', with_timeout(name, function (err, results) {
next(name);
}, 2000));
};
tests.PSUB_UNSUB_PMSG_SUB = function () {
var name = "PSUB_UNSUB_PMSG_SUB";
client3.psubscribe('abc*');
client3.subscribe('xyz');
client3.unsubscribe('xyz');
client2.publish('abcd', 'something');
client3.subscribe('xyz', with_timeout(name, function (err, results) {
next(name);
}, 2000));
};
tests.SUBSCRIBE_QUIT = function () {
var name = "SUBSCRIBE_QUIT";
client3.on("end", function () {
@@ -764,7 +800,7 @@ tests.SUBSCRIBE_CLOSE_RESUBSCRIBE = function () {
c2.quit();
assert.fail("test failed");
}
})
});
c1.subscribe("chan1", "chan2");