1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

Move pub sub command into individual commands and use call_on_write

This commit is contained in:
Ruben Bridgewater
2016-04-14 01:11:20 +02:00
parent 683815de9d
commit 0424cb0bf3
5 changed files with 280 additions and 79 deletions

View File

@@ -484,7 +484,7 @@ RedisClient.prototype.ready_check = function () {
RedisClient.prototype.send_offline_queue = function () {
for (var command_obj = this.offline_queue.shift(); command_obj; command_obj = this.offline_queue.shift()) {
debug('Sending offline command: ' + command_obj.command);
this.internal_send_command(command_obj.command, command_obj.args, command_obj.callback);
this.internal_send_command(command_obj.command, command_obj.args, command_obj.callback, command_obj.call_on_write);
}
this.drain();
// Even though items were shifted off, Queue backing store still uses memory until next add, so just get a new Queue
@@ -771,8 +771,10 @@ function handle_offline_command (self, command_obj) {
self.should_buffer = true;
}
RedisClient.prototype.internal_send_command = function (command, args, callback) {
var arg, prefix_keys;
// Do not call internal_send_command directly, if you are not absolutly certain it handles everything properly
// e.g. monitor / info does not work with internal_send_command only
RedisClient.prototype.internal_send_command = function (command, args, callback, call_on_write) {
var arg, prefix_keys, command_obj;
var i = 0;
var command_str = '';
var len = args.length;
@@ -786,7 +788,7 @@ RedisClient.prototype.internal_send_command = function (command, args, callback)
if (this.ready === false || this.stream.writable === false) {
// Handle offline commands right away
handle_offline_command(this, new OfflineCommand(command, args, callback));
handle_offline_command(this, new OfflineCommand(command, args, callback, call_on_write));
return false; // Indicate buffering
}
@@ -834,15 +836,7 @@ RedisClient.prototype.internal_send_command = function (command, args, callback)
}
}
args = null;
var command_obj = new Command(command, args_copy, callback);
command_obj.buffer_args = buffer_args;
if (SUBSCRIBE_COMMANDS[command] && this.pub_sub_mode === 0) {
// If pub sub is already activated, keep it that way, otherwise set the number of commands to resolve until pub sub mode activates
// Deactivation of the pub sub mode happens in the result handler
this.pub_sub_mode = this.command_queue.length + 1;
}
this.command_queue.push(command_obj);
command_obj = new Command(command, args_copy, buffer_args, callback);
if (this.options.prefix) {
prefix_keys = commands.getKeyIndexes(command, args_copy);
@@ -881,6 +875,9 @@ RedisClient.prototype.internal_send_command = function (command, args, callback)
debug('send_command: buffer send ' + arg.length + ' bytes');
}
}
if (call_on_write) {
call_on_write();
}
return !this.should_buffer;
};

View File

