1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

feat: return channel number and channels from subscribe / unsubscribe calls

This commit is contained in:
Ruben Bridgewater
2017-05-06 00:27:44 +02:00
parent a6053c510a
commit 27ed4db537
5 changed files with 51 additions and 27 deletions

View File

@@ -112,6 +112,7 @@ function RedisClient (options, stream) {
this.buffers = options.returnBuffers || options.detectBuffers; this.buffers = options.returnBuffers || options.detectBuffers;
this.options = options; this.options = options;
this.reply = 'ON'; // Returning replies is the default this.reply = 'ON'; // Returning replies is the default
this.subscribeChannels = [];
// Init parser // Init parser
this.replyParser = createParser(this); this.replyParser = createParser(this);
this.createStream(); this.createStream();
@@ -605,6 +606,7 @@ function subscribeUnsubscribe (self, reply, type) {
type = type === 'unsubscribe' ? 'subscribe' : 'psubscribe'; // Make types consistent type = type === 'unsubscribe' ? 'subscribe' : 'psubscribe'; // Make types consistent
delete self.subscriptionSet[type + '_' + channel]; delete self.subscriptionSet[type + '_' + channel];
} }
self.subscribeChannels.push(channel);
} }
if (commandObj.args.length === 1 || self.subCommandsLeft === 1 || commandObj.args.length === 0 && (count === 0 || channel === null)) { if (commandObj.args.length === 1 || self.subCommandsLeft === 1 || commandObj.args.length === 0 && (count === 0 || channel === null)) {
@@ -623,10 +625,9 @@ function subscribeUnsubscribe (self, reply, type) {
} }
self.commandQueue.shift(); self.commandQueue.shift();
if (typeof commandObj.callback === 'function') { if (typeof commandObj.callback === 'function') {
// TODO: The current return value is pretty useless. commandObj.callback(null, [count, self.subscribeChannels]);
// Evaluate to change this in v.3 to return all subscribed / unsubscribed channels in an array including the number of channels subscribed too
commandObj.callback(null, channel);
} }
self.subscribeChannels = [];
self.subCommandsLeft = 0; self.subCommandsLeft = 0;
} else { } else {
if (self.subCommandsLeft !== 0) { if (self.subCommandsLeft !== 0) {

View File

@@ -321,14 +321,17 @@ describe('client authentication', function () {
assert.strictEqual(client.serverInfo.sync_full, '0'); assert.strictEqual(client.serverInfo.sync_full, '0');
}) })
.get('foo', helper.isString('bar')) .get('foo', helper.isString('bar'))
.subscribe(['foo', 'bar']) .subscribe(['foo', 'bar', 'foo'], helper.isUnSubscribe(2, ['foo', 'bar', 'foo']))
.unsubscribe('foo') .unsubscribe('foo')
.SUBSCRIBE('/foo', helper.isString('/foo')) .SUBSCRIBE('/foo', helper.isUnSubscribe(2, '/foo'))
.psubscribe('*') .psubscribe('*')
.quit(helper.isString('OK')) // this might be interesting .quit(helper.isString('OK'))
.exec(function (err, res) { .exec(function (err, res) {
res[4] = res[4].substr(0, 9); res[4] = res[4].substr(0, 9);
assert.deepEqual(res, ['OK', 'OK', 'OK', 'OK', '# Stats\r\n', 'bar', 'bar', 'foo', '/foo', '*', 'OK']); assert.deepStrictEqual(
res,
['OK', 'OK', 'OK', 'OK', '# Stats\r\n', 'bar', [2, ['foo', 'bar', 'foo']], [1, ['foo']], [2, ['/foo']], [3, ['*']], 'OK']
);
end(); end();
}); });
}); });

View File

