diff --git a/.eslintrc.json b/.eslintrc.json index 698a4b3a28..8666a79dd4 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -21,7 +21,8 @@ "no-param-reassign": "off", "vars-on-top": "off", "no-mixed-operators": "off", - "prefer-spread": "off" + "prefer-spread": "off", + "function-paren-newline": "off" }, "env": { "mocha": 2 diff --git a/benchmarks/multi_bench.js b/benchmarks/multi_bench.js index 6c87d5b580..99e5d6e06d 100644 --- a/benchmarks/multi_bench.js +++ b/benchmarks/multi_bench.js @@ -120,7 +120,11 @@ Test.prototype.newClient = function (id) { } Test.prototype.onClientsReady = function () { - process.stdout.write(`${lpad(this.args.descr, 13)}, ${this.args.batch ? lpad(`batch ${this.args.batch}`, 9) : lpad(this.args.pipeline, 9)}/${this.clientsReady} `) + process.stdout.write( + `${lpad(this.args.descr, 13)}, ` + + `${this.args.batch ? lpad(`batch ${this.args.batch}`, 9) : lpad(this.args.pipeline, 9)}/` + + `${this.clientsReady} ` + ) this.testStart = Date.now() return this.fillPipeline() } @@ -140,20 +144,18 @@ Test.prototype.fillPipeline = function () { if (this.batchPipeline) { return this.batch() } - const promises = [] while (pipeline < this.maxPipeline) { this.commandsSent++ pipeline++ - promises.push(this.sendNext()) + this.sendNext() } - return Promise.all(promises) } Test.prototype.batch = function () { const curClient = clientNr++ % this.clients.length const start = process.hrtime() let i = 0 - const batch = this.clients[curClient].batch() + const batch = this.clients[curClient].multi() while (i++ < this.batchPipeline) { this.commandsSent++ diff --git a/changelog.md b/changelog.md index 165573b28c..12c3c2d70c 100644 --- a/changelog.md +++ b/changelog.md @@ -27,7 +27,7 @@ Bugfixes Features - Native promise support - Auto pipelining -- The client is now exported directly and be instantiated directly +- The client is now exported directly and can be instantiated directly - `client.duplicate` will now also transition into pub sub or monitor mode Breaking Changes @@ -35,7 +35,7 @@ Breaking Changes - Dropped support for `snake_case` - Dropped support for domains - Dropped support for Redis 2.4 -- Dropped support for Node.js < 4 +- Dropped support for Node.js < 6 - Removed `drain` event - Removed `idle` event - Removed `parser` option diff --git a/lib/addCommand.js b/lib/addCommand.js index 0f4491eaa1..3ae18baa96 100644 --- a/lib/addCommand.js +++ b/lib/addCommand.js @@ -1,6 +1,6 @@ 'use strict' -const Command = require('./command') +const { Command, MultiCommand } = require('./command') function addCommand(clientProto, multiProto, command) { // Some rare Redis commands use special characters in their command name @@ -28,7 +28,7 @@ function addCommand(clientProto, multiProto, command) { // Do not override existing functions if (!multiProto[command] && command !== 'multi') { multiProto[commandName] = function (...args) { - this._queue.push(new Command(command, args)) + this._queue.push(new MultiCommand(command, args)) return this } if (!multiProto[commandName].name) { diff --git a/lib/client.js b/lib/client.js index a27bbf2de5..0b054225e6 100644 --- a/lib/client.js +++ b/lib/client.js @@ -3,7 +3,7 @@ const Queue = require('denque') const EventEmitter = require('events') const net = require('net') -const Command = require('./command') +const { Command } = require('./command') const connect = require('./connect') const debug = require('./debug') const flushAndError = require('./flushAndError') diff --git a/lib/command.js b/lib/command.js index bd280c4f6a..a6cf037814 100644 --- a/lib/command.js +++ b/lib/command.js @@ -2,36 +2,51 @@ const betterStackTraces = /development/i.test(process.env.NODE_ENV) || /\bredis\b/i.test(process.env.NODE_DEBUG) -// TODO: Change the arguments to an object -// callOnWrite could be two things now -function Command(name, args, callOnWrite, transformer) { +function Command(name, args) { + var callback + if (args.length !== 0 && typeof args[args.length - 1] === 'function') { + this.promise = undefined + callback = args.pop() + } else { + this.promise = new Promise((resolve, reject) => { + callback = (err, res) => { + if (this.transformer !== undefined) { + const tmp = this.transformer(err, res) + err = tmp[0] + res = tmp[1] + } + if (err === null) { + resolve(res) + } else { + reject(err) + } + } + }) + } + this.callback = callback this.command = name this.args = args this.argsLength = 0 this.bufferArgs = false - var callback - transformer = transformer || function (err, res) { - return err || res - } - this.promise = new Promise((resolve, reject) => { - callback = (err, res) => { - if (err) { - const transformed = transformer(err) - if (transformed.stack) { // instanceof Error - reject(transformed) - } else { - resolve(transformed) - } - } else { - resolve(transformer(null, res)) - } - } - }) - this.callback = callback - this.callOnWrite = callOnWrite + this.transformer = undefined + this.callOnWrite = undefined if (betterStackTraces) { this.error = new Error() } } -module.exports = Command +function MultiCommand(name, args) { + this.command = name + this.args = args + this.argsLength = 0 + this.bufferArgs = false + this.transformer = undefined + this.promise = undefined + this.callback = undefined + this.callOnWrite = undefined +} + +module.exports = { + Command, + MultiCommand +} diff --git a/lib/individualCommands.js b/lib/individualCommands.js index 2e7e88c5f5..8b390c82d1 100644 --- a/lib/individualCommands.js +++ b/lib/individualCommands.js @@ -1,6 +1,6 @@ 'use strict' -const Command = require('./command') +const { Command, MultiCommand } = require('./command') const debug = require('./debug') const Multi = require('./multi') const utils = require('./utils') @@ -29,16 +29,20 @@ function selectCallback(client, db) { // Store db in this.selectDb to restore it on reconnect client.selectedDb = db } - return err || res + return [err, res] } } RedisClient.prototype.select = function select(db) { - return this.internalSendCommand(new Command('select', [db], null, selectCallback(this, db))) + const command = new Command('select', [db]) + command.transformer = selectCallback(this, db) + return this.internalSendCommand(command) } Multi.prototype.select = function select(db) { - this._queue.push(new Command('select', [db], null, selectCallback(this._client, db))) + const command = new MultiCommand('select', [db]) + command.transformer = selectCallback(this._client, db) + this._queue.push(command) return this } @@ -51,7 +55,9 @@ RedisClient.prototype.monitor = function monitor() { // be properly processed. If this is not the case, it's not an issue either. this._monitoring = true } - return this.internalSendCommand(new Command('monitor', [], callOnWrite)) + const command = new Command('monitor', []) + command.callOnWrite = callOnWrite + return this.internalSendCommand(command) } // Only works with batch, not in a transaction @@ -62,7 +68,9 @@ Multi.prototype.monitor = function monitor() { const callOnWrite = () => { this._client._monitoring = true } - this._queue.push(new Command('monitor', [], callOnWrite)) + const command = new MultiCommand('monitor', []) + command.callOnWrite = callOnWrite + this._queue.push(command) return this } // Set multi monitoring to indicate the exec that it should abort @@ -84,9 +92,9 @@ function quitCallback(client) { // or the offline queue is deactivated and the command was rejected right away // or the stream is not writable // or while sending the quit, the connection ended / closed - return 'OK' + return [null, 'OK'] } - return err || res + return [err, res] } } @@ -95,7 +103,9 @@ RedisClient.prototype.quit = function quit() { // // Allow the quit command to be fired as soon as possible to prevent it // landing in the offline queue. this.ready = this.offlineQueue.length === 0; - const backpressureIndicator = this.internalSendCommand(new Command('quit', [], null, quitCallback(this))) + const command = new Command('quit', []) + command.transformer = quitCallback(this) + const backpressureIndicator = this.internalSendCommand(command) // Calling quit should always end the connection, no matter if there's a connection or not this._closing = true this.ready = false @@ -109,7 +119,10 @@ Multi.prototype.quit = function quit() { this._client._closing = true this._client.ready = false } - this._queue.push(new Command('quit', [], callOnWrite, quitCallback(this._client))) + const command = new MultiCommand('quit', []) + command.callOnWrite = callOnWrite + command.transformer = quitCallback(this._client) + this._queue.push(command) return this } @@ -122,7 +135,7 @@ Multi.prototype.quit = function quit() { function infoCallback(client) { return function (err, res) { if (err) { - return err + return [err, undefined] } if (typeof res !== 'string') { @@ -171,32 +184,36 @@ function infoCallback(client) { } client.serverInfo = obj - return res + return [null, res] } } // Store info in this.serverInfo after each call RedisClient.prototype.info = function info(section) { const args = section ? [section] : [] - return this.internalSendCommand(new Command('info', args, null, infoCallback(this))) + const command = new Command('info', args) + command.transformer = infoCallback(this) + return this.internalSendCommand(command) } Multi.prototype.info = function info(section) { const args = section ? [section] : [] - this._queue.push(new Command('info', args, null, infoCallback(this._client))) + const command = new MultiCommand('info', args) + command.transformer = infoCallback(this._client) + this._queue.push(command) return this } -function authCallback(client, pass) { +function authCallback(client) { return function (err, res) { if (err) { if (noPasswordIsSet.test(err.message)) { utils.warn(client, 'Warning: Redis server does not require a password, but a password was supplied.') - return 'OK' + return [null, 'OK'] } - return err + return [err, undefined] } - return res + return [null, res] } } @@ -207,7 +224,9 @@ RedisClient.prototype.auth = function auth(pass) { this._options.password = pass const ready = this.ready this.ready = ready || this.offlineQueue.length === 0 - const tmp = this.internalSendCommand(new Command('auth', [pass], null, authCallback(this, pass))) + const command = new Command('auth', [pass]) + command.transformer = authCallback(this) + const tmp = this.internalSendCommand(command) this.ready = ready return tmp } @@ -218,7 +237,9 @@ Multi.prototype.auth = function auth(pass) { // Stash auth for connect and reconnect. this._client._options.password = pass - this._queue.push(new Command('auth', [pass], null, authCallback(this._client))) + const command = new MultiCommand('auth', [pass]) + command.transformer = authCallback(this._client) + this._queue.push(command) return this } @@ -233,7 +254,9 @@ RedisClient.prototype.client = function client(...arr) { } } } - return this.internalSendCommand(new Command('client', arr, callOnWrite)) + const command = new Command('client', arr) + command.callOnWrite = callOnWrite + return this.internalSendCommand(command) } Multi.prototype.client = function client(...arr) { @@ -247,7 +270,9 @@ Multi.prototype.client = function client(...arr) { } } } - this._queue.push(new Command('client', arr, callOnWrite)) + const command = new MultiCommand('client', arr) + command.callOnWrite = callOnWrite + this._queue.push(command) return this } @@ -255,14 +280,18 @@ RedisClient.prototype.subscribe = function subscribe(...arr) { const callOnWrite = () => { this._pubSubMode = this._pubSubMode || this.commandQueue.length + 1 } - return this.internalSendCommand(new Command('subscribe', arr, callOnWrite)) + const command = new Command('subscribe', arr) + command.callOnWrite = callOnWrite + return this.internalSendCommand(command) } Multi.prototype.subscribe = function subscribe(...arr) { const callOnWrite = () => { this._client._pubSubMode = this._client._pubSubMode || this._client.commandQueue.length + 1 } - this._queue.push(new Command('subscribe', arr, callOnWrite)) + const command = new MultiCommand('subscribe', arr) + command.callOnWrite = callOnWrite + this._queue.push(command) return this } @@ -272,7 +301,9 @@ RedisClient.prototype.unsubscribe = function unsubscribe(...arr) { // value is manipulated in the callback this._pubSubMode = this._pubSubMode || this.commandQueue.length + 1 } - return this.internalSendCommand(new Command('unsubscribe', arr, callOnWrite)) + const command = new Command('unsubscribe', arr) + command.callOnWrite = callOnWrite + return this.internalSendCommand(command) } Multi.prototype.unsubscribe = function unsubscribe(...arr) { @@ -281,7 +312,9 @@ Multi.prototype.unsubscribe = function unsubscribe(...arr) { // value is manipulated in the callback this._client._pubSubMode = this._client._pubSubMode || this._client.commandQueue.length + 1 } - this._queue.push(new Command('unsubscribe', arr, callOnWrite)) + const command = new MultiCommand('unsubscribe', arr) + command.callOnWrite = callOnWrite + this._queue.push(command) return this } @@ -289,14 +322,18 @@ RedisClient.prototype.psubscribe = function psubscribe(...arr) { const callOnWrite = () => { this._pubSubMode = this._pubSubMode || this.commandQueue.length + 1 } - return this.internalSendCommand(new Command('psubscribe', arr, callOnWrite)) + const command = new Command('psubscribe', arr) + command.callOnWrite = callOnWrite + return this.internalSendCommand(command) } Multi.prototype.psubscribe = function psubscribe(...arr) { const callOnWrite = () => { this._client._pubSubMode = this._client._pubSubMode || this._client.commandQueue.length + 1 } - this._queue.push(new Command('psubscribe', arr, callOnWrite)) + const command = new MultiCommand('psubscribe', arr) + command.callOnWrite = callOnWrite + this._queue.push(command) return this } @@ -306,7 +343,9 @@ RedisClient.prototype.punsubscribe = function punsubscribe(...arr) { // value is manipulated in the callback this._pubSubMode = this._pubSubMode || this.commandQueue.length + 1 } - return this.internalSendCommand(new Command('punsubscribe', arr, callOnWrite)) + const command = new Command('punsubscribe', arr) + command.callOnWrite = callOnWrite + return this.internalSendCommand(command) } Multi.prototype.punsubscribe = function punsubscribe(...arr) { @@ -315,6 +354,8 @@ Multi.prototype.punsubscribe = function punsubscribe(...arr) { // value is manipulated in the callback this._client._pubSubMode = this._client._pubSubMode || this._client.commandQueue.length + 1 } - this._queue.push(new Command('punsubscribe', arr, callOnWrite)) + const command = new MultiCommand('punsubscribe', arr) + command.callOnWrite = callOnWrite + this._queue.push(command) return this } diff --git a/lib/multi.js b/lib/multi.js index f5aa1ebb28..7eb0c258a3 100644 --- a/lib/multi.js +++ b/lib/multi.js @@ -2,143 +2,154 @@ const Queue = require('denque') const Errors = require('redis-errors') -const Command = require('./command') +const { MultiCommand } = require('./command') const utils = require('./utils') const handleReply = utils.handleReply -/** - * @description Queues all transaction commands and checks if a queuing error - * occurred. - * - * @param {Multi} multi - * @param {Command} command - * @param {number} index Command index in the Multi queue - * @returns * - */ -function pipelineTransactionCommand(multi, command, index) { - // Queueing is done first, then the commands are executed - const tmp = command.callback - command.callback = function (err, reply) { - if (err) { - tmp(err) - err.position = index - multi._errors.push(err) - return - } - tmp(null, reply) - } - return multi._client.internalSendCommand(command) -} - -/** - * @description Make sure all replies are of the correct type and call the command callback - * - * @param {Multi} multi - * @param {any[]} replies - * @returns any[] - */ -function multiCallback(multi, replies) { - if (replies) { - var i = 0 - const queue = multi._queue - const client = multi._client - while (queue.length !== 0) { - const command = queue.shift() - if (replies[i] instanceof Error) { - const match = replies[i].message.match(utils.errCode) - // LUA script could return user errors that don't behave like all other errors! - if (match) { - replies[i].code = match[1] - } - replies[i].command = command.command.toUpperCase() - command.callback(replies[i]) - } else { - // If we asked for strings, even in detectBuffers mode, then return strings: - replies[i] = handleReply(client, replies[i], command) - command.callback(null, replies[i]) - } - i++ - } - } - multi._client._multi = false - return replies -} - /** * @description Execute a Redis transaction (multi ... exec) * * @param {Multi} multi + * @param {function} [callback] * @returns Promise */ -function execTransaction(multi) { +function execTransaction(multi, callback) { const client = multi._client - const queue = multi._queue if (multi._monitoring || client._monitoring) { const err = new RangeError('Using transaction with a client that is in monitor mode does not work due to faulty return values of Redis.') err.command = 'EXEC' err.code = 'EXECABORT' - return new Promise((resolve, reject) => { - utils.replyInOrder(client, reject, err) - }) + utils.replyInOrder(client, callback, err) + return } - const len = queue.length - multi._errors = [] client._multi = true - // Silently ignore this error. We'll receive the error for the exec as well - const promises = [client.internalSendCommand(new Command('multi', [])).catch(() => {})] - // Drain queue, callback will catch 'QUEUED' or error - for (let index = 0; index < len; index++) { - // The commands may not be shifted off, since they are needed in the result handler - promises.push(pipelineTransactionCommand(multi, queue.peekAt(index), index).catch(e => e)) + function receiver(err, reply) { + if (err !== null) { + multi._error = true + multi._results.push(err) + } + } + // Silently ignore the possible error. We'll receive the error for the exec as well + const multiCommand = new MultiCommand('multi', []) + multiCommand.callback = () => {} + client.internalSendCommand(multiCommand) + + const queue = multi._queue + for (var i = 0; i < queue.length; i++) { + // Drain queue, callback will catch 'QUEUED' or error + const command = queue.peekAt(i) + // Queueing is done first, then the commands are executed + command.callback = receiver + client.internalSendCommand(command) } - const main = client.internalSendCommand(new Command('exec', [])) - return Promise.all(promises) - .then(() => main - .then(replies => multiCallback(multi, replies)) - .catch((err) => { - err.errors = multi._errors - return Promise.reject(err) - })) + const execCommand = new MultiCommand('exec', []) + execCommand.callback = function (err, res) { + if (err !== null) { + multi._error = true + res = multi.results + } else if (res) { + for (var i = 0; i < queue.length; i++) { + const command = queue.peekAt(i) + if (res[i] instanceof Errors.RedisError) { + const match = res[i].message.match(utils.errCode) + // LUA script could return user errors that don't behave like all other errors! + if (match) { + res[i].code = match[1] + } + res[i].command = command.command.toUpperCase() + multi._error = true + } else { + // If we asked for strings, even in detectBuffers mode, then return strings: + res[i] = handleReply(multi._client, res[i], command) + } + } + } + if (multi._error) { + // TODO: The stack trace should be improved in case betterStackTraces is + // activated + const err = new Errors.RedisError('Batch command failed') + err.code = 'ERR' + // TODO: This was called "errors" instead of "replies". That is not + // consistent with the batch command. + err.replies = res + callback(err) + } else { + callback(null, res) + } + client._multi = false + } + client.internalSendCommand(execCommand) +} + +function newBatchReceiver(multi, transformer) { + return function receiver(err, res) { + if (transformer) { + const tmp = transformer(err, res) + err = tmp[0] + res = tmp[1] + } + if (err !== null) { + multi._error = true + multi._results.push(err) + } else { + multi._results.push(res) + } + } } /** * @description Execute a pipeline without transaction (batch ... exec) * * @param {Multi} multi + * @param {function} callback * @returns Promise */ -function execBatch(multi) { +function execBatch(multi, callback) { + var i = 0 const client = multi._client const queue = multi._queue if (queue.length === 0) { - // TODO: return an error if not "ready" - return new Promise((resolve) => { - utils.replyInOrder(client, (e, res) => { - resolve(res) - }, null, []) - }) + // This will return a result even if the client is not ready in case the + // queue is empty. + utils.replyInOrder(client, callback, null, []) + return } - var error = false - function setError(err) { - error = true - return err + // if (betterStackTraces) { + // goodStackTrace = new Error() + // } + for (; i < queue.length - 1; i++) { + const command = queue.peekAt(i) + command.callback = newBatchReceiver(multi, command.transformer) + client.internalSendCommand(command) } - const promises = [] - while (queue.length) { - const command = queue.shift() - promises.push(client.internalSendCommand(command).catch(setError)) - } - return Promise.all(promises).then((res) => { - if (error) { - const err = new Errors.RedisError('bla failed') - err.code = 'ERR' - err.replies = res - return Promise.reject(err) + + const command = queue.peekAt(i) + command.callback = function (err, res) { + if (command.transformer !== undefined) { + const tmp = command.transformer(err, res) + err = tmp[0] + res = tmp[1] } - return res - }) + if (err !== null) { + multi._error = true + multi._results.push(err) + } else { + multi._results.push(res) + } + if (multi._error) { + // TODO: The stack trace should be improved in case betterStackTraces is + // activated. + const err = new Errors.RedisError('Batch command failed') + err.code = 'ERR' + err.replies = multi._results + callback(err) + } else { + callback(null, multi._results) + } + } + client.internalSendCommand(command) } class Multi { @@ -154,6 +165,8 @@ class Multi { this._client = client this._type = type this._queue = new Queue() + this._error = false + this._results = [] // Either undefined or an array. Fail hard if it's not an array if (args) { // Legacy support for passing in an array of arguments @@ -172,29 +185,61 @@ class Multi { /** * @description Check the number of commands and execute those atomic * - * @returns Promise + * @param {function} [callback] + * + * @returns Promise|undefined * * @memberof Multi */ - execAtomic() { - if (this._queue.length < 2) { - return this.execBatch() + execAtomic(callback) { + var promise + if (callback === undefined) { + promise = new Promise((resolve, reject) => { + callback = function (err, res) { + if (err === null) { + resolve(res) + } else { + reject(err) + } + } + }) } - return this.exec() + if (this._queue.length < 2) { + this.execBatch(callback) + } else { + this.exec(callback) + } + return promise } /** * @description Execute the corresponding multi type * - * @returns Promise + * @param {function} [callback] + * + * @returns Promise|undefined * * @memberof Multi */ - exec() { - if (this._type === 'batch') { - return execBatch(this) + exec(callback) { + var promise + if (callback === undefined) { + promise = new Promise((resolve, reject) => { + callback = function (err, res) { + if (err === null) { + resolve(res) + } else { + reject(err) + } + } + }) } - return execTransaction(this) + if (this._type === 'batch') { + execBatch(this, callback) + } else { + execTransaction(this, callback) + } + return promise } } diff --git a/lib/readyHandler.js b/lib/readyHandler.js index 7b8e36d66f..cf9b33b1ff 100644 --- a/lib/readyHandler.js +++ b/lib/readyHandler.js @@ -1,6 +1,5 @@ 'use strict' -const Command = require('./command') const debug = require('./debug') const utils = require('./utils') @@ -52,8 +51,8 @@ function readyHandler(client) { client.ready = true if (client.selectedDb !== undefined) { - client.internalSendCommand(new Command('select', [client.selectedDb])).catch((err) => { - if (!client._closing) { + client.select(client.selectedDb, (err) => { + if (err !== null && !client._closing) { // TODO: These internal things should be wrapped in a // special error that describes what is happening process.nextTick(client.emit, 'error', err) @@ -61,8 +60,8 @@ function readyHandler(client) { }) } if (client._monitoring) { // Monitor has to be fired before pub sub commands - client.internalSendCommand(new Command('monitor', [])).catch((err) => { - if (!client._closing) { + client.monitor((err) => { + if (err !== null && !client._closing) { process.nextTick(client.emit, 'error', err) } }) diff --git a/test/lib/config.js b/test/lib/config.js index a072c278fd..4cb22a3d6d 100644 --- a/test/lib/config.js +++ b/test/lib/config.js @@ -6,6 +6,7 @@ const redis = require('../../index') const config = { redis, + Redis: redis, PORT: 6379, HOST: { IPv4: '127.0.0.1',