From 0424cb0bf39f747e2c9d66077d7bb8bd654d4701 Mon Sep 17 00:00:00 2001 From: Ruben Bridgewater Date: Thu, 14 Apr 2016 01:11:20 +0200 Subject: [PATCH] Move pub sub command into individual commands and use call_on_write --- index.js | 23 ++-- lib/command.js | 7 +- lib/individualCommands.js | 239 +++++++++++++++++++++++++++++++++++++- lib/multi.js | 86 +++++--------- test/multi.spec.js | 4 +- 5 files changed, 280 insertions(+), 79 deletions(-) diff --git a/index.js b/index.js index ac2c0fc517..8b5d917be9 100644 --- a/index.js +++ b/index.js @@ -484,7 +484,7 @@ RedisClient.prototype.ready_check = function () { RedisClient.prototype.send_offline_queue = function () { for (var command_obj = this.offline_queue.shift(); command_obj; command_obj = this.offline_queue.shift()) { debug('Sending offline command: ' + command_obj.command); - this.internal_send_command(command_obj.command, command_obj.args, command_obj.callback); + this.internal_send_command(command_obj.command, command_obj.args, command_obj.callback, command_obj.call_on_write); } this.drain(); // Even though items were shifted off, Queue backing store still uses memory until next add, so just get a new Queue @@ -771,8 +771,10 @@ function handle_offline_command (self, command_obj) { self.should_buffer = true; } -RedisClient.prototype.internal_send_command = function (command, args, callback) { - var arg, prefix_keys; +// Do not call internal_send_command directly, if you are not absolutly certain it handles everything properly +// e.g. monitor / info does not work with internal_send_command only +RedisClient.prototype.internal_send_command = function (command, args, callback, call_on_write) { + var arg, prefix_keys, command_obj; var i = 0; var command_str = ''; var len = args.length; @@ -786,7 +788,7 @@ RedisClient.prototype.internal_send_command = function (command, args, callback) if (this.ready === false || this.stream.writable === false) { // Handle offline commands right away - handle_offline_command(this, new OfflineCommand(command, args, callback)); + handle_offline_command(this, new OfflineCommand(command, args, callback, call_on_write)); return false; // Indicate buffering } @@ -834,15 +836,7 @@ RedisClient.prototype.internal_send_command = function (command, args, callback) } } args = null; - var command_obj = new Command(command, args_copy, callback); - command_obj.buffer_args = buffer_args; - - if (SUBSCRIBE_COMMANDS[command] && this.pub_sub_mode === 0) { - // If pub sub is already activated, keep it that way, otherwise set the number of commands to resolve until pub sub mode activates - // Deactivation of the pub sub mode happens in the result handler - this.pub_sub_mode = this.command_queue.length + 1; - } - this.command_queue.push(command_obj); + command_obj = new Command(command, args_copy, buffer_args, callback); if (this.options.prefix) { prefix_keys = commands.getKeyIndexes(command, args_copy); @@ -881,6 +875,9 @@ RedisClient.prototype.internal_send_command = function (command, args, callback) debug('send_command: buffer send ' + arg.length + ' bytes'); } } + if (call_on_write) { + call_on_write(); + } return !this.should_buffer; }; diff --git a/lib/command.js b/lib/command.js index e4467fb55d..ee1181ea7e 100644 --- a/lib/command.js +++ b/lib/command.js @@ -2,18 +2,19 @@ // This Command constructor is ever so slightly faster than using an object literal, but more importantly, using // a named constructor helps it show up meaningfully in the V8 CPU profiler and in heap snapshots. -function Command (command, args, callback) { +function Command (command, args, buffer_args, callback) { this.command = command; this.args = args; // We only need the args for the offline commands => move them into another class. We need the number of args though for pub sub - this.buffer_args = false; + this.buffer_args = buffer_args; this.callback = callback; this.sub_commands_left = args.length; } -function OfflineCommand (command, args, callback) { +function OfflineCommand (command, args, callback, call_on_write) { this.command = command; this.args = args; this.callback = callback; + this.call_on_write = call_on_write; } module.exports = { diff --git a/lib/individualCommands.js b/lib/individualCommands.js index 953a9337f2..4ebd8ae170 100644 --- a/lib/individualCommands.js +++ b/lib/individualCommands.js @@ -7,9 +7,18 @@ var no_password_is_set = /no password is set/; var loading = /LOADING/; var RedisClient = require('../').RedisClient; -/******************************** -Replace built-in redis functions -********************************/ +/******************************************************************************************** + Replace built-in redis functions + + The callback may be hooked as needed. The same does not apply to the rest of the function. + State should not be set outside of the callback if not absolutly necessary. + This is important to make sure it works the same as single command or in a multi context. + To make sure everything works with the offline queue use the "call_on_write" function. + This is going to be executed while writing to the stream. + + TODO: Implement individal command generation as soon as possible to prevent divergent code + on single and multi calls! +********************************************************************************************/ RedisClient.prototype.multi = RedisClient.prototype.MULTI = function multi (args) { var multi = new Multi(this, args); @@ -209,3 +218,227 @@ RedisClient.prototype.hmset = RedisClient.prototype.HMSET = function hmset () { } return this.internal_send_command('hmset', arr, callback); }; + +RedisClient.prototype.subscribe = RedisClient.prototype.SUBSCRIBE = function subscribe () { + var arr, + len = arguments.length, + callback, + i = 0; + if (Array.isArray(arguments[0])) { + arr = arguments[0]; + callback = arguments[1]; + } else { + len = arguments.length; + // The later should not be the average use case + if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) { + len--; + callback = arguments[len]; + } + arr = new Array(len); + for (; i < len; i += 1) { + arr[i] = arguments[i]; + } + } + var self = this; + var call_on_write = function () { + self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1; + }; + return this.internal_send_command('subscribe', arr, callback, call_on_write); +}; + +Multi.prototype.subscribe = Multi.prototype.SUBSCRIBE = function subscribe () { + var arr, + len = arguments.length, + callback, + i = 0; + if (Array.isArray(arguments[0])) { + arr = arguments[0]; + callback = arguments[1]; + } else { + len = arguments.length; + // The later should not be the average use case + if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) { + len--; + callback = arguments[len]; + } + arr = new Array(len); + for (; i < len; i += 1) { + arr[i] = arguments[i]; + } + } + var self = this._client; + var call_on_write = function () { + self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1; + }; + this.queue.push(['subscribe', arr, callback, call_on_write]); + return this; +}; + +RedisClient.prototype.unsubscribe = RedisClient.prototype.UNSUBSCRIBE = function unsubscribe () { + var arr, + len = arguments.length, + callback, + i = 0; + if (Array.isArray(arguments[0])) { + arr = arguments[0]; + callback = arguments[1]; + } else { + len = arguments.length; + // The later should not be the average use case + if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) { + len--; + callback = arguments[len]; + } + arr = new Array(len); + for (; i < len; i += 1) { + arr[i] = arguments[i]; + } + } + var self = this; + var call_on_write = function () { + // Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback + self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1; + }; + return this.internal_send_command('unsubscribe', arr, callback, call_on_write); +}; + +Multi.prototype.unsubscribe = Multi.prototype.UNSUBSCRIBE = function unsubscribe () { + var arr, + len = arguments.length, + callback, + i = 0; + if (Array.isArray(arguments[0])) { + arr = arguments[0]; + callback = arguments[1]; + } else { + len = arguments.length; + // The later should not be the average use case + if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) { + len--; + callback = arguments[len]; + } + arr = new Array(len); + for (; i < len; i += 1) { + arr[i] = arguments[i]; + } + } + var self = this._client; + var call_on_write = function () { + // Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback + self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1; + }; + this.queue.push(['unsubscribe', arr, callback, call_on_write]); + return this; +}; + +RedisClient.prototype.psubscribe = RedisClient.prototype.PSUBSCRIBE = function psubscribe () { + var arr, + len = arguments.length, + callback, + i = 0; + if (Array.isArray(arguments[0])) { + arr = arguments[0]; + callback = arguments[1]; + } else { + len = arguments.length; + // The later should not be the average use case + if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) { + len--; + callback = arguments[len]; + } + arr = new Array(len); + for (; i < len; i += 1) { + arr[i] = arguments[i]; + } + } + var self = this; + var call_on_write = function () { + self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1; + }; + return this.internal_send_command('psubscribe', arr, callback, call_on_write); +}; + +Multi.prototype.psubscribe = Multi.prototype.PSUBSCRIBE = function psubscribe () { + var arr, + len = arguments.length, + callback, + i = 0; + if (Array.isArray(arguments[0])) { + arr = arguments[0]; + callback = arguments[1]; + } else { + len = arguments.length; + // The later should not be the average use case + if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) { + len--; + callback = arguments[len]; + } + arr = new Array(len); + for (; i < len; i += 1) { + arr[i] = arguments[i]; + } + } + var self = this; + var call_on_write = function () { + self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1; + }; + this.queue.push(['psubscribe', arr, callback, call_on_write]); + return this; +}; + +RedisClient.prototype.punsubscribe = RedisClient.prototype.PUNSUBSCRIBE = function punsubscribe () { + var arr, + len = arguments.length, + callback, + i = 0; + if (Array.isArray(arguments[0])) { + arr = arguments[0]; + callback = arguments[1]; + } else { + len = arguments.length; + // The later should not be the average use case + if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) { + len--; + callback = arguments[len]; + } + arr = new Array(len); + for (; i < len; i += 1) { + arr[i] = arguments[i]; + } + } + var self = this; + var call_on_write = function () { + // Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback + self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1; + }; + return this.internal_send_command('punsubscribe', arr, callback, call_on_write); +}; + +Multi.prototype.punsubscribe = Multi.prototype.PUNSUBSCRIBE = function punsubscribe () { + var arr, + len = arguments.length, + callback, + i = 0; + if (Array.isArray(arguments[0])) { + arr = arguments[0]; + callback = arguments[1]; + } else { + len = arguments.length; + // The later should not be the average use case + if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) { + len--; + callback = arguments[len]; + } + arr = new Array(len); + for (; i < len; i += 1) { + arr[i] = arguments[i]; + } + } + var self = this; + var call_on_write = function () { + // Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback + self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1; + }; + this.queue.push(['punsubscribe', arr, callback, call_on_write]); + return this; +}; diff --git a/lib/multi.js b/lib/multi.js index bb2173cd2e..bdf37fe6ad 100644 --- a/lib/multi.js +++ b/lib/multi.js @@ -20,45 +20,8 @@ function Multi (client, args) { } } -Multi.prototype.hmset = Multi.prototype.HMSET = function hmset () { - var arr, - len = 0, - callback, - i = 0; - if (Array.isArray(arguments[0])) { - arr = arguments[0]; - callback = arguments[1]; - } else if (Array.isArray(arguments[1])) { - len = arguments[1].length; - arr = new Array(len + 1); - arr[0] = arguments[0]; - for (; i < len; i += 1) { - arr[i + 1] = arguments[1][i]; - } - callback = arguments[2]; - } else if (typeof arguments[1] === 'object' && (typeof arguments[2] === 'function' || typeof arguments[2] === 'undefined')) { - arr = [arguments[0]]; - for (var field in arguments[1]) { // jshint ignore: line - arr.push(field, arguments[1][field]); - } - callback = arguments[2]; - } else { - len = arguments.length; - // The later should not be the average use case - if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) { - len--; - callback = arguments[len]; - } - arr = new Array(len); - for (; i < len; i += 1) { - arr[i] = arguments[i]; - } - } - this.queue.push(['hmset', arr, callback]); - return this; -}; - -function pipeline_transaction_command (self, command, args, index, cb) { +function pipeline_transaction_command (self, command, args, index, cb, call_on_write) { + // Queueing is done first, then the commands are executed self._client.send_command(command, args, function (err, reply) { if (err) { if (cb) { @@ -131,20 +94,22 @@ Multi.prototype.exec_transaction = function exec_transaction (callback) { pipeline_transaction_command(self, 'multi', []); // Drain queue, callback will catch 'QUEUED' or error for (var index = 0; index < len; index++) { - var args = self.queue.get(index); - var command = args[0]; - var cb = args[2]; + // The commands may not be shifted off, since they are needed in the result handler + var command_obj = self.queue.get(index); + var command = command_obj[0]; + var cb = command_obj[2]; + var call_on_write = command_obj.length === 4 ? command_obj[3] : undefined; // Keep track of who wants buffer responses: if (self._client.options.detect_buffers) { self.wants_buffers[index] = false; - for (var i = 0; i < args[1].length; i += 1) { - if (args[1][i] instanceof Buffer) { + for (var i = 0; i < command_obj[1].length; i += 1) { + if (command_obj[1][i] instanceof Buffer) { self.wants_buffers[index] = true; break; } } } - pipeline_transaction_command(self, command, args[1], index, cb); + pipeline_transaction_command(self, command, command_obj[1], index, cb, call_on_write); } self._client.internal_send_command('exec', [], function (err, replies) { @@ -171,7 +136,18 @@ Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = funct var self = this; var len = self.queue.length; var index = 0; - var args; + var command_obj; + self._client.cork(); + if (!callback) { + while (command_obj = self.queue.shift()) { + self._client.internal_send_command(command_obj[0], command_obj[1], command_obj[2], (command_obj.length === 4 ? command_obj[3] : undefined)); + } + self._client.uncork(); + return !self._client.should_buffer; + } else if (len === 0) { + utils.reply_in_order(self._client, callback, null, []); + return !self._client.should_buffer; + } var callback_without_own_cb = function (err, res) { if (err) { self.results.push(err); @@ -190,26 +166,20 @@ Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = funct callback(null, self.results); }; }; - if (len === 0) { - if (callback) { - utils.reply_in_order(self._client, callback, null, []); - } - return true; - } self.results = []; - self._client.cork(); - while (args = self.queue.shift()) { - var command = args[0]; + while (command_obj = self.queue.shift()) { + var command = command_obj[0]; + var call_on_write = command_obj.length === 4 ? command_obj[3] : undefined; var cb; - if (typeof args[2] === 'function') { - cb = batch_callback(self, args[2], index); + if (typeof command_obj[2] === 'function') { + cb = batch_callback(self, command_obj[2], index); } else { cb = callback_without_own_cb; } if (typeof callback === 'function' && index === len - 1) { cb = last_callback(cb); } - self._client.internal_send_command(command, args[1], cb); + this._client.internal_send_command(command, command_obj[1], cb, call_on_write); index++; } self._client.uncork(); diff --git a/test/multi.spec.js b/test/multi.spec.js index 2ad9a23051..3227ecda2a 100644 --- a/test/multi.spec.js +++ b/test/multi.spec.js @@ -255,12 +255,12 @@ describe("The 'multi' method", function () { multi2.set('m2', '456'); multi1.set('m1', '123'); multi1.get('m1'); - multi2.get('m2'); + multi2.get('m1'); multi2.ping(); multi1.exec(end); multi2.exec(function (err, res) { - assert.strictEqual(res[1], '456'); + assert.strictEqual(res[1], '123'); end(); }); });