diff --git a/index.js b/index.js index fca8a92494..22dd4e5de8 100644 --- a/index.js +++ b/index.js @@ -78,8 +78,6 @@ class RedisClient extends EventEmitter { options.renameCommands[command.toLowerCase()] = options.renameCommands[command] } } - options.returnBuffers = !!options.returnBuffers - options.detectBuffers = !!options.detectBuffers if (typeof options.enableOfflineQueue !== 'boolean') { if (options.enableOfflineQueue !== undefined) { throw new TypeError('enableOfflineQueue must be a boolean') @@ -109,6 +107,8 @@ class RedisClient extends EventEmitter { } options.password = options.authPass } + options.returnBuffers = !!options.returnBuffers + options.detectBuffers = !!options.detectBuffers this.selectedDb = options.db // Save the selected db here, used when reconnecting this._strCache = '' this._pipeline = false @@ -131,9 +131,12 @@ class RedisClient extends EventEmitter { // Init parser and connect connect(this) this.on('newListener', function (event) { - if ((event === 'messageBuffer' || event === 'pmessageBuffer') && !this.buffers && !this.messageBuffers) { - this.messageBuffers = true - this._replyParser.setReturnBuffers(true) + if ((event === 'messageBuffer' || event === 'pmessageBuffer') && this._messageBuffers === false) { + this._messageBuffers = true + if (this._parserReturningBuffers === false) { + this._parserReturningBuffers = true + this._replyParser.setReturnBuffers(true) + } } }) } diff --git a/lib/connect.js b/lib/connect.js index bd698e7711..c3f100a322 100644 --- a/lib/connect.js +++ b/lib/connect.js @@ -76,14 +76,15 @@ function createParser (client) { function connect (client) { // Init parser const parser = createParser(client) + const options = client._options client._replyParser = parser - if (client._options.stream) { + if (options.stream) { // Only add the listeners once in case of a reconnect try (that won't work) if (client._stream) { return } - client._stream = client._options.stream + client._stream = options.stream } else { // On a reconnect destroy the former stream and retry if (client._stream) { @@ -91,19 +92,8 @@ function connect (client) { client._stream.destroy() } - if (client._options.tls) { + if (options.tls) { client._stream = tls.connect(client._connectionOptions) - - // Whenever a handshake times out. - // Older Node.js versions use "clientError", newer versions use tlsClientError. - stream.once('clientError', (err) => { - debug('clientError occurred') - onStreamError(client, err) - }) - stream.once('tlsClientError', (err) => { - debug('clientError occurred') - onStreamError(client, err) - }) } else { client._stream = net.createConnection(client._connectionOptions) } @@ -111,18 +101,18 @@ function connect (client) { const stream = client._stream - if (client._options.connectTimeout) { + if (options.connectTimeout) { // TODO: Investigate why this is not properly triggered - stream.setTimeout(client._options.connectTimeout, () => { + stream.setTimeout(client._connectTimeout, () => { // Note: This is only tested if a internet connection is established lazyReconnect(client, 'timeout') }) } - const connectEvent = client._options.tls ? 'secureConnect' : 'connect' + const connectEvent = options.tls ? 'secureConnect' : 'connect' stream.once(connectEvent, () => { stream.removeAllListeners('timeout') - client.timesConnected++ + client._timesConnected++ onConnect(client) }) @@ -143,14 +133,27 @@ function connect (client) { lazyReconnect(client, 'end') }) + if (options.tls) { + // Whenever a handshake times out. + // Older Node.js versions use "clientError", newer versions use tlsClientError. + stream.once('clientError', (err) => { + debug('clientError occurred') + onStreamError(client, err) + }) + stream.once('tlsClientError', (err) => { + debug('clientError occurred') + onStreamError(client, err) + }) + } + stream.setNoDelay() // Fire the command before redis is connected to be sure it's the first fired command. // TODO: Consider calling the ready check before Redis is connected as well. // That could improve the ready performance. Measure the rough time difference! - if (client._options.password !== undefined) { + if (options.password !== undefined) { client.ready = true - client.auth(client._options.password).catch((err) => { + client.auth(options.password).catch((err) => { client._closing = true process.nextTick(() => { client.emit('error', err) diff --git a/lib/individualCommands.js b/lib/individualCommands.js index f773fa1353..885219b2a6 100644 --- a/lib/individualCommands.js +++ b/lib/individualCommands.js @@ -70,7 +70,7 @@ Multi.prototype.monitor = function monitor () { function quitCallback (client) { return function (err, res) { if (client._stream.writable) { - // If the socket is still alive, kill it. This could happen if quit got a NR_CLOSED error code + // If the socket is still alive, destroy it. This could happen if quit got a NR_CLOSED error code client._stream.destroy() } if (err && err.code === 'NR_CLOSED') { diff --git a/lib/reconnect.js b/lib/reconnect.js index 0e69a23b76..485bf72e2d 100644 --- a/lib/reconnect.js +++ b/lib/reconnect.js @@ -51,16 +51,16 @@ function reconnect (client, why, error) { client.emit('end') - if (why === 'timeout') { - var message = 'Redis connection in broken state: connection timeout exceeded.' - const err = new Errors.RedisError(message) - // TODO: Find better error codes... - err.code = 'CONNECTION_BROKEN' - flushAndError(client, message, 'CONNECTION_BROKEN') - client.emit('error', err) - client.end(false) - return - } + // if (why === 'timeout') { + // var message = 'Redis connection in broken state: connection timeout exceeded.' + // const err = new Errors.RedisError(message) + // // TODO: Find better error codes... + // err.code = 'CONNECTION_BROKEN' + // flushAndError(client, message, 'CONNECTION_BROKEN') + // client.emit('error', err) + // client.end(false) + // return + // } // If client is a requested shutdown, then don't retry if (client._closing) { diff --git a/test/node_redis.spec.js b/test/node_redis.spec.js index 36e19be5de..471edd6884 100644 --- a/test/node_redis.spec.js +++ b/test/node_redis.spec.js @@ -520,7 +520,7 @@ describe('The nodeRedis client', () => { client.on('error', (err) => { console.log(err) - if (err.code === 'CONNECTION_BROKEN') { + if (err.code === 'NR_CLOSED') { assert(i, 3) assert.strictEqual(client.offlineQueue.length, 0) assert.strictEqual(err.origin.code, 'ECONNREFUSED') diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index 2dbd1b1f11..1bfd4bede9 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -458,21 +458,25 @@ describe('publish/subscribe', () => { }) it('allows to listen to pmessageBuffer and pmessage', (done) => { - const end = helper.callFuncAfter(done, 4) + const end = helper.callFuncAfter(done, 5) const data = Array(10000).join('äüs^öéÉÉ`e') sub.set('foo', data).then(() => { sub.get('foo').then((res) => assert.strictEqual(typeof res, 'string')) sub._stream.once('data', () => { - assert.strictEqual(sub.messageBuffers, false) + assert.strictEqual(sub._messageBuffers, false) assert.strictEqual(sub.shouldBuffer, false) + assert.strictEqual(args[2].detectBuffers, sub._parserReturningBuffers) + assert.strictEqual(sub._messageBuffers, false) sub.on('messageBuffer', (channel, message, pattern) => { if (pattern) { assert.strictEqual(pattern.inspect(), Buffer.from('*').inspect()) + end() } assert.strictEqual(channel.inspect(), Buffer.from('/foo').inspect()) end() }) - assert.notStrictEqual(sub.messageBuffers, sub.buffers) + assert.strictEqual(sub._messageBuffers, sub._parserReturningBuffers) + assert.strictEqual(sub._messageBuffers, true) }) const batch = sub.batch() batch.psubscribe('*')