@@ -2,18 +2,19 @@
// This Command constructor is ever so slightly faster than using an object literal, but more importantly, using
// a named constructor helps it show up meaningfully in the V8 CPU profiler and in heap snapshots.
function Command (command, args, callback) {
function Command (command, args, buffer_args, callback) {
this.command = command;
this.args = args; // We only need the args for the offline commands => move them into another class. We need the number of args though for pub sub
this.buffer_args = false;
this.buffer_args = buffer_args;
this.callback = callback;
this.sub_commands_left = args.length;
}
function OfflineCommand (command, args, callback) {
function OfflineCommand (command, args, callback, call_on_write) {
this.command = command;
this.args = args;
this.callback = callback;
this.call_on_write = call_on_write;
}
module.exports = {

View File

@@ -7,9 +7,18 @@ var no_password_is_set = /no password is set/;
var loading = /LOADING/;
var RedisClient = require('../').RedisClient;
/********************************
Replace built-in redis functions
********************************/
/********************************************************************************************
Replace built-in redis functions
The callback may be hooked as needed. The same does not apply to the rest of the function.
State should not be set outside of the callback if not absolutly necessary.
This is important to make sure it works the same as single command or in a multi context.
To make sure everything works with the offline queue use the "call_on_write" function.
This is going to be executed while writing to the stream.
TODO: Implement individal command generation as soon as possible to prevent divergent code
on single and multi calls!
********************************************************************************************/
RedisClient.prototype.multi = RedisClient.prototype.MULTI = function multi (args) {
var multi = new Multi(this, args);
@@ -209,3 +218,227 @@ RedisClient.prototype.hmset = RedisClient.prototype.HMSET = function hmset () {
}
return this.internal_send_command('hmset', arr, callback);
};
RedisClient.prototype.subscribe = RedisClient.prototype.SUBSCRIBE = function subscribe () {
var arr,
len = arguments.length,
callback,
i = 0;
if (Array.isArray(arguments[0])) {
arr = arguments[0];
callback = arguments[1];
} else {
len = arguments.length;
// The later should not be the average use case
if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
len--;
callback = arguments[len];
}
arr = new Array(len);
for (; i < len; i += 1) {
arr[i] = arguments[i];
}
}
var self = this;
var call_on_write = function () {
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
};
return this.internal_send_command('subscribe', arr, callback, call_on_write);
};
Multi.prototype.subscribe = Multi.prototype.SUBSCRIBE = function subscribe () {
var arr,
len = arguments.length,
callback,
i = 0;
if (Array.isArray(arguments[0])) {
arr = arguments[0];
callback = arguments[1];
} else {
len = arguments.length;
// The later should not be the average use case
if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
len--;
callback = arguments[len];
}
arr = new Array(len);
for (; i < len; i += 1) {
arr[i] = arguments[i];
}
}
var self = this._client;
var call_on_write = function () {
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
};
this.queue.push(['subscribe', arr, callback, call_on_write]);
return this;
};
RedisClient.prototype.unsubscribe = RedisClient.prototype.UNSUBSCRIBE = function unsubscribe () {
var arr,
len = arguments.length,
callback,
i = 0;
if (Array.isArray(arguments[0])) {
arr = arguments[0];
callback = arguments[1];
} else {
len = arguments.length;
// The later should not be the average use case
if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
len--;
callback = arguments[len];
}
arr = new Array(len);
for (; i < len; i += 1) {
arr[i] = arguments[i];
}
}
var self = this;
var call_on_write = function () {
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
};
return this.internal_send_command('unsubscribe', arr, callback, call_on_write);
};
Multi.prototype.unsubscribe = Multi.prototype.UNSUBSCRIBE = function unsubscribe () {
var arr,
len = arguments.length,
callback,
i = 0;
if (Array.isArray(arguments[0])) {
arr = arguments[0];
callback = arguments[1];
} else {
len = arguments.length;
// The later should not be the average use case
if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
len--;
callback = arguments[len];
}
arr = new Array(len);
for (; i < len; i += 1) {
arr[i] = arguments[i];
}
}
var self = this._client;
var call_on_write = function () {
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
};
this.queue.push(['unsubscribe', arr, callback, call_on_write]);
return this;
};
RedisClient.prototype.psubscribe = RedisClient.prototype.PSUBSCRIBE = function psubscribe () {
var arr,
len = arguments.length,
callback,
i = 0;
if (Array.isArray(arguments[0])) {
arr = arguments[0];
callback = arguments[1];
} else {
len = arguments.length;
// The later should not be the average use case
if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
len--;
callback = arguments[len];
}
arr = new Array(len);
for (; i < len; i += 1) {
arr[i] = arguments[i];
}
}
var self = this;
var call_on_write = function () {
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
};
return this.internal_send_command('psubscribe', arr, callback, call_on_write);
};
Multi.prototype.psubscribe = Multi.prototype.PSUBSCRIBE = function psubscribe () {
var arr,
len = arguments.length,
callback,
i = 0;
if (Array.isArray(arguments[0])) {
arr = arguments[0];
callback = arguments[1];
} else {
len = arguments.length;
// The later should not be the average use case
if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
len--;
callback = arguments[len];
}
arr = new Array(len);
for (; i < len; i += 1) {
arr[i] = arguments[i];
}
}
var self = this;
var call_on_write = function () {
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
};
this.queue.push(['psubscribe', arr, callback, call_on_write]);
return this;
};
RedisClient.prototype.punsubscribe = RedisClient.prototype.PUNSUBSCRIBE = function punsubscribe () {
var arr,
len = arguments.length,
callback,
i = 0;
if (Array.isArray(arguments[0])) {
arr = arguments[0];
callback = arguments[1];
} else {
len = arguments.length;
// The later should not be the average use case
if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
len--;
callback = arguments[len];
}
arr = new Array(len);
for (; i < len; i += 1) {
arr[i] = arguments[i];
}
}
var self = this;
var call_on_write = function () {
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
};
return this.internal_send_command('punsubscribe', arr, callback, call_on_write);
};
Multi.prototype.punsubscribe = Multi.prototype.PUNSUBSCRIBE = function punsubscribe () {
var arr,
len = arguments.length,
callback,
i = 0;
if (Array.isArray(arguments[0])) {
arr = arguments[0];
callback = arguments[1];
} else {
len = arguments.length;
// The later should not be the average use case
if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
len--;
callback = arguments[len];
}
arr = new Array(len);
for (; i < len; i += 1) {
arr[i] = arguments[i];
}
}
var self = this;
var call_on_write = function () {
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
};
this.queue.push(['punsubscribe', arr, callback, call_on_write]);
return this;
};

