From ac26d0524d5396b12f74c7a31424a0d40f766cb8 Mon Sep 17 00:00:00 2001 From: Ruben Bridgewater Date: Tue, 30 May 2017 04:38:02 +0200 Subject: [PATCH] fix: always emit an error when the connection drops --- changelog.md | 1 + index.js | 25 ++++++++++++++----------- lib/connect.js | 3 ++- lib/flushAndError.js | 5 +++-- lib/reconnect.js | 21 ++++++++++++--------- test/connection.spec.js | 10 +++++----- test/multi.spec.js | 9 ++++----- test/node_redis.spec.js | 4 +--- 8 files changed, 42 insertions(+), 36 deletions(-) diff --git a/changelog.md b/changelog.md index ce8d0bc45f..165573b28c 100644 --- a/changelog.md +++ b/changelog.md @@ -28,6 +28,7 @@ Features - Native promise support - Auto pipelining - The client is now exported directly and be instantiated directly +- `client.duplicate` will now also transition into pub sub or monitor mode Breaking Changes diff --git a/index.js b/index.js index 22dd4e5de8..771c7d33ac 100644 --- a/index.js +++ b/index.js @@ -59,17 +59,7 @@ class RedisClient extends EventEmitter { cnxOptions.family = (!options.family && net.isIP(cnxOptions.host)) || (options.family === 'IPv6' ? 6 : 4) this.address = `${cnxOptions.host}:${cnxOptions.port}` } - - // Public Variables - this.connected = false - this.shouldBuffer = false - this.commandQueue = new Queue() // Holds sent commands to de-pipeline them - this.offlineQueue = new Queue() // Holds commands issued but not able to be sent - this.serverInfo = {} - - // Private Variables - this._connectionOptions = cnxOptions - this.connectionId = RedisClient.connectionId++ + // TODO: Properly fix typo if (options.socketKeepalive === undefined) { options.socketKeepalive = true } @@ -109,6 +99,14 @@ class RedisClient extends EventEmitter { } options.returnBuffers = !!options.returnBuffers options.detectBuffers = !!options.detectBuffers + + // Public Variables + this.connected = false + this.shouldBuffer = false + this.commandQueue = new Queue() // Holds sent commands + this.offlineQueue = new Queue() // Holds commands issued but not able to be sent yet + this.serverInfo = {} + this.connectionId = connectionId++ this.selectedDb = options.db // Save the selected db here, used when reconnecting this._strCache = '' this._pipeline = false @@ -251,7 +249,12 @@ class RedisClient extends EventEmitter { } } const client = new RedisClient(existingOptions) + // Return to the same state as the other client + // by also selecting the db / returning to pub sub + // mode or into monitor mode. client.selectedDb = this.selectedDb + client._subscriptionSet = this._subscriptionSet + client._monitoring = this._monitoring if (typeof callback === 'function') { const errorListener = (err) => { callback(err) diff --git a/lib/connect.js b/lib/connect.js index c3f100a322..465de5993f 100644 --- a/lib/connect.js +++ b/lib/connect.js @@ -102,7 +102,8 @@ function connect (client) { const stream = client._stream if (options.connectTimeout) { - // TODO: Investigate why this is not properly triggered + // TODO: Investigate why this is not properly triggered. + // TODO: Check if this works with tls. stream.setTimeout(client._connectTimeout, () => { // Note: This is only tested if a internet connection is established lazyReconnect(client, 'timeout') diff --git a/lib/flushAndError.js b/lib/flushAndError.js index bf9b1c2a35..12027fbe5c 100644 --- a/lib/flushAndError.js +++ b/lib/flushAndError.js @@ -2,10 +2,11 @@ const Errors = require('redis-errors') -// Flush provided queues, erroring any items with a callback first +// Flush provided queues, erroring out all items function flushAndError (client, message, code, options) { options = options || {} - const queueNames = options.queues || ['commandQueue', 'offlineQueue'] // Flush the commandQueue first to keep the order intact + // Flush the commandQueue first to keep the order intact + const queueNames = options.queues || ['commandQueue', 'offlineQueue'] for (var i = 0; i < queueNames.length; i++) { // If the command was fired it might have been processed so far const ErrorClass = queueNames[i] === 'commandQueue' diff --git a/lib/reconnect.js b/lib/reconnect.js index 485bf72e2d..a634bd67d6 100644 --- a/lib/reconnect.js +++ b/lib/reconnect.js @@ -78,17 +78,20 @@ function reconnect (client, why, error) { timesConnected: client.timesConnected }) if (typeof client.retryDelay !== 'number') { - // Pass individual error through + var err if (client.retryDelay instanceof Error) { - error = client.retryDelay - } - flushAndError(client, 'Stream connection ended and command aborted.', 'NR_CLOSED', { - error - }) - // TODO: Check if client is so smart - if (error) { - client.emit('error', error) + // Pass individual error through + err = client.retryDelay + flushAndError(client, 'Stream connection ended and command aborted.', 'NR_CLOSED', { error: err }) + } else { + flushAndError(client, 'Stream connection ended and command aborted.', 'NR_CLOSED', { error }) + err = new Errors.RedisError('Redis connection ended.') + err.code = 'NR_CLOSED' + if (error) { + err.origin = error + } } + client.emit('error', err) client.end(false) return } diff --git a/test/connection.spec.js b/test/connection.spec.js index 6ddc1a621d..d14788c5a5 100644 --- a/test/connection.spec.js +++ b/test/connection.spec.js @@ -153,7 +153,7 @@ describe('connection tests', () => { }) }) - it.skip('can not connect with wrong host / port in the options object', (done) => { + it('can not connect with wrong host / port in the options object', (done) => { const options = { host: 'somewhere', port: 6379, @@ -163,11 +163,10 @@ describe('connection tests', () => { client = Redis.createClient(options) assert.strictEqual(client._connectionOptions.family, ip === 'IPv6' ? 6 : 4) assert.strictEqual(Object.keys(options).length, 4) - const end = helper.callFuncAfter(done, 2) client.on('error', (err) => { - assert(/CONNECTION_BROKEN|ENOTFOUND|EAI_AGAIN/.test(err.code)) - end() + assert(/NR_CLOSED/.test(err.code)) + done() }) }) @@ -206,7 +205,7 @@ describe('connection tests', () => { }, port: 9999 }) - client.on('error', helper.isError(/Redis connection to 127\.0\.0\.1:9999 failed/)) + client.on('error', helper.isError(/Redis connection ended/)) }) it('retryStrategy used to reconnect with defaults', (done) => { @@ -230,6 +229,7 @@ describe('connection tests', () => { setTimeout(() => { client._stream.destroy() }, 50) + client.on('error', helper.isError(/Redis connection ended/)) }) }) diff --git a/test/multi.spec.js b/test/multi.spec.js index 610e7b8b59..b3a0422d1d 100644 --- a/test/multi.spec.js +++ b/test/multi.spec.js @@ -194,7 +194,7 @@ describe('The \'multi\' method', () => { }) describe('when connection is broken', () => { - it.skip('return an error even if connection is in broken mode', (done) => { + it('return an error even if connection is in broken mode', (done) => { client = redis.createClient({ host: 'somewhere', port: 6379, @@ -202,13 +202,12 @@ describe('The \'multi\' method', () => { }) client.on('error', (err) => { - if (/Redis connection in broken state/.test(err.message)) { - done() - } + assert.strictEqual(err.code, 'NR_CLOSED') + done() }) client.multi([['set', 'foo', 'bar'], ['get', 'foo']]).exec().catch((err) => { - // assert(/Redis connection in broken state/.test(err.message)); + assert(/Stream connection ended and command aborted/.test(err.message)) assert.strictEqual(err.errors.length, 2) assert.strictEqual(err.errors[0].args.length, 2) }) diff --git a/test/node_redis.spec.js b/test/node_redis.spec.js index 471edd6884..6885383099 100644 --- a/test/node_redis.spec.js +++ b/test/node_redis.spec.js @@ -507,8 +507,7 @@ describe('The nodeRedis client', () => { }, 50) }) - // TODO: Fix this by adding the CONNECTION_BROKEN back in - it.skip('enqueues operation and keep the queue while trying to reconnect', (done) => { + it('enqueues operation and keep the queue while trying to reconnect', (done) => { client = redis.createClient(9999, null, { retryStrategy (options) { if (options.attempt < 4) { @@ -519,7 +518,6 @@ describe('The nodeRedis client', () => { let i = 0 client.on('error', (err) => { - console.log(err) if (err.code === 'NR_CLOSED') { assert(i, 3) assert.strictEqual(client.offlineQueue.length, 0)