diff --git a/index.js b/index.js index 6518e0da94..bf9b966424 100644 --- a/index.js +++ b/index.js @@ -54,7 +54,6 @@ function RedisClient (options, stream) { this.connectionOptions = cnxOptions this.connectionId = RedisClient.connectionId++ this.connected = false - this.ready = false if (options.socketKeepalive === undefined) { options.socketKeepalive = true } @@ -81,7 +80,6 @@ function RedisClient (options, stream) { // Only used as timeout until redis has to be connected to redis until throwing an connection error this.connectTimeout = +options.connectTimeout || 60000 // 60 * 1000 ms this.enableOfflineQueue = options.enableOfflineQueue !== false - this.initializeRetryVars() this.pubSubMode = 0 this.subscriptionSet = {} this.monitoring = false @@ -109,6 +107,7 @@ function RedisClient (options, stream) { } this.retryStrategyProvided = !!options.retryStrategy this.subscribeChannels = [] + utils.setReconnectDefaults(this) // Init parser and connect connect(this) this.on('newListener', function (event) { @@ -122,40 +121,6 @@ util.inherits(RedisClient, EventEmitter) RedisClient.connectionId = 0 -RedisClient.prototype.initializeRetryVars = function () { - this.retryTimer = null - this.retryTotaltime = 0 - this.retryDelay = 100 - this.attempts = 1 -} - -// Flush provided queues, erroring any items with a callback first -RedisClient.prototype.flushAndError = function (message, code, options) { - options = options || {} - const queueNames = options.queues || ['commandQueue', 'offlineQueue'] // Flush the commandQueue first to keep the order intact - for (var i = 0; i < queueNames.length; i++) { - // If the command was fired it might have been processed so far - const ErrorClass = queueNames[i] === 'commandQueue' - ? Errors.InterruptError - : Errors.AbortError - - while (this[queueNames[i]].length) { - const command = this[queueNames[i]].shift() - const err = new ErrorClass(message) - err.code = code - err.command = command.command.toUpperCase() - err.args = command.args - if (command.error) { - err.stack = err.stack + command.error.stack.replace(/^Error.*?\n/, '\n') - } - if (options.error) { - err.origin = options.error - } - command.callback(err) - } - } -} - // Do not call internalSendCommand directly, if you are not absolutely certain it handles everything properly // e.g. monitor / info does not work with internalSendCommand only RedisClient.prototype.internalSendCommand = function (commandObj) { diff --git a/lib/connect.js b/lib/connect.js index 74fb3ab271..343ba2066f 100644 --- a/lib/connect.js +++ b/lib/connect.js @@ -3,13 +3,18 @@ const tls = require('tls') const Parser = require('redis-parser') const net = require('net') -const reconnect = require('./reconnect') const onConnect = require('./readyHandler') const debug = require('./debug') const replyHandler = require('./replyHandler') +const flushAndError = require('./flushAndError') const onResult = replyHandler.onResult const onError = replyHandler.onError +var reconnect = function (client, why, err) { + reconnect = require('./reconnect') + reconnect(client, why, err) +} + function onStreamError (client, err) { if (client.closing) { return @@ -48,7 +53,7 @@ function createParser (client) { // Note: the execution order is important. First flush and emit, then create the stream err.message += '. Please report this.' client.ready = false - client.flushAndError('Fatal error encountered. Command aborted.', 'NR_FATAL', { + flushAndError(client, 'Fatal error encountered. Command aborted.', 'NR_FATAL', { error: err, queues: ['commandQueue'] }) diff --git a/lib/extendedApi.js b/lib/extendedApi.js index 297151934e..59cafce930 100644 --- a/lib/extendedApi.js +++ b/lib/extendedApi.js @@ -5,6 +5,7 @@ const debug = require('./debug') const RedisClient = require('../').RedisClient const Command = require('./command') const Multi = require('./multi') +const flushAndError = require('./flushAndError') const noop = function () {} /********************************************** @@ -46,7 +47,7 @@ RedisClient.prototype.end = function (flush) { // Flush queue if wanted if (flush) { - this.flushAndError('Connection forcefully ended and command aborted.', 'NR_CLOSED') + flushAndError(this, 'Connection forcefully ended and command aborted.', 'NR_CLOSED') } // Clear retryTimer if (this.retryTimer) { diff --git a/lib/flushAndError.js b/lib/flushAndError.js new file mode 100644 index 0000000000..bf9b1c2a35 --- /dev/null +++ b/lib/flushAndError.js @@ -0,0 +1,32 @@ +'use strict' + +const Errors = require('redis-errors') + +// Flush provided queues, erroring any items with a callback first +function flushAndError (client, message, code, options) { + options = options || {} + const queueNames = options.queues || ['commandQueue', 'offlineQueue'] // Flush the commandQueue first to keep the order intact + for (var i = 0; i < queueNames.length; i++) { + // If the command was fired it might have been processed so far + const ErrorClass = queueNames[i] === 'commandQueue' + ? Errors.InterruptError + : Errors.AbortError + + while (client[queueNames[i]].length) { + const command = client[queueNames[i]].shift() + const err = new ErrorClass(message) + err.code = code + err.command = command.command.toUpperCase() + err.args = command.args + if (command.error) { + err.stack = err.stack + command.error.stack.replace(/^Error.*?\n/, '\n') + } + if (options.error) { + err.origin = options.error + } + command.callback(err) + } + } +} + +module.exports = flushAndError diff --git a/lib/readyHandler.js b/lib/readyHandler.js index 022b310274..415e054aed 100644 --- a/lib/readyHandler.js +++ b/lib/readyHandler.js @@ -2,6 +2,7 @@ const debug = require('./debug') const Command = require('./command') +const utils = require('./utils') function onConnect (client) { debug('Stream connected %s id %s', client.address, client.connectionId) @@ -10,14 +11,12 @@ function onConnect (client) { // fast properties. If that's not the case, make them fast properties // again! client.connected = true - client.ready = false - client.emittedEnd = false client._stream.setKeepAlive(client.options.socketKeepalive) client._stream.setTimeout(0) // TODO: Deprecate the connect event. client.emit('connect') - client.initializeRetryVars() + utils.setReconnectDefaults(client) if (client.options.noReadyCheck) { readyHandler(client) diff --git a/lib/reconnect.js b/lib/reconnect.js index 68ce508a45..8fc407ac34 100644 --- a/lib/reconnect.js +++ b/lib/reconnect.js @@ -2,10 +2,8 @@ const Errors = require('redis-errors') const debug = require('./debug') -var lazyConnect = function (client) { - lazyConnect = require('./connect') - lazyConnect(client) -} +const flushAndError = require('./flushAndError') +const connect = require('./connect') /** * @description Try connecting to a server again @@ -27,7 +25,7 @@ function retryConnection (client, error) { client.retryTotaltime += client.retryDelay client.attempts += 1 - lazyConnect(client) + connect(client) client.retryTimer = null } @@ -51,18 +49,14 @@ function reconnect (client, why, error) { client.ready = false client.pubSubMode = 0 - // since we are collapsing end and close, users don't expect to be called twice - if (!client.emittedEnd) { - client.emit('end') - client.emittedEnd = true - } + client.emit('end') if (why === 'timeout') { var message = 'Redis connection in broken state: connection timeout exceeded.' const err = new Errors.RedisError(message) // TODO: Find better error codes... err.code = 'CONNECTION_BROKEN' - client.flushAndError(message, 'CONNECTION_BROKEN') + flushAndError(client, message, 'CONNECTION_BROKEN') client.emit('error', err) client.end(false) return @@ -71,7 +65,7 @@ function reconnect (client, why, error) { // If client is a requested shutdown, then don't retry if (client.closing) { debug('Connection ended by quit / end command, not retrying.') - client.flushAndError('Stream connection ended and command aborted.', 'NR_CLOSED', { + flushAndError(client, 'Stream connection ended and command aborted.', 'NR_CLOSED', { error }) return @@ -88,7 +82,7 @@ function reconnect (client, why, error) { if (client.retryDelay instanceof Error) { error = client.retryDelay } - client.flushAndError('Stream connection ended and command aborted.', 'NR_CLOSED', { + flushAndError(client, 'Stream connection ended and command aborted.', 'NR_CLOSED', { error }) // TODO: Check if client is so smart @@ -107,7 +101,7 @@ function reconnect (client, why, error) { // We could postpone writing to the stream until we connected again and fire the commands. // The commands in the pipelineQueue are also not uncertain. They never left the client. } else if (client.commandQueue.length !== 0 || client._pipelineQueue.length !== 0) { - client.flushAndError('Redis connection lost and command aborted.', 'UNCERTAIN_STATE', { + flushAndError(client, 'Redis connection lost and command aborted.', 'UNCERTAIN_STATE', { error, queues: ['commandQueue', '_pipelineQueue'] }) diff --git a/lib/utils.js b/lib/utils.js index dc2221602f..1a146774ec 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -149,6 +149,14 @@ function handleReply (client, reply, command) { return reply } +function setReconnectDefaults (client) { + client.retryTimer = null + client.retryTotaltime = 0 + client.retryDelay = 100 + client.attempts = 1 + client.ready = false +} + module.exports = { replyToStrings, replyToObject, @@ -157,5 +165,6 @@ module.exports = { clone: convenienceClone, replyInOrder, warn, - handleReply + handleReply, + setReconnectDefaults }