View File

@@ -20,45 +20,8 @@ function Multi (client, args) {
}
}
Multi.prototype.hmset = Multi.prototype.HMSET = function hmset () {
var arr,
len = 0,
callback,
i = 0;
if (Array.isArray(arguments[0])) {
arr = arguments[0];
callback = arguments[1];
} else if (Array.isArray(arguments[1])) {
len = arguments[1].length;
arr = new Array(len + 1);
arr[0] = arguments[0];
for (; i < len; i += 1) {
arr[i + 1] = arguments[1][i];
}
callback = arguments[2];
} else if (typeof arguments[1] === 'object' && (typeof arguments[2] === 'function' || typeof arguments[2] === 'undefined')) {
arr = [arguments[0]];
for (var field in arguments[1]) { // jshint ignore: line
arr.push(field, arguments[1][field]);
}
callback = arguments[2];
} else {
len = arguments.length;
// The later should not be the average use case
if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
len--;
callback = arguments[len];
}
arr = new Array(len);
for (; i < len; i += 1) {
arr[i] = arguments[i];
}
}
this.queue.push(['hmset', arr, callback]);
return this;
};
function pipeline_transaction_command (self, command, args, index, cb) {
function pipeline_transaction_command (self, command, args, index, cb, call_on_write) {
// Queueing is done first, then the commands are executed
self._client.send_command(command, args, function (err, reply) {
if (err) {
if (cb) {
@@ -131,20 +94,22 @@ Multi.prototype.exec_transaction = function exec_transaction (callback) {
pipeline_transaction_command(self, 'multi', []);
// Drain queue, callback will catch 'QUEUED' or error
for (var index = 0; index < len; index++) {
var args = self.queue.get(index);
var command = args[0];
var cb = args[2];
// The commands may not be shifted off, since they are needed in the result handler
var command_obj = self.queue.get(index);
var command = command_obj[0];
var cb = command_obj[2];
var call_on_write = command_obj.length === 4 ? command_obj[3] : undefined;
// Keep track of who wants buffer responses:
if (self._client.options.detect_buffers) {
self.wants_buffers[index] = false;
for (var i = 0; i < args[1].length; i += 1) {
if (args[1][i] instanceof Buffer) {
for (var i = 0; i < command_obj[1].length; i += 1) {
if (command_obj[1][i] instanceof Buffer) {
self.wants_buffers[index] = true;
break;
}
}
}
pipeline_transaction_command(self, command, args[1], index, cb);
pipeline_transaction_command(self, command, command_obj[1], index, cb, call_on_write);
}
self._client.internal_send_command('exec', [], function (err, replies) {
@@ -171,7 +136,18 @@ Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = funct
var self = this;
var len = self.queue.length;
var index = 0;
var args;
var command_obj;
self._client.cork();
if (!callback) {
while (command_obj = self.queue.shift()) {
self._client.internal_send_command(command_obj[0], command_obj[1], command_obj[2], (command_obj.length === 4 ? command_obj[3] : undefined));
}
self._client.uncork();
return !self._client.should_buffer;
} else if (len === 0) {
utils.reply_in_order(self._client, callback, null, []);
return !self._client.should_buffer;
}
var callback_without_own_cb = function (err, res) {
if (err) {
self.results.push(err);
@@ -190,26 +166,20 @@ Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = funct
callback(null, self.results);
};
};
if (len === 0) {
if (callback) {
utils.reply_in_order(self._client, callback, null, []);
}
return true;
}
self.results = [];
self._client.cork();
while (args = self.queue.shift()) {
var command = args[0];
while (command_obj = self.queue.shift()) {
var command = command_obj[0];
var call_on_write = command_obj.length === 4 ? command_obj[3] : undefined;
var cb;
if (typeof args[2] === 'function') {
cb = batch_callback(self, args[2], index);
if (typeof command_obj[2] === 'function') {
cb = batch_callback(self, command_obj[2], index);
} else {
cb = callback_without_own_cb;
}
if (typeof callback === 'function' && index === len - 1) {
cb = last_callback(cb);
}
self._client.internal_send_command(command, args[1], cb);
this._client.internal_send_command(command, command_obj[1], cb, call_on_write);
index++;
}
self._client.uncork();

View File

@@ -255,12 +255,12 @@ describe("The 'multi' method", function () {
multi2.set('m2', '456');
multi1.set('m1', '123');
multi1.get('m1');
multi2.get('m2');
multi2.get('m1');
multi2.ping();
multi1.exec(end);
multi2.exec(function (err, res) {
assert.strictEqual(res[1], '456');
assert.strictEqual(res[1], '123');
end();
});
});