diff --git a/lib/commands.js b/lib/commands.js index b48eaf3b8f..d67da61fed 100644 --- a/lib/commands.js +++ b/lib/commands.js @@ -40,7 +40,7 @@ commands.list.forEach((command) => { for (var i = 0; i < len; i += 1) { arr[i] = arguments[i] } - this.queue.push(new Command(command, arr)) + this._queue.push(new Command(command, arr)) return this } if (multiProto[command] !== commandName) { diff --git a/lib/extendedApi.js b/lib/extendedApi.js index e52a920537..5b915b48ea 100644 --- a/lib/extendedApi.js +++ b/lib/extendedApi.js @@ -4,6 +4,7 @@ const utils = require('./utils') const debug = require('./debug') const RedisClient = require('../').RedisClient const Command = require('./command') +const Multi = require('./multi') const noop = function () {} /********************************************** @@ -77,7 +78,6 @@ RedisClient.prototype.unref = function () { } // TODO: promisify this -// TODO: the sendCommand legacy module should also make duplicate handle callbacks again RedisClient.prototype.duplicate = function (options, callback) { if (typeof options === 'function') { callback = options @@ -105,3 +105,13 @@ RedisClient.prototype.duplicate = function (options, callback) { } return client } + +// Note: this overrides a native function! +RedisClient.prototype.multi = function multi (args) { + return new Multi(this, 'multi', args) +} + +// Note: This is not a native function but is still handled as a individual command as it behaves just the same as multi +RedisClient.prototype.batch = function batch (args) { + return new Multi(this, 'batch', args) +} diff --git a/lib/individualCommands.js b/lib/individualCommands.js index 41a5dad0ff..7447e7d2bd 100644 --- a/lib/individualCommands.js +++ b/lib/individualCommands.js @@ -17,19 +17,10 @@ const RedisClient = require('../').RedisClient TODO: Implement individual command generation as soon as possible to prevent divergent code on single and multi calls! + + TODO: Implement hooks to replace this. Most of these things are perfect for hooks ********************************************************************************************/ -RedisClient.prototype.multi = function multi (args) { - const multi = new Multi(this, args) - multi.exec = multi.EXEC = multi.execTransaction - return multi -} - -// ATTENTION: This is not a native function but is still handled as a individual command as it behaves just the same as multi -RedisClient.prototype.batch = function batch (args) { - return new Multi(this, args) -} - function selectCallback (self, db) { return function (err, res) { if (err === null) { @@ -45,7 +36,7 @@ RedisClient.prototype.select = function select (db) { } Multi.prototype.select = function select (db) { - this.queue.push(new Command('select', [db], null, selectCallback(this._client, db))) + this._queue.push(new Command('select', [db], null, selectCallback(this._client, db))) return this } @@ -66,7 +57,7 @@ Multi.prototype.monitor = function monitor () { const callOnWrite = () => { this._client.monitoring = true } - this.queue.push(new Command('monitor', [], callOnWrite)) + this._queue.push(new Command('monitor', [], callOnWrite)) return this } // Set multi monitoring to indicate the exec that it should abort @@ -111,7 +102,7 @@ Multi.prototype.quit = function quit () { this._client.closing = true this._client.ready = false } - this.queue.push(new Command('quit', [], null, quitCallback(this._client), callOnWrite)) + this._queue.push(new Command('quit', [], null, quitCallback(this._client), callOnWrite)) return this } @@ -161,7 +152,7 @@ RedisClient.prototype.info = function info (section) { Multi.prototype.info = function info (section) { const args = section ? [section] : [] - this.queue.push(new Command('info', args, null, infoCallback(this._client))) + this._queue.push(new Command('info', args, null, infoCallback(this._client))) return this } @@ -196,7 +187,7 @@ Multi.prototype.auth = function auth (pass) { // Stash auth for connect and reconnect. this.authPass = pass - this.queue.push(new Command('auth', [pass], null, authCallback(this._client))) + this._queue.push(new Command('auth', [pass], null, authCallback(this._client))) return this } @@ -237,7 +228,7 @@ Multi.prototype.client = function client () { } } } - this.queue.push(new Command('client', arr, callOnWrite)) + this._queue.push(new Command('client', arr, callOnWrite)) return this } @@ -262,7 +253,7 @@ Multi.prototype.subscribe = function subscribe () { const callOnWrite = () => { this._client.pubSubMode = this._client.pubSubMode || this._client.commandQueue.length + 1 } - this.queue.push(new Command('subscribe', arr, callOnWrite)) + this._queue.push(new Command('subscribe', arr, callOnWrite)) return this } @@ -289,7 +280,7 @@ Multi.prototype.unsubscribe = function unsubscribe () { // Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback this._client.pubSubMode = this._client.pubSubMode || this._client.commandQueue.length + 1 } - this.queue.push(new Command('unsubscribe', arr, callOnWrite)) + this._queue.push(new Command('unsubscribe', arr, callOnWrite)) return this } @@ -314,7 +305,7 @@ Multi.prototype.psubscribe = function psubscribe () { const callOnWrite = () => { this._client.pubSubMode = this._client.pubSubMode || this._client.commandQueue.length + 1 } - this.queue.push(new Command('psubscribe', arr, callOnWrite)) + this._queue.push(new Command('psubscribe', arr, callOnWrite)) return this } @@ -341,6 +332,6 @@ Multi.prototype.punsubscribe = function punsubscribe () { // Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback this._client.pubSubMode = this._client.pubSubMode || this._client.commandQueue.length + 1 } - this.queue.push(new Command('punsubscribe', arr, callOnWrite)) + this._queue.push(new Command('punsubscribe', arr, callOnWrite)) return this } diff --git a/lib/multi.js b/lib/multi.js index 8cdbca9736..c10be736bc 100644 --- a/lib/multi.js +++ b/lib/multi.js @@ -4,67 +4,57 @@ const Queue = require('denque') const utils = require('./utils') const Command = require('./command') -// TODO: Remove support for the non chaining way of using this -// It's confusing and has no benefit -function Multi (client, args) { - this._client = client - this.queue = new Queue() - var command, tmpArgs - if (args) { // Either undefined or an array. Fail hard if it's not an array - for (let i = 0; i < args.length; i++) { - command = args[i][0] - tmpArgs = args[i].slice(1) - if (Array.isArray(command)) { - this[command[0]].apply(this, command.slice(1).concat(tmpArgs)) - } else { - this[command].apply(this, tmpArgs) - } - } - } -} - -function pipelineTransactionCommand (self, commandObj, index) { +/** + * @description Queues all transaction commands and checks if a queuing error + * occurred. + * + * @param {Multi} multi + * @param {Command} command + * @param {number} index Command index in the Multi queue + * @returns * + */ +function pipelineTransactionCommand (multi, command, index) { // Queueing is done first, then the commands are executed - const tmp = commandObj.callback - commandObj.callback = function (err, reply) { + const tmp = command.callback + command.callback = function (err, reply) { if (err) { tmp(err) err.position = index - self.errors.push(err) + multi.errors.push(err) return } // Keep track of who wants buffer responses: - // By the time the callback is called the commandObj got the bufferArgs attribute attached - self.wantsBuffers[index] = commandObj.bufferArgs + // By the time the callback is called the command got the bufferArgs attribute attached + multi.wantsBuffers[index] = command.bufferArgs tmp(null, reply) } - return self._client.internalSendCommand(commandObj) + return multi._client.internalSendCommand(command) } -Multi.prototype.execAtomic = function execAtomic () { - if (this.queue.length < 2) { - return this.execBatch() - } - return this.exec() -} - -function multiCallback (self, replies) { - var i = 0 - +/** + * @description Make sure all replies are of the correct type and call the command callback + * + * @param {Multi} multi + * @param {any[]} replies + * @returns any[] + */ +function multiCallback (multi, replies) { if (replies) { - for (let commandObj = self.queue.shift(); commandObj !== undefined; commandObj = self.queue.shift()) { - if (replies[i].message) { // instanceof Error + var i = 0 + while (multi._queue.length !== 0) { + const command = multi._queue.shift() + if (replies[i] instanceof Error) { const match = replies[i].message.match(utils.errCode) // LUA script could return user errors that don't behave like all other errors! if (match) { replies[i].code = match[1] } - replies[i].command = commandObj.command.toUpperCase() - commandObj.callback(replies[i]) + replies[i].command = command.command.toUpperCase() + command.callback(replies[i]) } else { // If we asked for strings, even in detectBuffers mode, then return strings: - replies[i] = self._client.handleReply(replies[i], commandObj.command, self.wantsBuffers[i]) - commandObj.callback(null, replies[i]) + replies[i] = multi._client.handleReply(replies[i], command.command, multi.wantsBuffers[i]) + command.callback(null, replies[i]) } i++ } @@ -73,60 +63,72 @@ function multiCallback (self, replies) { return replies } -Multi.prototype.execTransaction = function execTransaction () { - if (this.monitoring || this._client.monitoring) { +/** + * @description Execute a Redis transaction (multi ... exec) + * + * @param {Multi} multi + * @returns Promise + */ +function execTransaction (multi) { + if (multi.monitoring || multi._client.monitoring) { const err = new RangeError( 'Using transaction with a client that is in monitor mode does not work due to faulty return values of Redis.' ) err.command = 'EXEC' err.code = 'EXECABORT' return new Promise((resolve, reject) => { - utils.replyInOrder(this._client, (err, res) => { + utils.replyInOrder(multi._client, (err, res) => { if (err) return reject(err) resolve(res) }, null, []) }) } - const len = this.queue.length - this.errors = [] - this._client.cork() - this.wantsBuffers = new Array(len) + const len = multi._queue.length + multi.errors = [] + multi._client.cork() + multi.wantsBuffers = new Array(len) // Silently ignore this error. We'll receive the error for the exec as well - const promises = [this._client.internalSendCommand(new Command('multi', [])).catch(() => {})] + const promises = [multi._client.internalSendCommand(new Command('multi', [])).catch(() => {})] // Drain queue, callback will catch 'QUEUED' or error for (let index = 0; index < len; index++) { // The commands may not be shifted off, since they are needed in the result handler - promises.push(pipelineTransactionCommand(this, this.queue.get(index), index).catch((e) => e)) + promises.push(pipelineTransactionCommand(multi, multi._queue.get(index), index).catch((e) => e)) } - const main = this._client.internalSendCommand(new Command('exec', [])) - this._client.uncork() - return Promise.all(promises).then(() => main.then((replies) => multiCallback(this, replies)).catch((err) => { - err.errors = this.errors + const main = multi._client.internalSendCommand(new Command('exec', [])) + multi._client.uncork() + return Promise.all(promises).then(() => main.then((replies) => multiCallback(multi, replies)).catch((err) => { + err.errors = multi.errors return Promise.reject(err) })) } -Multi.prototype.exec = Multi.prototype.execBatch = function execBatch () { - if (this.queue.length === 0) { +/** + * @description Execute a pipeline without transaction (batch ... exec) + * + * @param {Multi} multi + * @returns Promise + */ +function execBatch (multi) { + if (multi._queue.length === 0) { // TODO: return an error if not "ready" return new Promise((resolve) => { - utils.replyInOrder(this._client, (e, res) => { + utils.replyInOrder(multi._client, (e, res) => { resolve(res) }, null, []) }) } var error = false - this._client.cork() + multi._client.cork() const promises = [] - while (this.queue.length) { - const commandObj = this.queue.shift() - promises.push(this._client.internalSendCommand(commandObj).catch((e) => { + while (multi._queue.length) { + const commandObj = multi._queue.shift() + promises.push(multi._client.internalSendCommand(commandObj).catch((e) => { error = true return e })) } - this._client.uncork() + multi._client.uncork() return Promise.all(promises).then((res) => { if (error) { const err = new Error('bla failed') @@ -138,4 +140,61 @@ Multi.prototype.exec = Multi.prototype.execBatch = function execBatch () { }) } +class Multi { + /** + * Creates an instance of Multi. + * @param {RedisClient} client + * @param {string} type + * @param {any[]} [args] + * + * @memberof Multi + */ + constructor (client, type, args) { + this._client = client + this._type = type + this._queue = new Queue() + // Either undefined or an array. Fail hard if it's not an array + if (args) { + // Legacy support for passing in an array of arguments + for (let i = 0; i < args.length; i++) { + const command = args[i][0] + const tmpArgs = args[i].slice(1) + if (Array.isArray(command)) { + this[command[0]].apply(this, command.slice(1).concat(tmpArgs)) + } else { + this[command].apply(this, tmpArgs) + } + } + } + } + + /** + * @description Check the number of commands and execute those atomic + * + * @returns Promise + * + * @memberof Multi + */ + execAtomic () { + if (this._queue.length < 2) { + return this.execBatch() + } + return this.exec() + } + + /** + * @description Execute the corresponding multi type + * + * @returns Promise + * + * @memberof Multi + */ + exec () { + if (this._type === 'batch') { + return execBatch(this) + } + return execTransaction(this) + } +} + module.exports = Multi diff --git a/lib/utils.js b/lib/utils.js index 4f00586fc8..ae2914dc32 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -73,7 +73,6 @@ function replyInOrder (self, callback, err, res, queue) { callback(err, res) }) } else { - // TODO: Change this to chain promises instead const tmp = commandObj.callback commandObj.callback = function (e, r) { tmp(e, r) diff --git a/package.json b/package.json index 568b8aa2d1..976dbfdfb2 100644 --- a/package.json +++ b/package.json @@ -32,7 +32,7 @@ "redis-parser": "^3.0.0" }, "engines": { - "node": ">=6" + "node": ">=4" }, "devDependencies": { "coveralls": "^2.11.2",