diff --git a/index.js b/index.js index cb7e48f9fd..4557edbcea 100644 --- a/index.js +++ b/index.js @@ -456,9 +456,22 @@ RedisClient.prototype.return_error = function (err) { } }; +// if a callback throws an exception, re-throw it on a new stack so the parser can keep going. +// put this try/catch in its own function because V8 doesn't optimize this well yet. +function try_callback(callback, reply) { + try { + callback(null, reply); + } catch (err) { + process.nextTick(function () { + throw err; + }); + } +} + RedisClient.prototype.return_reply = function (reply) { - var command_obj = this.command_queue.shift(), - obj, i, len, key, val, type, timestamp, argindex, args, queue_len = this.command_queue.getLength(); + var command_obj, obj, i, len, key, val, type, timestamp, argindex, args, queue_len; + + queue_len = this.command_queue.getLength(); if (this.subscriptions === false && queue_len === 0) { this.emit("idle"); @@ -469,6 +482,8 @@ RedisClient.prototype.return_reply = function (reply) { this.should_buffer = false; } + command_obj = this.command_queue.shift(); + if (command_obj && !command_obj.sub_command) { if (typeof command_obj.callback === "function") { // HGETALL special case replies with keyed Buffers @@ -482,14 +497,7 @@ RedisClient.prototype.return_reply = function (reply) { reply = obj; } - try { - command_obj.callback(null, reply); - } catch (err) { - // if a callback throws an exception, re-throw it on a new stack so the parser can keep going - process.nextTick(function () { - throw err; - }); - } + try_callback(command_obj.callback, reply); } else if (exports.debug_mode) { console.log("no callback for reply: " + (reply && reply.toString && reply.toString())); } @@ -508,6 +516,11 @@ RedisClient.prototype.return_reply = function (reply) { console.log("All subscriptions removed, exiting pub/sub mode"); } } + // subscribe commands take an optional callback and also emit an event, but only the first response is included in the callback + // TODO - document this + if (command_obj && typeof command_obj.callback === "function") { + try_callback(command_obj.callback, reply[1].toString()); + } this.emit(type, reply[1].toString(), reply[2]); // channel, count } else { throw new Error("subscriptions are active but got unknown reply type " + type); diff --git a/test.js b/test.js index 7ab96373b3..fbc1fed078 100644 --- a/test.js +++ b/test.js @@ -469,7 +469,12 @@ tests.SUBSCRIBE = function () { }); client1.set("did a thing", 1, require_string("OK", name)); - client1.subscribe("chan1", "chan2"); + client1.subscribe("chan1", "chan2", function (err, results) { + assert.strictEqual(null, err, "result sent back unexpected error: " + err); + assert.strictEqual(2, results.length, name); + assert.strictEqual("chan1", results[0].toString(), name); + assert.strictEqual(1, results[1], name); + }); }; tests.SUBSCRIBE_QUIT = function () {