You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-06 02:15:48 +03:00
Subscribe commands take optional callback. Fixes [GH-140].
This commit is contained in:
33
index.js
33
index.js
@@ -456,9 +456,22 @@ RedisClient.prototype.return_error = function (err) {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// if a callback throws an exception, re-throw it on a new stack so the parser can keep going.
|
||||||
|
// put this try/catch in its own function because V8 doesn't optimize this well yet.
|
||||||
|
function try_callback(callback, reply) {
|
||||||
|
try {
|
||||||
|
callback(null, reply);
|
||||||
|
} catch (err) {
|
||||||
|
process.nextTick(function () {
|
||||||
|
throw err;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
RedisClient.prototype.return_reply = function (reply) {
|
RedisClient.prototype.return_reply = function (reply) {
|
||||||
var command_obj = this.command_queue.shift(),
|
var command_obj, obj, i, len, key, val, type, timestamp, argindex, args, queue_len;
|
||||||
obj, i, len, key, val, type, timestamp, argindex, args, queue_len = this.command_queue.getLength();
|
|
||||||
|
queue_len = this.command_queue.getLength();
|
||||||
|
|
||||||
if (this.subscriptions === false && queue_len === 0) {
|
if (this.subscriptions === false && queue_len === 0) {
|
||||||
this.emit("idle");
|
this.emit("idle");
|
||||||
@@ -469,6 +482,8 @@ RedisClient.prototype.return_reply = function (reply) {
|
|||||||
this.should_buffer = false;
|
this.should_buffer = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
command_obj = this.command_queue.shift();
|
||||||
|
|
||||||
if (command_obj && !command_obj.sub_command) {
|
if (command_obj && !command_obj.sub_command) {
|
||||||
if (typeof command_obj.callback === "function") {
|
if (typeof command_obj.callback === "function") {
|
||||||
// HGETALL special case replies with keyed Buffers
|
// HGETALL special case replies with keyed Buffers
|
||||||
@@ -482,14 +497,7 @@ RedisClient.prototype.return_reply = function (reply) {
|
|||||||
reply = obj;
|
reply = obj;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try_callback(command_obj.callback, reply);
|
||||||
command_obj.callback(null, reply);
|
|
||||||
} catch (err) {
|
|
||||||
// if a callback throws an exception, re-throw it on a new stack so the parser can keep going
|
|
||||||
process.nextTick(function () {
|
|
||||||
throw err;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
} else if (exports.debug_mode) {
|
} else if (exports.debug_mode) {
|
||||||
console.log("no callback for reply: " + (reply && reply.toString && reply.toString()));
|
console.log("no callback for reply: " + (reply && reply.toString && reply.toString()));
|
||||||
}
|
}
|
||||||
@@ -508,6 +516,11 @@ RedisClient.prototype.return_reply = function (reply) {
|
|||||||
console.log("All subscriptions removed, exiting pub/sub mode");
|
console.log("All subscriptions removed, exiting pub/sub mode");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// subscribe commands take an optional callback and also emit an event, but only the first response is included in the callback
|
||||||
|
// TODO - document this
|
||||||
|
if (command_obj && typeof command_obj.callback === "function") {
|
||||||
|
try_callback(command_obj.callback, reply[1].toString());
|
||||||
|
}
|
||||||
this.emit(type, reply[1].toString(), reply[2]); // channel, count
|
this.emit(type, reply[1].toString(), reply[2]); // channel, count
|
||||||
} else {
|
} else {
|
||||||
throw new Error("subscriptions are active but got unknown reply type " + type);
|
throw new Error("subscriptions are active but got unknown reply type " + type);
|
||||||
|
7
test.js
7
test.js
@@ -469,7 +469,12 @@ tests.SUBSCRIBE = function () {
|
|||||||
});
|
});
|
||||||
|
|
||||||
client1.set("did a thing", 1, require_string("OK", name));
|
client1.set("did a thing", 1, require_string("OK", name));
|
||||||
client1.subscribe("chan1", "chan2");
|
client1.subscribe("chan1", "chan2", function (err, results) {
|
||||||
|
assert.strictEqual(null, err, "result sent back unexpected error: " + err);
|
||||||
|
assert.strictEqual(2, results.length, name);
|
||||||
|
assert.strictEqual("chan1", results[0].toString(), name);
|
||||||
|
assert.strictEqual(1, results[1], name);
|
||||||
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
tests.SUBSCRIBE_QUIT = function () {
|
tests.SUBSCRIBE_QUIT = function () {
|
||||||
|
Reference in New Issue
Block a user