From 03fc7c203e72db271b805687fce0f42bcbd5826a Mon Sep 17 00:00:00 2001 From: Matt Ranney Date: Sun, 19 Sep 2010 23:46:43 -0700 Subject: [PATCH] Add pub/sub support. --- index.js | 98 ++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 71 insertions(+), 27 deletions(-) diff --git a/index.js b/index.js index 7b143af417..d542b9792a 100644 --- a/index.js +++ b/index.js @@ -28,10 +28,11 @@ function small_toString(buf) { return tmp; } -function toArray(args) { +function to_array(args) { var len = args.length, - arr = new Array(len); - for (var i = 0; i < len; ++i) { + arr = new Array(len), i; + + for (i = 0; i < len; i += 1) { arr[i] = args[i]; } return arr; @@ -257,7 +258,7 @@ RedisReplyParser.prototype.add_multi_bulk_reply = function (reply) { // http://github.com/creationix/pattern/blob/master/lib/pattern/queue.js var Queue = function () { this.tail = []; - this.head = toArray(arguments); + this.head = to_array(arguments); this.offset = 0; }; @@ -303,6 +304,7 @@ function RedisClient(stream) { this.commands_sent = 0; this.retry_delay = 250; this.retry_backoff = 1.7; + this.subscriptions = false; var self = this; @@ -422,38 +424,65 @@ RedisClient.prototype.return_error = function (err) { if (command_obj && typeof command_obj.callback === "function") { command_obj.callback(err); } else { - console.log("no callback to send error: " + err.stack); + console.log("no callback to send error: " + sys.inspect(err)); // this will probably not make it anywhere useful, but we might as well throw - throw err; + throw new Error(err); } }; -RedisClient.prototype.return_reply = function (reply_buffer) { +RedisClient.prototype.return_reply = function (reply) { var command_obj = this.command_queue.shift(); - if (command_obj && typeof command_obj.callback === "function") { - // HGETALL special case replies with keyed Buffers - if ('HGETALL' == command_obj.command) { - var obj = {}; - for (var i = 0, len = reply_buffer.length; i < len; ++i) { - var key = reply_buffer[i].toString(), - val = reply_buffer[++i]; - obj[key] = val; + if (command_obj) { + if (typeof command_obj.callback === "function") { + // HGETALL special case replies with keyed Buffers + if ('HGETALL' === command_obj.command) { + var obj = {}; + for (var i = 0, len = reply.length; i < len; i += 2) { + var key = reply[i].toString(), + val = reply[i + 1]; + obj[key] = val; + } + reply = obj; } - reply_buffer = obj; + + command_obj.callback(null, reply); + } else if (exports.debug_mode) { + console.log("no callback for reply: " + reply.toString()); } - command_obj.callback(null, reply_buffer); - } else { - if (exports.debug_mode) { - console.log("no callback for reply: " + reply_buffer.toString()); + } else if (this.subscriptions) { + if (Array.isArray(reply)) { + var type = reply[0].toString(); + + if (type === "message") { + this.emit("message", reply[1], reply[2]); // channel, message + } else if (type === "pmessage") { + this.emit("pmessage", reply[1], reply[2], reply[3]); // pattern, channel, message + } else if (type === "subscribe" || type === "unsubscribe" || type === "psubscribe" || type === "punsubscribe") { + if (reply[2] === 0) { + this.subscriptions = false; + if (this.debug_mode) { + console.log("All subscriptions removed, exiting pub/sub mode"); + } + } + this.emit(type, reply[1], reply[2]); // channel, count + } else { + throw new Error("subscriptions are active but got unknown reply type " + type); + } + } else { + throw new Error("subscriptions are active but got an invalid reply: " + reply); } } }; RedisClient.prototype.send_command = function () { - var command, callback, args, this_args, command_obj; + var command, callback, args, this_args, command_obj, self = this; - this_args = toArray(arguments); // convert arguments into real array + this_args = to_array(arguments); + + if (this_args.length === 0) { + throw new Error("send_command: not enough arguments"); + } command = this_args[0]; if (this_args[1] && Array.isArray(this_args[1])) { @@ -488,7 +517,17 @@ RedisClient.prototype.send_command = function () { return; } - this.command_queue.push(command_obj); + if (command === "SUBSCRIBE" || command === "PSUBSCRIBE" || command === "UNSUBSCRIBE" || command === "PUNSUBSCRIBE") { + if (this.subscriptions === false && exports.debug_mode) { + console.log("Entering pub/sub mode from " + command); + } + this.subscriptions = true; + } else { + if (this.subscriptions === true) { + throw new Error("Connection in pub/sub mode, only pub/sub commands may be used"); + } + this.command_queue.push(command_obj); + } this.commands_sent += 1; var elem_count = 1, stream = this.stream, buffer_args = false, command_str = ""; @@ -499,6 +538,7 @@ RedisClient.prototype.send_command = function () { }); // Always use "Multi bulk commands", but if passed any Buffer args, then do multiple writes, one for each arg + // This means that using Buffers in commands is going to be slower, so use Strings if you don't need binary. command_str = "*" + elem_count + "\r\n$" + command.length + "\r\n" + command + "\r\n"; @@ -573,25 +613,26 @@ exports.commands = [ "SAVE", "BGSAVE", "LASTSAVE", "SHUTDOWN", "BGREWRITEAOF", // Remote server control commands "INFO", "MONITOR", "SLAVEOF", "CONFIG", + // Publish/Subscribe + "PUBLISH", "SUBSCRIBE", "PSUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE", // Undocumented commands - "PING" + "PING", ]; exports.commands.forEach(function (command) { RedisClient.prototype[command] = function () { - var args = toArray(arguments); // convert "arguments" into a real Array + var args = to_array(arguments); args.unshift(command); // put command at the beginning this.send_command.apply(this, args); }; RedisClient.prototype[command.toLowerCase()] = function (args, callback) { - var args = toArray(arguments); // convert "arguments" into a real Array + var args = to_array(arguments); args.unshift(command); // put command at the beginning this.send_command.apply(this, args); }; }); // Transactions - "DISCARD", "WATCH", "UNWATCH", -// Publish/Subscribe - "SUBSCRIBE", "UNSUBSCRIBE", "PUBLISH", RedisClient.prototype.multi = function (commands) { var self = this; @@ -625,6 +666,9 @@ RedisClient.prototype.multi = function (commands) { }); }); }; +RedisClient.prototype.MULTI = function (commands) { + return this.multi(commands); +}; exports.createClient = function (port_arg, host_arg, options) { var port = port_arg || default_port,