1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

Fix pub sub mode

There is likely a better and more performant way to fix this but this works so far
and should be good enough to release and improve later.

Make test more robust

Add another test
This commit is contained in:
Ruben Bridgewater
2016-03-25 23:15:44 +01:00
parent 5294917280
commit 7a5a4aa535
9 changed files with 520 additions and 170 deletions

244
index.js
View File

@@ -5,7 +5,8 @@ var tls = require('tls');
var util = require('util');
var utils = require('./lib/utils');
var Queue = require('double-ended-queue');
var Command = require('./lib/command');
var Command = require('./lib/command').Command;
var OfflineCommand = require('./lib/command').OfflineCommand;
var EventEmitter = require('events');
var Parser = require('redis-parser');
var commands = require('redis-commands');
@@ -128,7 +129,7 @@ function RedisClient (options, stream) {
);
}
this.initialize_retry_vars();
this.pub_sub_mode = false;
this.pub_sub_mode = 0;
this.subscription_set = {};
this.monitoring = false;
this.closing = false;
@@ -222,6 +223,7 @@ RedisClient.prototype.create_stream = function () {
// The buffer_from_socket.toString() has a significant impact on big chunks and therefor this should only be used if necessary
debug('Net read ' + self.address + ' id ' + self.connection_id); // + ': ' + buffer_from_socket.toString());
self.reply_parser.execute(buffer_from_socket);
self.emit_idle();
});
this.stream.on('error', function (err) {
@@ -386,7 +388,7 @@ RedisClient.prototype.on_ready = function () {
}
this.cork = cork;
// restore modal commands from previous connection. The order of the commands is important
// Restore modal commands from previous connection. The order of the commands is important
if (this.selected_db !== undefined) {
this.send_command('select', [this.selected_db]);
}
@@ -394,31 +396,29 @@ RedisClient.prototype.on_ready = function () {
this.monitoring = this.old_state.monitoring;
this.pub_sub_mode = this.old_state.pub_sub_mode;
}
if (this.pub_sub_mode) {
if (this.monitoring) { // Monitor has to be fired before pub sub commands
this.send_command('monitor', []);
}
var callback_count = Object.keys(this.subscription_set).length;
if (!this.options.disable_resubscribing && callback_count) {
// only emit 'ready' when all subscriptions were made again
var callback_count = 0;
// TODO: Remove the countdown for ready here. This is not coherent with all other modes and should therefor not be handled special
// We know we are ready as soon as all commands were fired
var callback = function () {
callback_count--;
if (callback_count === 0) {
self.emit('ready');
}
};
if (this.options.disable_resubscribing) {
this.emit('ready');
return;
debug('Sending pub/sub on_ready commands');
for (var key in this.subscription_set) { // jshint ignore: line
var command = key.slice(0, key.indexOf('_'));
var args = self.subscription_set[key];
self.send_command(command, [args], callback);
}
Object.keys(this.subscription_set).forEach(function (key) {
var space_index = key.indexOf(' ');
var parts = [key.slice(0, space_index), key.slice(space_index + 1)];
debug('Sending pub/sub on_ready ' + parts[0] + ', ' + parts[1]);
callback_count++;
self.send_command(parts[0] + 'scribe', [parts[1]], callback);
});
this.send_offline_queue();
return;
}
if (this.monitoring) {
this.send_command('monitor', []);
}
this.send_offline_queue();
this.emit('ready');
};
@@ -521,7 +521,7 @@ RedisClient.prototype.connection_gone = function (why, error) {
};
this.old_state = state;
this.monitoring = false;
this.pub_sub_mode = false;
this.pub_sub_mode = 0;
// since we are collapsing end and close, users don't expect to be called twice
if (!this.emitted_end) {
@@ -603,7 +603,6 @@ RedisClient.prototype.return_error = function (err) {
err.code = match[1];
}
this.emit_idle();
utils.callback_or_emit(this, command_obj && command_obj.callback, err);
};
@@ -613,19 +612,13 @@ RedisClient.prototype.drain = function () {
};
RedisClient.prototype.emit_idle = function () {
if (this.command_queue.length === 0 && this.pub_sub_mode === false) {
if (this.command_queue.length === 0 && this.pub_sub_mode === 0) {
this.emit('idle');
}
};
/* istanbul ignore next: this is a safety check that we should not be able to trigger */
function queue_state_error (self, command_obj) {
var err = new Error('node_redis command queue state error. If you can reproduce this, please report it.');
err.command_obj = command_obj;
self.emit('error', err);
}
function normal_reply (self, reply, command_obj) {
function normal_reply (self, reply) {
var command_obj = self.command_queue.shift();
if (typeof command_obj.callback === 'function') {
if ('exec' !== command_obj.command) {
reply = self.handle_reply(reply, command_obj.command, command_obj.buffer_args);
@@ -636,67 +629,107 @@ function normal_reply (self, reply, command_obj) {
}
}
function return_pub_sub (self, reply, command_obj) {
if (reply instanceof Array) {
if ((!command_obj || command_obj.buffer_args === false) && !self.options.return_buffers) {
reply = utils.reply_to_strings(reply);
function set_subscribe (self, type, command_obj, subscribe, reply) {
var i = 0;
if (subscribe) {
// The channels have to be saved one after the other and the type has to be the same too,
// to make sure partly subscribe / unsubscribe works well together
for (; i < command_obj.args.length; i++) {
self.subscription_set[type + '_' + command_obj.args[i]] = command_obj.args[i];
}
var type = reply[0].toString();
} else {
type = type === 'unsubscribe' ? 'subscribe' : 'psubscribe'; // Make types consistent
for (; i < command_obj.args.length; i++) {
delete self.subscription_set[type + '_' + command_obj.args[i]];
}
if (reply[2] === 0) { // No channels left that this client is subscribed to
var running_command;
i = 0;
// This should be a rare case and therefor handling it this way should be good performance wise for the general case
while (running_command = self.command_queue.get(i++)) {
if (
running_command.command === 'subscribe' ||
running_command.command === 'psubscribe' ||
running_command.command === 'unsubscribe' ||
running_command.command === 'punsubscribe'
) {
self.pub_sub_mode = i;
return;
}
}
self.pub_sub_mode = 0;
}
}
}
// TODO: Add buffer emiters (we have to get all pubsub messages as buffers back in that case)
if (type === 'message') {
self.emit('message', reply[1], reply[2]); // channcel, message
} else if (type === 'pmessage') {
self.emit('pmessage', reply[1], reply[2], reply[3]); // pattern, channcel, message
} else if (type === 'subscribe' || type === 'unsubscribe' || type === 'psubscribe' || type === 'punsubscribe') {
if (reply[2].toString() === '0') {
self.pub_sub_mode = false;
debug('All subscriptions removed, exiting pub/sub mode');
} else {
self.pub_sub_mode = true;
}
// Subscribe commands take an optional callback and also emit an event, but only the first response is included in the callback
// TODO - document this or fix it so it works in a more obvious way
if (command_obj && typeof command_obj.callback === 'function') {
command_obj.callback(null, reply[1]);
}
self.emit(type, reply[1], reply[2]); // channcel, count
} else {
self.emit('error', new Error('subscriptions are active but got unknown reply type ' + type));
function subscribe_unsubscribe (self, reply, type, subscribe) {
// Subscribe commands take an optional callback and also emit an event, but only the _last_ response is included in the callback
var command_obj = self.command_queue.get(0);
var buffer = self.options.return_buffers || self.options.detect_buffers && command_obj && command_obj.buffer_args || reply[1] === null;
var channel = buffer ? reply[1] : reply[1].toString();
var count = reply[2];
debug('Subscribe / unsubscribe command');
// Emit first, then return the callback
if (channel !== null) { // Do not emit something if there was no channel to unsubscribe from
self.emit(type, channel, count);
}
// The pub sub commands return each argument in a separate return value and have to be handled that way
if (command_obj.sub_commands_left <= 1) {
if (count !== 0 && !subscribe && command_obj.args.length === 0) {
command_obj.sub_commands_left = count;
return;
}
} else if (!self.closing) {
self.emit('error', new Error('subscriptions are active but got an invalid reply: ' + reply));
self.command_queue.shift();
set_subscribe(self, type, command_obj, subscribe, reply);
if (typeof command_obj.callback === 'function') {
// TODO: The current return value is pretty useless.
// Evaluate to change this in v.3 to return all subscribed / unsubscribed channels in an array including the number of channels subscribed too
command_obj.callback(null, channel);
}
} else {
command_obj.sub_commands_left--;
}
}
function return_pub_sub (self, reply) {
var type = reply[0].toString();
if (type === 'message') { // channel, message
// TODO: Implement message_buffer
// if (self.buffers) {
// self.emit('message_buffer', reply[1], reply[2]);
// }
if (!self.options.return_buffers) { // backwards compatible. Refactor this in v.3 to always return a string on the normal emitter
self.emit('message', reply[1].toString(), reply[2].toString());
} else {
self.emit('message', reply[1], reply[2]);
}
} else if (type === 'pmessage') { // pattern, channel, message
// if (self.buffers) {
// self.emit('pmessage_buffer', reply[1], reply[2], reply[3]);
// }
if (!self.options.return_buffers) { // backwards compatible. Refactor this in v.3 to always return a string on the normal emitter
self.emit('pmessage', reply[1].toString(), reply[2].toString(), reply[3].toString());
} else {
self.emit('pmessage', reply[1], reply[2], reply[3]);
}
} else if (type === 'subscribe' || type === 'psubscribe') {
subscribe_unsubscribe(self, reply, type, true);
} else if (type === 'unsubscribe' || type === 'punsubscribe') {
subscribe_unsubscribe(self, reply, type, false);
} else {
normal_reply(self, reply);
}
}
RedisClient.prototype.return_reply = function (reply) {
var command_obj, type, queue_len;
// If the 'reply' here is actually a message received asynchronously due to a
// pubsub subscription, don't pop the command queue as we'll only be consuming
// the head command prematurely.
if (this.pub_sub_mode && reply instanceof Array && reply[0]) {
type = reply[0].toString();
}
if (this.pub_sub_mode && (type === 'message' || type === 'pmessage')) {
debug('Received pubsub message');
if (this.pub_sub_mode === 1 && reply instanceof Array && reply.length !== 0 && reply[0]) {
return_pub_sub(this, reply);
} else {
command_obj = this.command_queue.shift();
}
queue_len = this.command_queue.length;
this.emit_idle();
if (command_obj && !command_obj.sub_command) {
normal_reply(this, reply, command_obj);
} else if (this.pub_sub_mode || command_obj && command_obj.sub_command) {
return_pub_sub(this, reply, command_obj);
}
/* istanbul ignore else: this is a safety check that we should not be able to trigger */
else if (!this.monitoring) {
queue_state_error(this, command_obj);
if (this.pub_sub_mode !== 0 && this.pub_sub_mode !== 1) {
this.pub_sub_mode--;
}
normal_reply(this, reply);
}
};
@@ -731,16 +764,15 @@ RedisClient.prototype.send_command = function (command, args, callback) {
var command_str = '';
var len = 0;
var big_data = false;
var buffer_args = false;
if (process.domain && callback) {
callback = process.domain.bind(callback);
}
var command_obj = new Command(command, args, callback);
if (this.ready === false || this.stream.writable === false) {
// Handle offline commands right away
handle_offline_command(this, command_obj);
handle_offline_command(this, new OfflineCommand(command, args, callback));
return false; // Indicate buffering
}
@@ -776,7 +808,7 @@ RedisClient.prototype.send_command = function (command, args, callback) {
args_copy[i] = 'null'; // Backwards compatible :/
} else if (Buffer.isBuffer(args[i])) {
args_copy[i] = args[i];
command_obj.buffer_args = true;
buffer_args = true;
big_data = true;
if (this.pipeline !== 0) {
this.pipeline += 2;
@@ -803,9 +835,15 @@ RedisClient.prototype.send_command = function (command, args, callback) {
}
}
args = null;
var command_obj = new Command(command, args_copy, callback);
command_obj.buffer_args = buffer_args;
if (command === 'subscribe' || command === 'psubscribe' || command === 'unsubscribe' || command === 'punsubscribe') {
this.pub_sub_command(command_obj); // TODO: This has to be moved to the result handler
// If pub sub is already activated, keep it that way, otherwise set the number of commands to resolve until pub sub mode activates
// Deactivation of the pub sub mode happens in the result handler
if (!this.pub_sub_mode) {
this.pub_sub_mode = this.command_queue.length + 1;
}
} else if (command === 'quit') {
this.closing = true;
}
@@ -886,38 +924,6 @@ RedisClient.prototype.write = function (data) {
return;
};
RedisClient.prototype.pub_sub_command = function (command_obj) {
var i, key, command, args;
if (this.pub_sub_mode === false) {
debug('Entering pub/sub mode from ' + command_obj.command);
}
this.pub_sub_mode = true;
command_obj.sub_command = true;
command = command_obj.command;
args = command_obj.args;
if (command === 'subscribe' || command === 'psubscribe') {
if (command === 'subscribe') {
key = 'sub';
} else {
key = 'psub';
}
for (i = 0; i < args.length; i++) {
this.subscription_set[key + ' ' + args[i]] = true;
}
} else {
if (command === 'unsubscribe') {
key = 'sub';
} else {
key = 'psub';
}
for (i = 0; i < args.length; i++) {
delete this.subscription_set[key + ' ' + args[i]];
}
}
};
RedisClient.prototype.end = function (flush) {
// Flush queue if wanted
if (flush) {