1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

Add pub/sub support.

This commit is contained in:
Matt Ranney
2010-09-19 23:46:43 -07:00
parent eb11634ab6
commit 03fc7c203e

View File

@@ -28,10 +28,11 @@ function small_toString(buf) {
return tmp;
}
function toArray(args) {
function to_array(args) {
var len = args.length,
arr = new Array(len);
for (var i = 0; i < len; ++i) {
arr = new Array(len), i;
for (i = 0; i < len; i += 1) {
arr[i] = args[i];
}
return arr;
@@ -257,7 +258,7 @@ RedisReplyParser.prototype.add_multi_bulk_reply = function (reply) {
// http://github.com/creationix/pattern/blob/master/lib/pattern/queue.js
var Queue = function () {
this.tail = [];
this.head = toArray(arguments);
this.head = to_array(arguments);
this.offset = 0;
};
@@ -303,6 +304,7 @@ function RedisClient(stream) {
this.commands_sent = 0;
this.retry_delay = 250;
this.retry_backoff = 1.7;
this.subscriptions = false;
var self = this;
@@ -422,38 +424,65 @@ RedisClient.prototype.return_error = function (err) {
if (command_obj && typeof command_obj.callback === "function") {
command_obj.callback(err);
} else {
console.log("no callback to send error: " + err.stack);
console.log("no callback to send error: " + sys.inspect(err));
// this will probably not make it anywhere useful, but we might as well throw
throw err;
throw new Error(err);
}
};
RedisClient.prototype.return_reply = function (reply_buffer) {
RedisClient.prototype.return_reply = function (reply) {
var command_obj = this.command_queue.shift();
if (command_obj && typeof command_obj.callback === "function") {
// HGETALL special case replies with keyed Buffers
if ('HGETALL' == command_obj.command) {
var obj = {};
for (var i = 0, len = reply_buffer.length; i < len; ++i) {
var key = reply_buffer[i].toString(),
val = reply_buffer[++i];
obj[key] = val;
if (command_obj) {
if (typeof command_obj.callback === "function") {
// HGETALL special case replies with keyed Buffers
if ('HGETALL' === command_obj.command) {
var obj = {};
for (var i = 0, len = reply.length; i < len; i += 2) {
var key = reply[i].toString(),
val = reply[i + 1];
obj[key] = val;
}
reply = obj;
}
reply_buffer = obj;
command_obj.callback(null, reply);
} else if (exports.debug_mode) {
console.log("no callback for reply: " + reply.toString());
}
command_obj.callback(null, reply_buffer);
} else {
if (exports.debug_mode) {
console.log("no callback for reply: " + reply_buffer.toString());
} else if (this.subscriptions) {
if (Array.isArray(reply)) {
var type = reply[0].toString();
if (type === "message") {
this.emit("message", reply[1], reply[2]); // channel, message
} else if (type === "pmessage") {
this.emit("pmessage", reply[1], reply[2], reply[3]); // pattern, channel, message
} else if (type === "subscribe" || type === "unsubscribe" || type === "psubscribe" || type === "punsubscribe") {
if (reply[2] === 0) {
this.subscriptions = false;
if (this.debug_mode) {
console.log("All subscriptions removed, exiting pub/sub mode");
}
}
this.emit(type, reply[1], reply[2]); // channel, count
} else {
throw new Error("subscriptions are active but got unknown reply type " + type);
}
} else {
throw new Error("subscriptions are active but got an invalid reply: " + reply);
}
}
};
RedisClient.prototype.send_command = function () {
var command, callback, args, this_args, command_obj;
var command, callback, args, this_args, command_obj, self = this;
this_args = toArray(arguments); // convert arguments into real array
this_args = to_array(arguments);
if (this_args.length === 0) {
throw new Error("send_command: not enough arguments");
}
command = this_args[0];
if (this_args[1] && Array.isArray(this_args[1])) {
@@ -488,7 +517,17 @@ RedisClient.prototype.send_command = function () {
return;
}
this.command_queue.push(command_obj);
if (command === "SUBSCRIBE" || command === "PSUBSCRIBE" || command === "UNSUBSCRIBE" || command === "PUNSUBSCRIBE") {
if (this.subscriptions === false && exports.debug_mode) {
console.log("Entering pub/sub mode from " + command);
}
this.subscriptions = true;
} else {
if (this.subscriptions === true) {
throw new Error("Connection in pub/sub mode, only pub/sub commands may be used");
}
this.command_queue.push(command_obj);
}
this.commands_sent += 1;
var elem_count = 1, stream = this.stream, buffer_args = false, command_str = "";
@@ -499,6 +538,7 @@ RedisClient.prototype.send_command = function () {
});
// Always use "Multi bulk commands", but if passed any Buffer args, then do multiple writes, one for each arg
// This means that using Buffers in commands is going to be slower, so use Strings if you don't need binary.
command_str = "*" + elem_count + "\r\n$" + command.length + "\r\n" + command + "\r\n";
@@ -573,25 +613,26 @@ exports.commands = [
"SAVE", "BGSAVE", "LASTSAVE", "SHUTDOWN", "BGREWRITEAOF",
// Remote server control commands
"INFO", "MONITOR", "SLAVEOF", "CONFIG",
// Publish/Subscribe
"PUBLISH", "SUBSCRIBE", "PSUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE",
// Undocumented commands
"PING"
"PING",
];
exports.commands.forEach(function (command) {
RedisClient.prototype[command] = function () {
var args = toArray(arguments); // convert "arguments" into a real Array
var args = to_array(arguments);
args.unshift(command); // put command at the beginning
this.send_command.apply(this, args);
};
RedisClient.prototype[command.toLowerCase()] = function (args, callback) {
var args = toArray(arguments); // convert "arguments" into a real Array
var args = to_array(arguments);
args.unshift(command); // put command at the beginning
this.send_command.apply(this, args);
};
});
// Transactions - "DISCARD", "WATCH", "UNWATCH",
// Publish/Subscribe - "SUBSCRIBE", "UNSUBSCRIBE", "PUBLISH",
RedisClient.prototype.multi = function (commands) {
var self = this;
@@ -625,6 +666,9 @@ RedisClient.prototype.multi = function (commands) {
});
});
};
RedisClient.prototype.MULTI = function (commands) {
return this.multi(commands);
};
exports.createClient = function (port_arg, host_arg, options) {
var port = port_arg || default_port,