diff --git a/index.js b/index.js index f1e1b12da3..9beea81b94 100644 --- a/index.js +++ b/index.js @@ -144,30 +144,28 @@ RedisClient.prototype.initializeRetryVars = function () { } // Flush provided queues, erroring any items with a callback first -RedisClient.prototype.flushAndError = function (errorAttributes, options) { +RedisClient.prototype.flushAndError = function (message, code, options) { options = options || {} const queueNames = options.queues || ['commandQueue', 'offlineQueue'] // Flush the commandQueue first to keep the order intact for (var i = 0; i < queueNames.length; i++) { // If the command was fired it might have been processed so far - if (queueNames[i] === 'commandQueue') { - errorAttributes.message += ' It might have been processed.' - } else { // As the commandQueue is flushed first, remove this for the offline queue - errorAttributes.message = errorAttributes.message.replace(' It might have been processed.', '') - } - // Don't flush everything from the queue - for (var commandObj = this[queueNames[i]].shift(); commandObj; commandObj = this[queueNames[i]].shift()) { - const err = new errorClasses.AbortError(errorAttributes) - if (commandObj.error) { - err.stack = err.stack + commandObj.error.stack.replace(/^Error.*?\n/, '\n') - } - err.command = commandObj.command.toUpperCase() - if (commandObj.args && commandObj.args.length) { - err.args = commandObj.args + const ErrorClass = queueNames[i] === 'commandQueue' + ? Errors.InterruptError + : Errors.AbortError + + while (this[queueNames[i]].length) { + const command = this[queueNames[i]].shift() + const err = new ErrorClass(message) + err.code = code + err.command = command.command.toUpperCase() + err.args = command.args + if (command.error) { + err.stack = err.stack + command.error.stack.replace(/^Error.*?\n/, '\n') } if (options.error) { err.origin = options.error } - commandObj.callback(err) + command.callback(err) } } } diff --git a/lib/connect.js b/lib/connect.js index 3a47f94a61..4dfe3f933a 100644 --- a/lib/connect.js +++ b/lib/connect.js @@ -29,10 +29,7 @@ function createParser (client) { // Note: the execution order is important. First flush and emit, then create the stream err.message += '. Please report this.' client.ready = false - client.flushAndError({ - message: 'Fatal error encountered. Command aborted.', - code: 'NR_FATAL' - }, { + client.flushAndError('Fatal error encountered. Command aborted.', 'NR_FATAL', { error: err, queues: ['commandQueue'] }) diff --git a/lib/extendedApi.js b/lib/extendedApi.js index 6a288298a0..297151934e 100644 --- a/lib/extendedApi.js +++ b/lib/extendedApi.js @@ -46,10 +46,7 @@ RedisClient.prototype.end = function (flush) { // Flush queue if wanted if (flush) { - this.flushAndError({ - message: 'Connection forcefully ended and command aborted.', - code: 'NR_CLOSED' - }) + this.flushAndError('Connection forcefully ended and command aborted.', 'NR_CLOSED') } // Clear retryTimer if (this.retryTimer) { diff --git a/lib/individualCommands.js b/lib/individualCommands.js index 809cd4ee0c..43a0224f0f 100644 --- a/lib/individualCommands.js +++ b/lib/individualCommands.js @@ -107,6 +107,12 @@ Multi.prototype.quit = function quit () { return this } +/** + * @description Return a function that receives the raw info data and convert to an object. + * + * @param {RedisClient} client + * @returns {function} + */ function infoCallback (client) { return function (err, res) { if (err) { diff --git a/lib/multi.js b/lib/multi.js index 2160037549..a6d8f8ff56 100644 --- a/lib/multi.js +++ b/lib/multi.js @@ -1,5 +1,6 @@ 'use strict' +const Errors = require('redis-errors') const Queue = require('denque') const utils = require('./utils') const Command = require('./command') @@ -136,7 +137,7 @@ function execBatch (multi) { client.uncork() return Promise.all(promises).then((res) => { if (error) { - const err = new Error('bla failed') + const err = new Errors.RedisError('bla failed') err.code = 'foo' err.replies = res return Promise.reject(err) diff --git a/lib/readyHandler.js b/lib/readyHandler.js index 8997575740..1ff96424b7 100644 --- a/lib/readyHandler.js +++ b/lib/readyHandler.js @@ -12,26 +12,41 @@ function onConnect (client) { client._stream.setKeepAlive(client.options.socketKeepalive) client._stream.setTimeout(0) + // TODO: Deprecate the connect event. client.emit('connect') client.initializeRetryVars() if (client.options.noReadyCheck) { - onReady(client) + readyHandler(client) } else { readyCheck(client) } } +/** + * @description Empty the offline queue and call the commands + * + * @param {RedisClient} client + */ function sendOfflineQueue (client) { - while (client.offlineQueue.length) { - const command = client.offlineQueue.shift() + const queue = client.offlineQueue + while (queue.length) { + const command = queue.shift() debug('Sending offline command: %s', command.command) client.internalSendCommand(command) } } -function onReady (client) { - debug('onReady called %s id %s', client.address, client.connectionId) +/** + * @description Transparently perform predefined commands and emit ready. + * + * Emit ready before the all commands returned. + * The order of the commands is important. + * + * @param {RedisClient} client + */ +function readyHandler (client) { + debug('readyHandler called %s id %s', client.address, client.connectionId) client.ready = true client.cork = () => { @@ -50,7 +65,6 @@ function onReady (client) { client._stream.uncork() } - // Restore modal commands from previous connection. The order of the commands is important if (client.selectedDb !== undefined) { client.internalSendCommand(new Command('select', [client.selectedDb])).catch((err) => { if (!client.closing) { @@ -78,7 +92,7 @@ function onReady (client) { // // individual: function noop () {} // } if (!client.options.disableResubscribing && callbackCount) { - debug('Sending pub/sub onReady commands') + debug('Sending pub/sub commands') for (const key in client.subscriptionSet) { if (client.subscriptionSet.hasOwnProperty(key)) { const command = key.slice(0, key.indexOf('_')) @@ -95,55 +109,54 @@ function onReady (client) { client.emit('ready') } -function onInfoFail (client, err) { - if (client.closing) { - return - } - - if (err.message === "ERR unknown command 'info'") { - onReady(client) - return - } - err.message = `Ready check failed: ${err.message}` - client.emit('error', err) - return -} - -function onInfoCmd (client, res) { - /* istanbul ignore if: some servers might not respond with any info data. client is just a safety check that is difficult to test */ - if (!res) { - debug('The info command returned without any data.') - onReady(client) - return - } - - if (!client.serverInfo.loading || client.serverInfo.loading === '0') { - // If the master_link_status exists but the link is not up, try again after 50 ms - if (client.serverInfo.master_link_status && client.serverInfo.master_link_status !== 'up') { - client.serverInfo.loading_eta_seconds = 0.05 - } else { - // Eta loading should change - debug('Redis server ready.') - onReady(client) - return - } - } - - var retryTime = +client.serverInfo.loading_eta_seconds * 1000 - if (retryTime > 1000) { - retryTime = 1000 - } - debug('Redis server still loading, trying again in %s', retryTime) - setTimeout((client) => readyCheck(client), retryTime, client) -} - +/** + * @description Perform a info command and check if Redis is ready + * + * @param {RedisClient} client + */ function readyCheck (client) { debug('Checking server ready state...') // Always fire client info command as first command even if other commands are already queued up client.ready = true - client.info() - .then((res) => onInfoCmd(client, res)) - .catch((err) => onInfoFail(client, err)) + client.info().then((res) => { + /* istanbul ignore if: some servers might not respond with any info data. client is just a safety check that is difficult to test */ + if (!res) { + debug('The info command returned without any data.') + readyHandler(client) + return + } + + if (!client.serverInfo.loading || client.serverInfo.loading === '0') { + // If the master_link_status exists but the link is not up, try again after 50 ms + if (client.serverInfo.master_link_status && client.serverInfo.master_link_status !== 'up') { + client.serverInfo.loading_eta_seconds = 0.05 + } else { + // Eta loading should change + debug('Redis server ready.') + readyHandler(client) + return + } + } + + var retryTime = +client.serverInfo.loading_eta_seconds * 1000 + if (retryTime > 1000) { + retryTime = 1000 + } + debug('Redis server still loading, trying again in %s', retryTime) + setTimeout((client) => readyCheck(client), retryTime, client) + }).catch((err) => { + if (client.closing) { + return + } + + if (err.message === "ERR unknown command 'info'") { + readyHandler(client) + return + } + err.message = `Ready check failed: ${err.message}` + client.emit('error', err) + return + }) client.ready = false } diff --git a/lib/reconnect.js b/lib/reconnect.js index 2dc123e9db..d357a9b992 100644 --- a/lib/reconnect.js +++ b/lib/reconnect.js @@ -1,5 +1,6 @@ 'use strict' +const Errors = require('redis-errors') const debug = require('./debug') var lazyConnect = function (client) { lazyConnect = require('./connect') @@ -63,13 +64,10 @@ function reconnect (client, why, error) { if (why === 'timeout') { var message = 'Redis connection in broken state: connection timeout exceeded.' - const err = new Error(message) + const err = new Errors.RedisError(message) // TODO: Find better error codes... err.code = 'CONNECTION_BROKEN' - client.flushAndError({ - message: message, - code: 'CONNECTION_BROKEN' - }) + client.flushAndError(message, 'CONNECTION_BROKEN') client.emit('error', err) client.end(false) return @@ -78,10 +76,7 @@ function reconnect (client, why, error) { // If client is a requested shutdown, then don't retry if (client.closing) { debug('Connection ended by quit / end command, not retrying.') - client.flushAndError({ - message: 'Stream connection ended and command aborted.', - code: 'NR_CLOSED' - }, { + client.flushAndError('Stream connection ended and command aborted.', 'NR_CLOSED', { error }) return @@ -98,10 +93,7 @@ function reconnect (client, why, error) { if (client.retryDelay instanceof Error) { error = client.retryDelay } - client.flushAndError({ - message: 'Stream connection ended and command aborted.', - code: 'NR_CLOSED' - }, { + client.flushAndError('Stream connection ended and command aborted.', 'NR_CLOSED', { error }) // TODO: Check if client is so smart @@ -117,10 +109,7 @@ function reconnect (client, why, error) { client.offlineQueue.unshift.apply(client.offlineQueue, client.commandQueue.toArray()) client.commandQueue.clear() } else if (client.commandQueue.length !== 0) { - client.flushAndError({ - message: 'Redis connection lost and command aborted.', - code: 'UNCERTAIN_STATE' - }, { + client.flushAndError('Redis connection lost and command aborted.', 'UNCERTAIN_STATE', { error, queues: ['commandQueue'] })