You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-06 02:15:48 +03:00
Improve pub sub mode further
This commit is contained in:
66
index.js
66
index.js
@@ -148,6 +148,7 @@ function RedisClient (options, stream) {
|
|||||||
this.old_state = null;
|
this.old_state = null;
|
||||||
this.fire_strings = true; // Determine if strings or buffers should be written to the stream
|
this.fire_strings = true; // Determine if strings or buffers should be written to the stream
|
||||||
this.pipeline = false;
|
this.pipeline = false;
|
||||||
|
this.sub_commands_left = 0;
|
||||||
this.times_connected = 0;
|
this.times_connected = 0;
|
||||||
this.options = options;
|
this.options = options;
|
||||||
this.buffers = options.return_buffers || options.detect_buffers;
|
this.buffers = options.return_buffers || options.detect_buffers;
|
||||||
@@ -645,6 +646,11 @@ RedisClient.prototype.return_error = function (err) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Count down pub sub mode if in entering modus
|
||||||
|
if (this.pub_sub_mode > 1) {
|
||||||
|
this.pub_sub_mode--;
|
||||||
|
}
|
||||||
|
|
||||||
var match = err.message.match(utils.err_code);
|
var match = err.message.match(utils.err_code);
|
||||||
// LUA script could return user errors that don't behave like all other errors!
|
// LUA script could return user errors that don't behave like all other errors!
|
||||||
if (match) {
|
if (match) {
|
||||||
@@ -677,50 +683,39 @@ function normal_reply (self, reply) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function set_subscribe (self, type, subscribe, channel) {
|
function subscribe_unsubscribe (self, reply, type) {
|
||||||
// Every channel has to be saved / removed one after the other and the type has to be the same too,
|
|
||||||
// to make sure partly subscribe / unsubscribe works well together
|
|
||||||
if (subscribe) {
|
|
||||||
self.subscription_set[type + '_' + channel] = channel;
|
|
||||||
} else {
|
|
||||||
type = type === 'unsubscribe' ? 'subscribe' : 'psubscribe'; // Make types consistent
|
|
||||||
delete self.subscription_set[type + '_' + channel];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function subscribe_unsubscribe (self, reply, type, subscribe) {
|
|
||||||
// Subscribe commands take an optional callback and also emit an event, but only the _last_ response is included in the callback
|
// Subscribe commands take an optional callback and also emit an event, but only the _last_ response is included in the callback
|
||||||
// The pub sub commands return each argument in a separate return value and have to be handled that way
|
// The pub sub commands return each argument in a separate return value and have to be handled that way
|
||||||
var command_obj = self.command_queue.get(0);
|
var command_obj = self.command_queue.get(0);
|
||||||
var buffer = self.options.return_buffers || self.options.detect_buffers && command_obj.buffer_args;
|
var buffer = self.options.return_buffers || self.options.detect_buffers && command_obj.buffer_args;
|
||||||
var channel = (buffer || reply[1] === null) ? reply[1] : reply[1].toString();
|
var channel = (buffer || reply[1] === null) ? reply[1] : reply[1].toString();
|
||||||
var count = +reply[2]; // Return the channel counter as number no matter if `string_numbers` is activated or not
|
var count = +reply[2]; // Return the channel counter as number no matter if `string_numbers` is activated or not
|
||||||
debug('Subscribe / unsubscribe command');
|
debug(type, channel);
|
||||||
|
|
||||||
// Emit first, then return the callback
|
// Emit first, then return the callback
|
||||||
if (channel !== null) { // Do not emit or "unsubscribe" something if there was no channel to unsubscribe from
|
if (channel !== null) { // Do not emit or "unsubscribe" something if there was no channel to unsubscribe from
|
||||||
self.emit(type, channel, count);
|
self.emit(type, channel, count);
|
||||||
set_subscribe(self, type, subscribe, channel);
|
if (type === 'subscribe' || type === 'psubscribe') {
|
||||||
}
|
self.subscription_set[type + '_' + channel] = channel;
|
||||||
if (command_obj.sub_commands_left <= 1) {
|
|
||||||
if (count !== 0) {
|
|
||||||
if (!subscribe && command_obj.args.length === 0) { // Unsubscribe from all channels
|
|
||||||
command_obj.sub_commands_left = count;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
|
type = type === 'unsubscribe' ? 'subscribe' : 'psubscribe'; // Make types consistent
|
||||||
|
delete self.subscription_set[type + '_' + channel];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (command_obj.args.length === 1 || self.sub_commands_left === 1 || command_obj.args.length === 0 && (count === 0 || channel === null)) {
|
||||||
|
if (count === 0) { // unsubscribed from all channels
|
||||||
var running_command;
|
var running_command;
|
||||||
var i = 1;
|
var i = 1;
|
||||||
|
self.pub_sub_mode = 0; // Deactivating pub sub mode
|
||||||
// This should be a rare case and therefore handling it this way should be good performance wise for the general case
|
// This should be a rare case and therefore handling it this way should be good performance wise for the general case
|
||||||
while (running_command = self.command_queue.get(i)) {
|
while (running_command = self.command_queue.get(i)) {
|
||||||
if (SUBSCRIBE_COMMANDS[running_command.command]) {
|
if (SUBSCRIBE_COMMANDS[running_command.command]) {
|
||||||
self.command_queue.shift();
|
self.pub_sub_mode = i; // Entering pub sub mode again
|
||||||
self.pub_sub_mode = i;
|
break;
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
self.pub_sub_mode = 0;
|
|
||||||
}
|
}
|
||||||
self.command_queue.shift();
|
self.command_queue.shift();
|
||||||
if (typeof command_obj.callback === 'function') {
|
if (typeof command_obj.callback === 'function') {
|
||||||
@@ -728,8 +723,13 @@ function subscribe_unsubscribe (self, reply, type, subscribe) {
|
|||||||
// Evaluate to change this in v.3 to return all subscribed / unsubscribed channels in an array including the number of channels subscribed too
|
// Evaluate to change this in v.3 to return all subscribed / unsubscribed channels in an array including the number of channels subscribed too
|
||||||
command_obj.callback(null, channel);
|
command_obj.callback(null, channel);
|
||||||
}
|
}
|
||||||
|
self.sub_commands_left = 0;
|
||||||
} else {
|
} else {
|
||||||
command_obj.sub_commands_left--;
|
if (self.sub_commands_left !== 0) {
|
||||||
|
self.sub_commands_left--;
|
||||||
|
} else {
|
||||||
|
self.sub_commands_left = command_obj.args.length ? command_obj.args.length - 1 : count;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -751,12 +751,8 @@ function return_pub_sub (self, reply) {
|
|||||||
} else {
|
} else {
|
||||||
self.emit('pmessage', reply[1], reply[2], reply[3]);
|
self.emit('pmessage', reply[1], reply[2], reply[3]);
|
||||||
}
|
}
|
||||||
} else if (type === 'subscribe' || type === 'psubscribe') {
|
|
||||||
subscribe_unsubscribe(self, reply, type, true);
|
|
||||||
} else if (type === 'unsubscribe' || type === 'punsubscribe') {
|
|
||||||
subscribe_unsubscribe(self, reply, type, false);
|
|
||||||
} else {
|
} else {
|
||||||
normal_reply(self, reply);
|
subscribe_unsubscribe(self, reply, type);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -787,10 +783,12 @@ RedisClient.prototype.return_reply = function (reply) {
|
|||||||
} else if (this.pub_sub_mode !== 1) {
|
} else if (this.pub_sub_mode !== 1) {
|
||||||
this.pub_sub_mode--;
|
this.pub_sub_mode--;
|
||||||
normal_reply(this, reply);
|
normal_reply(this, reply);
|
||||||
} else if (reply instanceof Array && reply.length > 2 && reply[0]) {
|
} else if (!(reply instanceof Array) || reply.length <= 2) {
|
||||||
return_pub_sub(this, reply);
|
// Only PING and QUIT are allowed in this context besides the pub sub commands
|
||||||
} else {
|
// Ping replies with ['pong', null|value] and quit with 'OK'
|
||||||
normal_reply(this, reply);
|
normal_reply(this, reply);
|
||||||
|
} else {
|
||||||
|
return_pub_sub(this, reply);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@@ -4,10 +4,9 @@
|
|||||||
// a named constructor helps it show up meaningfully in the V8 CPU profiler and in heap snapshots.
|
// a named constructor helps it show up meaningfully in the V8 CPU profiler and in heap snapshots.
|
||||||
function Command (command, args, buffer_args, callback) {
|
function Command (command, args, buffer_args, callback) {
|
||||||
this.command = command;
|
this.command = command;
|
||||||
this.args = args; // We only need the args for the offline commands => move them into another class. We need the number of args though for pub sub
|
this.args = args;
|
||||||
this.buffer_args = buffer_args;
|
this.buffer_args = buffer_args;
|
||||||
this.callback = callback;
|
this.callback = callback;
|
||||||
this.sub_commands_left = args.length;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function OfflineCommand (command, args, callback, call_on_write) {
|
function OfflineCommand (command, args, callback, call_on_write) {
|
||||||
|
@@ -79,6 +79,7 @@ describe('publish/subscribe', function () {
|
|||||||
|
|
||||||
it('does not fire subscribe events after reconnecting', function (done) {
|
it('does not fire subscribe events after reconnecting', function (done) {
|
||||||
var i = 0;
|
var i = 0;
|
||||||
|
var end = helper.callFuncAfter(done, 2);
|
||||||
sub.on('subscribe', function (chnl, count) {
|
sub.on('subscribe', function (chnl, count) {
|
||||||
assert.strictEqual(typeof count, 'number');
|
assert.strictEqual(typeof count, 'number');
|
||||||
assert.strictEqual(++i, count);
|
assert.strictEqual(++i, count);
|
||||||
@@ -91,9 +92,10 @@ describe('publish/subscribe', function () {
|
|||||||
sub.unsubscribe(function (err, res) { // Do not pass a channel here!
|
sub.unsubscribe(function (err, res) { // Do not pass a channel here!
|
||||||
assert.strictEqual(sub.pub_sub_mode, 2);
|
assert.strictEqual(sub.pub_sub_mode, 2);
|
||||||
assert.deepEqual(sub.subscription_set, {});
|
assert.deepEqual(sub.subscription_set, {});
|
||||||
|
end();
|
||||||
});
|
});
|
||||||
sub.set('foo', 'bar', helper.isString('OK'));
|
sub.set('foo', 'bar', helper.isString('OK'));
|
||||||
sub.subscribe(channel2, done);
|
sub.subscribe(channel2, end);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -181,25 +183,19 @@ describe('publish/subscribe', function () {
|
|||||||
sub.subscribe('chan9');
|
sub.subscribe('chan9');
|
||||||
sub.unsubscribe('chan9');
|
sub.unsubscribe('chan9');
|
||||||
pub.publish('chan8', 'something');
|
pub.publish('chan8', 'something');
|
||||||
sub.subscribe('chan9', function () {
|
sub.subscribe('chan9', done);
|
||||||
return done();
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it('handles SUB_UNSUB_MSG_SUB 2', function (done) {
|
it('handles SUB_UNSUB_MSG_SUB 2', function (done) {
|
||||||
sub.psubscribe('abc*');
|
sub.psubscribe('abc*', helper.isString('abc*'));
|
||||||
sub.subscribe('xyz');
|
sub.subscribe('xyz');
|
||||||
sub.unsubscribe('xyz');
|
sub.unsubscribe('xyz');
|
||||||
pub.publish('abcd', 'something');
|
pub.publish('abcd', 'something');
|
||||||
sub.subscribe('xyz', function () {
|
sub.subscribe('xyz', done);
|
||||||
return done();
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it('emits end event if quit is called from within subscribe', function (done) {
|
it('emits end event if quit is called from within subscribe', function (done) {
|
||||||
sub.on('end', function () {
|
sub.on('end', done);
|
||||||
return done();
|
|
||||||
});
|
|
||||||
sub.on('subscribe', function (chnl, count) {
|
sub.on('subscribe', function (chnl, count) {
|
||||||
sub.quit();
|
sub.quit();
|
||||||
});
|
});
|
||||||
@@ -236,6 +232,10 @@ describe('publish/subscribe', function () {
|
|||||||
var end = helper.callFuncAfter(done, 2);
|
var end = helper.callFuncAfter(done, 2);
|
||||||
sub.select(3);
|
sub.select(3);
|
||||||
sub.set('foo', 'bar');
|
sub.set('foo', 'bar');
|
||||||
|
sub.set('failure', helper.isError()); // Triggering a warning while subscribing should work
|
||||||
|
sub.mget('foo', 'bar', 'baz', 'hello', 'world', function (err, res) {
|
||||||
|
assert.deepEqual(res, ['bar', null, null, null, null]);
|
||||||
|
});
|
||||||
sub.subscribe('somechannel', 'another channel', function (err, res) {
|
sub.subscribe('somechannel', 'another channel', function (err, res) {
|
||||||
end();
|
end();
|
||||||
sub.stream.destroy();
|
sub.stream.destroy();
|
||||||
@@ -280,7 +280,7 @@ describe('publish/subscribe', function () {
|
|||||||
|
|
||||||
it('should only resubscribe to channels not unsubscribed earlier on a reconnect', function (done) {
|
it('should only resubscribe to channels not unsubscribed earlier on a reconnect', function (done) {
|
||||||
sub.subscribe('/foo', '/bar');
|
sub.subscribe('/foo', '/bar');
|
||||||
sub.unsubscribe('/bar', function () {
|
sub.batch().unsubscribe(['/bar'], function () {
|
||||||
pub.pubsub('channels', function (err, res) {
|
pub.pubsub('channels', function (err, res) {
|
||||||
assert.deepEqual(res, ['/foo']);
|
assert.deepEqual(res, ['/foo']);
|
||||||
sub.stream.destroy();
|
sub.stream.destroy();
|
||||||
@@ -291,7 +291,7 @@ describe('publish/subscribe', function () {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
}).exec();
|
||||||
});
|
});
|
||||||
|
|
||||||
it('unsubscribes, subscribes, unsubscribes... single and multiple entries mixed. Withouth callbacks', function (done) {
|
it('unsubscribes, subscribes, unsubscribes... single and multiple entries mixed. Withouth callbacks', function (done) {
|
||||||
@@ -490,7 +490,7 @@ describe('publish/subscribe', function () {
|
|||||||
return_buffers: true
|
return_buffers: true
|
||||||
});
|
});
|
||||||
sub2.on('ready', function () {
|
sub2.on('ready', function () {
|
||||||
sub2.psubscribe('*');
|
sub2.batch().psubscribe('*', helper.isString('*')).exec();
|
||||||
sub2.subscribe('/foo');
|
sub2.subscribe('/foo');
|
||||||
sub2.on('pmessage', function (pattern, channel, message) {
|
sub2.on('pmessage', function (pattern, channel, message) {
|
||||||
assert.strictEqual(pattern.inspect(), new Buffer('*').inspect());
|
assert.strictEqual(pattern.inspect(), new Buffer('*').inspect());
|
||||||
@@ -501,32 +501,58 @@ describe('publish/subscribe', function () {
|
|||||||
pub.pubsub('numsub', '/foo', function (err, res) {
|
pub.pubsub('numsub', '/foo', function (err, res) {
|
||||||
assert.deepEqual(res, ['/foo', 2]);
|
assert.deepEqual(res, ['/foo', 2]);
|
||||||
});
|
});
|
||||||
|
// sub2 is counted twice as it subscribed with psubscribe and subscribe
|
||||||
pub.publish('/foo', 'hello world', helper.isNumber(3));
|
pub.publish('/foo', 'hello world', helper.isNumber(3));
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it('allows to listen to pmessageBuffer and pmessage', function (done) {
|
it('allows to listen to pmessageBuffer and pmessage', function (done) {
|
||||||
var batch = sub.batch();
|
var batch = sub.batch();
|
||||||
|
var end = helper.callFuncAfter(done, 6);
|
||||||
|
assert.strictEqual(sub.message_buffers, false);
|
||||||
batch.psubscribe('*');
|
batch.psubscribe('*');
|
||||||
batch.subscribe('/foo');
|
batch.subscribe('/foo');
|
||||||
batch.unsubscribe('/foo');
|
batch.unsubscribe('/foo');
|
||||||
batch.unsubscribe();
|
batch.unsubscribe(helper.isNull());
|
||||||
batch.subscribe(['/foo']);
|
batch.subscribe(['/foo'], helper.isString('/foo'));
|
||||||
batch.exec();
|
batch.exec();
|
||||||
assert.strictEqual(sub.shouldBuffer, false);
|
assert.strictEqual(sub.shouldBuffer, false);
|
||||||
sub.on('pmessageBuffer', function (pattern, channel, message) {
|
sub.on('pmessageBuffer', function (pattern, channel, message) {
|
||||||
assert.strictEqual(pattern.inspect(), new Buffer('*').inspect());
|
assert.strictEqual(pattern.inspect(), new Buffer('*').inspect());
|
||||||
assert.strictEqual(channel.inspect(), new Buffer('/foo').inspect());
|
assert.strictEqual(channel.inspect(), new Buffer('/foo').inspect());
|
||||||
sub.quit(done);
|
sub.quit(end);
|
||||||
});
|
});
|
||||||
|
// Either message_buffers or buffers has to be true, but not both at the same time
|
||||||
|
assert.notStrictEqual(sub.message_buffers, sub.buffers);
|
||||||
sub.on('pmessage', function (pattern, channel, message) {
|
sub.on('pmessage', function (pattern, channel, message) {
|
||||||
assert.strictEqual(pattern, '*');
|
assert.strictEqual(pattern, '*');
|
||||||
assert.strictEqual(channel, '/foo');
|
assert.strictEqual(channel, '/foo');
|
||||||
|
assert.strictEqual(message, 'hello world');
|
||||||
|
end();
|
||||||
});
|
});
|
||||||
|
sub.on('message', function (channel, message) {
|
||||||
|
assert.strictEqual(channel, '/foo');
|
||||||
|
assert.strictEqual(message, 'hello world');
|
||||||
|
end();
|
||||||
|
});
|
||||||
|
setTimeout(function () {
|
||||||
pub.pubsub('numsub', '/foo', function (err, res) {
|
pub.pubsub('numsub', '/foo', function (err, res) {
|
||||||
|
// There's one subscriber to this channel
|
||||||
assert.deepEqual(res, ['/foo', 1]);
|
assert.deepEqual(res, ['/foo', 1]);
|
||||||
|
end();
|
||||||
|
});
|
||||||
|
pub.pubsub('channels', function (err, res) {
|
||||||
|
// There's exactly one channel that is listened too
|
||||||
|
assert.deepEqual(res, ['/foo']);
|
||||||
|
end();
|
||||||
|
});
|
||||||
|
pub.pubsub('numpat', function (err, res) {
|
||||||
|
// One pattern is active
|
||||||
|
assert.strictEqual(res, 1);
|
||||||
|
end();
|
||||||
});
|
});
|
||||||
pub.publish('/foo', 'hello world', helper.isNumber(2));
|
pub.publish('/foo', 'hello world', helper.isNumber(2));
|
||||||
|
}, 50);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -536,10 +562,7 @@ describe('publish/subscribe', function () {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it('executes callback when punsubscribe is called and there are no subscriptions', function (done) {
|
it('executes callback when punsubscribe is called and there are no subscriptions', function (done) {
|
||||||
pub.punsubscribe(function (err, results) {
|
pub.batch().punsubscribe(helper.isNull()).exec(done);
|
||||||
assert.strictEqual(null, results);
|
|
||||||
done(err);
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user