@@ -309,6 +309,7 @@ describe('connection tests', function () {
assert.strictEqual(client.stream.listeners('timeout').length, 0); assert.strictEqual(client.stream.listeners('timeout').length, 0);
done(); done();
}); });
client.end(true);
}); });
it('clears the socket timeout after a connection has been established', function (done) { it('clears the socket timeout after a connection has been established', function (done) {

View File

@@ -130,6 +130,31 @@ module.exports = {
}; };
} }
}, },
isUnSubscribe: function (count, channels, done) {
if (typeof count !== 'number') {
done = channels;
channels = count;
count = undefined;
}
if (typeof channels === 'function') {
done = count;
count = undefined;
}
if (typeof channels === 'string') {
channels = [channels];
}
var len = channels.length;
return function (err, results) {
assert.strictEqual(err, null, 'expected an array, got: ' + err);
assert.strictEqual(Array.isArray(results), true, results);
assert.strictEqual(Array.isArray(results[1]), true, results);
assert.strictEqual(results[1].length, len, results);
assert.strictEqual(typeof results[0], 'number', results);
if (count) assert.strictEqual(count, results[0], results);
if (done) done();
};
},
match: function (pattern, done) { match: function (pattern, done) {
return function (err, results) { return function (err, results) {
assert.strictEqual(err, null, 'expected ' + pattern.toString() + ', got error: ' + err); assert.strictEqual(err, null, 'expected ' + pattern.toString() + ', got error: ' + err);

View File

@@ -187,7 +187,7 @@ describe('publish/subscribe', function () {
}); });
it('handles SUB UNSUB MSG SUB 2', function (done) { it('handles SUB UNSUB MSG SUB 2', function (done) {
sub.psubscribe('abc*', helper.isString('abc*')); sub.psubscribe('abc*', helper.isUnSubscribe(1, 'abc*'));
sub.subscribe('xyz'); sub.subscribe('xyz');
sub.unsubscribe('xyz'); sub.unsubscribe('xyz');
pub.publish('abcd', 'something'); pub.publish('abcd', 'something');
@@ -256,7 +256,7 @@ describe('publish/subscribe', function () {
}); });
sub.set('foo', 'bar'); sub.set('foo', 'bar');
sub.unsubscribe(function (err, res) { sub.unsubscribe(function (err, res) {
assert.strictEqual(res, null); assert.deepStrictEqual(res, [0, []]);
}); });
sub.del('foo', done); sub.del('foo', done);
}); });
@@ -427,7 +427,7 @@ describe('publish/subscribe', function () {
}); });
sub.punsubscribe(function (err, res) { sub.punsubscribe(function (err, res) {
assert(!err); assert(!err);
assert.strictEqual(res, 'bla'); assert.deepStrictEqual(res, [0, ['prefix:3', 'prefix:2', '5', 'test:a', 'bla']]);
assert(all); assert(all);
all = false; // Make sure the callback is actually after the emit all = false; // Make sure the callback is actually after the emit
end(); end();
@@ -468,19 +468,13 @@ describe('publish/subscribe', function () {
}); });
}); });
it('does not complain when unsubscribe is called and there are no subscriptions', function (done) { it('sub executes callback when unsubscribe is called and there are no subscriptions', function (done) {
sub.unsubscribe(function (err, res) { sub.unsubscribe(helper.isUnSubscribe(0, [], done));
assert.strictEqual(err, null);
assert.strictEqual(res, null);
done();
});
}); });
it('executes callback when unsubscribe is called and there are no subscriptions', function (done) { it('pub executes callback when unsubscribe is called and there are no subscriptions', function (done) {
pub.unsubscribe(function (err, results) { pub.unsubscribe(helper.isUnSubscribe(0, []));
assert.strictEqual(null, results); pub.get('foo', done);
done(err);
});
}); });
}); });
@@ -491,7 +485,7 @@ describe('publish/subscribe', function () {
}); });
sub.subscribe('/foo', function () { sub.subscribe('/foo', function () {
sub2.on('ready', function () { sub2.on('ready', function () {
sub2.batch().psubscribe('*', helper.isString('*')).exec(); sub2.batch().psubscribe('*', helper.isUnSubscribe(1, '*')).exec();
sub2.subscribe('/foo'); sub2.subscribe('/foo');
sub2.on('pmessage', function (pattern, channel, message) { sub2.on('pmessage', function (pattern, channel, message) {
assert.strictEqual(pattern.inspect(), new Buffer('*').inspect()); assert.strictEqual(pattern.inspect(), new Buffer('*').inspect());
@@ -500,7 +494,7 @@ describe('publish/subscribe', function () {
sub2.quit(done); sub2.quit(done);
}); });
pub.pubsub('numsub', '/foo', function (err, res) { pub.pubsub('numsub', '/foo', function (err, res) {
assert.deepEqual(res, ['/foo', 2]); assert.deepStrictEqual(res, ['/foo', 2]);
}); });
// sub2 is counted twice as it subscribed with psubscribe and subscribe // sub2 is counted twice as it subscribed with psubscribe and subscribe
pub.publish('/foo', 'hello world', helper.isNumber(3)); pub.publish('/foo', 'hello world', helper.isNumber(3));
@@ -527,8 +521,8 @@ describe('publish/subscribe', function () {
batch.psubscribe('*'); batch.psubscribe('*');
batch.subscribe('/foo'); batch.subscribe('/foo');
batch.unsubscribe('/foo'); batch.unsubscribe('/foo');
batch.unsubscribe(helper.isNull()); batch.unsubscribe(helper.isUnSubscribe(1, []));
batch.subscribe(['/foo'], helper.isString('/foo')); batch.subscribe(['/foo'], helper.isUnSubscribe(2, '/foo'));
batch.exec(function () { batch.exec(function () {
pub.pubsub('numsub', '/foo', function (err, res) { pub.pubsub('numsub', '/foo', function (err, res) {
// There's one subscriber to this channel // There's one subscriber to this channel
@@ -569,7 +563,7 @@ describe('publish/subscribe', function () {
}); });
it('executes callback when punsubscribe is called and there are no subscriptions', function (done) { it('executes callback when punsubscribe is called and there are no subscriptions', function (done) {
pub.batch().punsubscribe(helper.isNull()).exec(done); pub.batch().punsubscribe(helper.isUnSubscribe(0, [])).exec(done);
}); });
}); });
@@ -654,7 +648,7 @@ describe('publish/subscribe', function () {
.client('KILL', ['type', 'pubsub'], function () {}) .client('KILL', ['type', 'pubsub'], function () {})
.unsubscribe() .unsubscribe()
.psubscribe(['pattern:*']) .psubscribe(['pattern:*'])
.punsubscribe('unkown*') .punsubscribe('unknown*')
.punsubscribe(['pattern:*']) .punsubscribe(['pattern:*'])
.exec(function (err, res) { .exec(function (err, res) {
sub.client('kill', ['type', 'pubsub']); sub.client('kill', ['type', 'pubsub']);