From 3065e2e7be962a7c1a7cf8e41c555c7faf7d52c6 Mon Sep 17 00:00:00 2001 From: Ruben Bridgewater Date: Fri, 26 May 2017 18:45:52 +0200 Subject: [PATCH] chore: refactor parts out of the index.js file --- index.js | 189 +++++++--------------------------- lib/commands.js | 28 ++--- lib/connect.js | 131 +++++++++++++++++++++++ lib/createClient.js | 2 +- lib/extendedApi.js | 17 ++- lib/individualCommands.js | 21 ++-- lib/utils.js | 17 ++- test/auth.spec.js | 4 +- test/commands/blpop.spec.js | 2 +- test/commands/info.spec.js | 2 +- test/commands/monitor.spec.js | 8 +- test/commands/select.spec.js | 2 +- test/connection.spec.js | 45 +++----- test/helper.js | 2 +- test/lib/good-traces.js | 2 +- test/multi.spec.js | 4 +- test/node_redis.spec.js | 60 ++++------- test/pubsub.spec.js | 14 +-- test/tls.spec.js | 2 +- 19 files changed, 271 insertions(+), 281 deletions(-) create mode 100644 lib/connect.js diff --git a/index.js b/index.js index adc165ea13..77c3aa7d27 100644 --- a/index.js +++ b/index.js @@ -8,17 +8,19 @@ // TODO: Rewrite this to classes const Buffer = require('buffer').Buffer const net = require('net') -const tls = require('tls') const util = require('util') const utils = require('./lib/utils') const Command = require('./lib/command') const Queue = require('denque') const errorClasses = require('./lib/customErrors') const EventEmitter = require('events') -const Parser = require('redis-parser') const Errors = require('redis-errors') const debug = require('./lib/debug') +const connect = require('./lib/connect') +const Commands = require('redis-commands') +const addCommand = require('./lib/commands') const unifyOptions = require('./lib/createClient') +const Multi = require('./lib/multi') const normalizeAndWriteCommand = require('./lib/writeCommands') const SUBSCRIBE_COMMANDS = { subscribe: true, @@ -42,8 +44,6 @@ function handleDetectBuffersReply (reply, command, bufferArgs) { return reply } -exports.debugMode = /\bredis\b/i.test(process.env.NODE_DEBUG) - // Attention: The second parameter might be removed at will and is not officially supported. // Do not rely on this function RedisClient (options, stream) { @@ -88,7 +88,9 @@ function RedisClient (options, stream) { options.detectBuffers = !!options.detectBuffers // Override the detectBuffers setting if returnBuffers is active and print a warning if (options.returnBuffers && options.detectBuffers) { - this.warn('WARNING: You activated returnBuffers and detectBuffers at the same time. The return value is always going to be a buffer.') + process.nextTick(() => + utils.warn(this, 'WARNING: You activated returnBuffers and detectBuffers at the same time. The return value is always going to be a buffer.') + ) options.detectBuffers = false } if (options.detectBuffers) { @@ -130,14 +132,13 @@ function RedisClient (options, stream) { } this.retryStrategyProvided = !!options.retryStrategy this.subscribeChannels = [] - // Init parser - this.replyParser = createParser(this) - this.createStream() + // Init parser and connect + connect(this) this.on('newListener', function (event) { if ((event === 'messageBuffer' || event === 'pmessageBuffer') && !this.buffers && !this.messageBuffers) { this.messageBuffers = true this.handleReply = handleDetectBuffersReply - this.replyParser.setReturnBuffers(true) + this._replyParser.setReturnBuffers(true) } }) } @@ -145,35 +146,6 @@ util.inherits(RedisClient, EventEmitter) RedisClient.connectionId = 0 -function createParser (self) { - return new Parser({ - returnReply (data) { - self.returnReply(data) - }, - returnError (err) { - // Return a ReplyError to indicate Redis returned an error - self.returnError(err) - }, - returnFatalError (err) { - // Error out all fired commands. Otherwise they might rely on faulty data. We have to reconnect to get in a working state again - // Note: the execution order is important. First flush and emit, then create the stream - err.message += '. Please report this.' - self.ready = false - self.flushAndError({ - message: 'Fatal error encountered. Command aborted.', - code: 'NR_FATAL' - }, { - error: err, - queues: ['commandQueue'] - }) - self.createStream() - setImmediate(() => self.emit('error', err)) - }, - returnBuffers: self.buffers || self.messageBuffers, - stringNumbers: self.options.stringNumbers || false - }) -} - /****************************************************************************** All functions in here are internal besides the RedisClient constructor @@ -182,88 +154,6 @@ function createParser (self) { ******************************************************************************/ -// Attention: the function name "createStream" should not be changed, as other libraries need this to mock the stream (e.g. fakeredis) -RedisClient.prototype.createStream = function () { - // Init parser - this.replyParser = createParser(this) - - if (this.options.stream) { - // Only add the listeners once in case of a reconnect try (that won't work) - if (this.stream) { - return - } - this.stream = this.options.stream - } else { - // On a reconnect destroy the former stream and retry - if (this.stream) { - this.stream.removeAllListeners() - this.stream.destroy() - } - - /* istanbul ignore if: travis does not work with stunnel atm. Therefore the tls tests are skipped on travis */ - if (this.options.tls) { - this.stream = tls.connect(this.connectionOptions) - } else { - this.stream = net.createConnection(this.connectionOptions) - } - } - - if (this.options.connectTimeout) { - // TODO: Investigate why this is not properly triggered - this.stream.setTimeout(this.connectTimeout, () => { - // Note: This is only tested if a internet connection is established - this.connectionGone('timeout') - }) - } - - /* istanbul ignore next: travis does not work with stunnel atm. Therefore the tls tests are skipped on travis */ - const connectEvent = this.options.tls ? 'secureConnect' : 'connect' - this.stream.once(connectEvent, () => { - this.stream.removeAllListeners('timeout') - this.timesConnected++ - this.onConnect() - }) - - this.stream.on('data', (bufferFromSocket) => { - // The bufferFromSocket.toString() has a significant impact on big chunks and therefore this should only be used if necessary - debug('Net read %s id %s', this.address, this.connectionId) // + ': ' + bufferFromSocket.toString()); - this.replyParser.execute(bufferFromSocket) - }) - - this.stream.on('error', (err) => { - this.onError(err) - }) - - /* istanbul ignore next: difficult to test and not important as long as we keep this listener */ - this.stream.on('clientError', (err) => { - debug('clientError occurred') - this.onError(err) - }) - - this.stream.once('close', (hadError) => { - this.connectionGone('close') - }) - - this.stream.once('end', () => { - this.connectionGone('end') - }) - - this.stream.setNoDelay() - - // Fire the command before redis is connected to be sure it's the first fired command - if (this.authPass !== undefined) { - this.ready = true - this.auth(this.authPass).catch((err) => { - this.closing = true - process.nextTick(() => { - this.emit('error', err) - this.end(true) - }) - }) - this.ready = false - } -} - RedisClient.prototype.handleReply = function (reply, command) { if (command === 'hgetall') { reply = utils.replyToObject(reply) @@ -281,18 +171,6 @@ RedisClient.prototype.initializeRetryVars = function () { this.attempts = 1 } -RedisClient.prototype.warn = function (msg) { - // Warn on the next tick. Otherwise no event listener can be added - // for warnings that are emitted in the redis client constructor - process.nextTick(() => { - if (this.listeners('warning').length !== 0) { - this.emit('warning', msg) - } else { - console.warn('nodeRedis:', msg) - } - }) -} - // Flush provided queues, erroring any items with a callback first RedisClient.prototype.flushAndError = function (errorAttributes, options) { options = options || {} @@ -347,8 +225,8 @@ RedisClient.prototype.onConnect = function () { this.connected = true this.ready = false this.emittedEnd = false - this.stream.setKeepAlive(this.options.socketKeepalive) - this.stream.setTimeout(0) + this._stream.setKeepAlive(this.options.socketKeepalive) + this._stream.setTimeout(0) this.emit('connect') this.initializeRetryVars() @@ -366,7 +244,7 @@ RedisClient.prototype.onReady = function () { this.cork = () => { this.pipeline = true - this.stream.cork() + this._stream.cork() } this.uncork = () => { if (this.fireStrings) { @@ -377,7 +255,7 @@ RedisClient.prototype.onReady = function () { this.pipeline = false this.fireStrings = true // TODO: Consider using next tick here. See https://github.com/NodeRedis/nodeRedis/issues/1033 - this.stream.uncork() + this._stream.uncork() } // Restore modal commands from previous connection. The order of the commands is important @@ -498,7 +376,7 @@ const retryConnection = function (self, error) { self.retryTotaltime += self.retryDelay self.attempts += 1 - self.createStream() + connect(self) self.retryTimer = null } @@ -737,7 +615,7 @@ function handleOfflineCommand (self, commandObj) { if (self.closing || !self.enableOfflineQueue) { command = command.toUpperCase() if (!self.closing) { - if (self.stream.writable) { + if (self._stream.writable) { msg = 'The connection is not yet established and the offline queue is deactivated.' } else { msg = 'Stream not writeable.' @@ -764,7 +642,7 @@ function handleOfflineCommand (self, commandObj) { // Do not call internalSendCommand directly, if you are not absolutely certain it handles everything properly // e.g. monitor / info does not work with internalSendCommand only RedisClient.prototype.internalSendCommand = function (commandObj) { - if (this.ready === false || this.stream.writable === false) { + if (this.ready === false || this._stream.writable === false) { // Handle offline commands right away handleOfflineCommand(this, commandObj) return commandObj.promise @@ -798,19 +676,19 @@ RedisClient.prototype.writeStrings = function () { for (var command = this.pipelineQueue.shift(); command; command = this.pipelineQueue.shift()) { // Write to stream if the string is bigger than 4mb. The biggest string may be Math.pow(2, 28) - 15 chars long if (str.length + command.length > 4 * 1024 * 1024) { - this.shouldBuffer = !this.stream.write(str) + this.shouldBuffer = !this._stream.write(str) str = '' } str += command } if (str !== '') { - this.shouldBuffer = !this.stream.write(str) + this.shouldBuffer = !this._stream.write(str) } } RedisClient.prototype.writeBuffers = function () { for (var command = this.pipelineQueue.shift(); command; command = this.pipelineQueue.shift()) { - this.shouldBuffer = !this.stream.write(command) + this.shouldBuffer = !this._stream.write(command) } } @@ -820,24 +698,29 @@ RedisClient.prototype.writeBuffers = function () { // This can only be used for strings only though. RedisClient.prototype.write = function (data) { if (this.pipeline === false) { - this.shouldBuffer = !this.stream.write(data) + this.shouldBuffer = !this._stream.write(data) return } this.pipelineQueue.push(data) } -exports.createClient = function () { - return new RedisClient(unifyOptions.apply(null, arguments)) +Commands.list.forEach((name) => addCommand(RedisClient.prototype, Multi.prototype, name)) + +module.exports = { + debugMode: /\bredis\b/i.test(process.env.NODE_DEBUG), + RedisClient, + Multi, + AbortError: errorClasses.AbortError, + ParserError: Errors.ParserError, + RedisError: Errors.RedisError, + ReplyError: Errors.ReplyError, + InterruptError: Errors.InterruptError, + createClient () { + return new RedisClient(unifyOptions.apply(null, arguments)) + } } -exports.RedisClient = RedisClient -exports.Multi = require('./lib/multi') -exports.AbortError = errorClasses.AbortError -exports.RedisError = Errors.RedisError -exports.ParserError = Errors.ParserError -exports.ReplyError = Errors.ReplyError -exports.InterruptError = Errors.InterruptError // Add all redis commands / nodeRedis api to the client +// TODO: Change the way this is included... require('./lib/individualCommands') require('./lib/extendedApi') -require('./lib/commands') diff --git a/lib/commands.js b/lib/commands.js index d67da61fed..b6ecb68d7b 100644 --- a/lib/commands.js +++ b/lib/commands.js @@ -1,23 +1,15 @@ 'use strict' -const commands = require('redis-commands') -const Multi = require('./multi') -const RedisClient = require('../').RedisClient const Command = require('./command') -const clientProto = RedisClient.prototype -const multiProto = Multi.prototype - -// TODO: Rewrite this including the individual commands into a Commands class -// that provided a functionality to add new commands to the client -commands.list.forEach((command) => { +function addCommand (clientProto, multiProto, command) { // Some rare Redis commands use special characters in their command name // Convert those to a underscore to prevent using invalid function names const commandName = command.replace(/(?:^([0-9])|[^a-zA-Z0-9_$])/g, '_$1') // Do not override existing functions if (!clientProto[command]) { - clientProto[command] = function () { + clientProto[commandName] = function () { const len = arguments.length const arr = new Array(len) for (var i = 0; i < len; i += 1) { @@ -25,16 +17,16 @@ commands.list.forEach((command) => { } return this.internalSendCommand(new Command(command, arr)) } - if (clientProto[command] !== commandName) { - Object.defineProperty(clientProto[command], 'name', { + if (!clientProto[commandName].name) { + Object.defineProperty(clientProto[commandName], 'name', { value: commandName }) } } // Do not override existing functions - if (!multiProto[command]) { - multiProto[command] = function () { + if (!multiProto[command] && command !== 'multi') { + multiProto[commandName] = function () { const len = arguments.length const arr = new Array(len) for (var i = 0; i < len; i += 1) { @@ -43,10 +35,12 @@ commands.list.forEach((command) => { this._queue.push(new Command(command, arr)) return this } - if (multiProto[command] !== commandName) { - Object.defineProperty(multiProto[command], 'name', { + if (!multiProto[commandName].name) { + Object.defineProperty(multiProto[commandName], 'name', { value: commandName }) } } -}) +} + +module.exports = addCommand diff --git a/lib/connect.js b/lib/connect.js new file mode 100644 index 0000000000..b54d60fd2d --- /dev/null +++ b/lib/connect.js @@ -0,0 +1,131 @@ +'use strict' + +const tls = require('tls') +const Parser = require('redis-parser') +const net = require('net') +const debug = require('./debug') + +/** + * @description Create a new Parser instance and pass all the necessary options to it + * + * @param {RedisClient} client + * @returns JavascriptRedisParser + */ +function createParser (client) { + return new Parser({ + returnReply (data) { + client.returnReply(data) + }, + returnError (err) { + client.returnError(err) + }, + returnFatalError (err) { + // Error out all fired commands. Otherwise they might rely on faulty data. We have to reconnect to get in a working state again + // Note: the execution order is important. First flush and emit, then create the stream + err.message += '. Please report this.' + client.ready = false + client.flushAndError({ + message: 'Fatal error encountered. Command aborted.', + code: 'NR_FATAL' + }, { + error: err, + queues: ['commandQueue'] + }) + connect(client) + setImmediate(() => client.emit('error', err)) + }, + returnBuffers: client.buffers || client.messageBuffers, + stringNumbers: client.options.stringNumbers || false + }) +} + +// TODO: Open a PR for fakeredis to pass a mocked stream with the options +/** + * @description Connect to the provided client and add all the listeners. + * + * It will also init a parser and fire the auth command if a password exists. + * + * @param {RedisClient} client + */ +function connect (client) { + // Init parser + const parser = createParser(client) + client._replyParser = parser + + if (client.options.stream) { + // Only add the listeners once in case of a reconnect try (that won't work) + if (client._stream) { + return + } + client._stream = client.options.stream + } else { + // On a reconnect destroy the former stream and retry + if (client._stream) { + client._stream.removeAllListeners() + client._stream.destroy() + } + + /* istanbul ignore if: travis does not work with stunnel atm. Therefore the tls tests are skipped on travis */ + if (client.options.tls) { + client._stream = tls.connect(client.connectionOptions) + } else { + client._stream = net.createConnection(client.connectionOptions) + } + } + + if (client.options.connectTimeout) { + // TODO: Investigate why this is not properly triggered + client._stream.setTimeout(client.connectTimeout, () => { + // Note: This is only tested if a internet connection is established + client.connectionGone('timeout') + }) + } + + /* istanbul ignore next: travis does not work with stunnel atm. Therefore the tls tests are skipped on travis */ + const connectEvent = client.options.tls ? 'secureConnect' : 'connect' + client._stream.once(connectEvent, () => { + client._stream.removeAllListeners('timeout') + client.timesConnected++ + client.onConnect() + }) + + client._stream.on('data', (bufferFromSocket) => { + debug('Net read %s id %s', client.address, client.connectionId) + parser.execute(bufferFromSocket) + }) + + client._stream.on('error', (err) => { + client.onError(err) + }) + + /* istanbul ignore next: difficult to test and not important as long as we keep this listener */ + client._stream.on('clientError', (err) => { + debug('clientError occurred') + client.onError(err) + }) + + client._stream.once('close', (hadError) => { + client.connectionGone('close') + }) + + client._stream.once('end', () => { + client.connectionGone('end') + }) + + client._stream.setNoDelay() + + // Fire the command before redis is connected to be sure it's the first fired command + if (client.authPass !== undefined) { + client.ready = true + client.auth(client.authPass).catch((err) => { + client.closing = true + process.nextTick(() => { + client.emit('error', err) + client.end(true) + }) + }) + client.ready = false + } +} + +module.exports = connect diff --git a/lib/createClient.js b/lib/createClient.js index b6e8ca165d..d0c5d4d7d8 100644 --- a/lib/createClient.js +++ b/lib/createClient.js @@ -28,7 +28,7 @@ module.exports = function createClient (portArg, hostArg, options) { options.password = parsed.auth.split(':')[1] } if (parsed.protocol && parsed.protocol !== 'redis:') { - console.warn(`nodeRedis: WARNING: You passed "${parsed.protocol.substring(0, parsed.protocol.length - 1) }" as protocol instead of the "redis" protocol!`) + console.warn(`nodeRedis: WARNING: You passed "${parsed.protocol.substring(0, parsed.protocol.length - 1)}" as protocol instead of the "redis" protocol!`) } if (parsed.pathname && parsed.pathname !== '/') { options.db = parsed.pathname.substr(1) diff --git a/lib/extendedApi.js b/lib/extendedApi.js index 5bf7075299..dfa1764cb4 100644 --- a/lib/extendedApi.js +++ b/lib/extendedApi.js @@ -40,35 +40,34 @@ RedisClient.prototype.sendCommand = function (command, args) { } RedisClient.prototype.end = function (flush) { + if (typeof flush !== 'boolean') { + throw new TypeError('You must call "end" with the flush argument.') + } + // Flush queue if wanted if (flush) { this.flushAndError({ message: 'Connection forcefully ended and command aborted.', code: 'NR_CLOSED' }) - } else if (arguments.length === 0) { - this.warn( - 'Using .end() without the flush parameter is deprecated and throws from v.3.0.0 on.\n' + - 'Please check the documentation (https://github.com/NodeRedis/nodeRedis) and explicitly use flush.' - ) } // Clear retryTimer if (this.retryTimer) { clearTimeout(this.retryTimer) this.retryTimer = null } - this.stream.removeAllListeners() - this.stream.on('error', noop) + this._stream.removeAllListeners() + this._stream.on('error', noop) this.connected = false this.ready = false this.closing = true - return this.stream.destroySoon() + return this._stream.destroySoon() } RedisClient.prototype.unref = function () { if (this.connected) { debug('Unref\'ing the socket connection') - this.stream.unref() + this._stream.unref() } else { debug('Not connected yet, will unref later') this.once('connect', function () { diff --git a/lib/individualCommands.js b/lib/individualCommands.js index 2d96f3fa57..809cd4ee0c 100644 --- a/lib/individualCommands.js +++ b/lib/individualCommands.js @@ -3,6 +3,7 @@ const debug = require('./debug') const Multi = require('./multi') const Command = require('./command') +const utils = require('./utils') const noPasswordIsSet = /no password is set/ const RedisClient = require('../').RedisClient @@ -21,11 +22,11 @@ const RedisClient = require('../').RedisClient TODO: Implement hooks to replace this. Most of these things are perfect for hooks ********************************************************************************************/ -function selectCallback (self, db) { +function selectCallback (client, db) { return function (err, res) { if (err === null) { // Store db in this.selectDb to restore it on reconnect - self.selectedDb = db + client.selectedDb = db } return err || res } @@ -66,11 +67,11 @@ Multi.prototype.monitor = function monitor () { return this } -function quitCallback (self) { +function quitCallback (client) { return function (err, res) { - if (self.stream.writable) { + if (client._stream.writable) { // If the socket is still alive, kill it. This could happen if quit got a NR_CLOSED error code - self.stream.destroy() + client._stream.destroy() } if (err && err.code === 'NR_CLOSED') { // Pretend the quit command worked properly in this case. @@ -106,10 +107,10 @@ Multi.prototype.quit = function quit () { return this } -function infoCallback (self) { +function infoCallback (client) { return function (err, res) { if (err) { - self.serverInfo = {} + client.serverInfo = {} return err } @@ -139,7 +140,7 @@ function infoCallback (self) { }) } // Expose info key/values to users - self.serverInfo = obj + client.serverInfo = obj return res } } @@ -156,11 +157,11 @@ Multi.prototype.info = function info (section) { return this } -function authCallback (self, pass) { +function authCallback (client, pass) { return function (err, res) { if (err) { if (noPasswordIsSet.test(err.message)) { - self.warn('Warning: Redis server does not require a password, but a password was supplied.') + utils.warn(client, 'Warning: Redis server does not require a password, but a password was supplied.') return 'OK' // TODO: Fix this } return err diff --git a/lib/utils.js b/lib/utils.js index 7cc83b2cec..85742f6c52 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -115,11 +115,26 @@ function replyInOrder (client, callback, err, res, queue) { } } +/** + * @description Emit or print a warning. E.g. deprecations + * + * @param {RedisClient} client + * @param {string} msg + */ +function warn (client, msg) { + if (client.listeners('warning').length !== 0) { + client.emit('warning', msg) + } else { + console.warn('NodeRedis:', msg) + } +} + module.exports = { replyToStrings, replyToObject, errCode: /^([A-Z]+)\s+(.+)$/, monitorRegex: /^[0-9]{10,11}\.[0-9]+ \[[0-9]+ .+]( ".+?")+$/, clone: convenienceClone, - replyInOrder + replyInOrder, + warn } diff --git a/test/auth.spec.js b/test/auth.spec.js index fc5b6fbf19..4b58dad949 100644 --- a/test/auth.spec.js +++ b/test/auth.spec.js @@ -134,7 +134,7 @@ if (process.platform !== 'win32') { } clearInterval(interval) interval = null - client.stream.destroy() + client._stream.destroy() client.set('foo', 'bar').catch(done) client.get('foo').catch(done) assert.strictEqual(client.offlineQueue.length, 2) @@ -220,7 +220,7 @@ if (process.platform !== 'win32') { // Coherent behavior with all other offline commands fires commands before emitting but does not wait till they return assert.strictEqual(client.pubSubMode, 2) client.ping().then(() => { // Make sure all commands were properly processed already - client.stream.destroy() + client._stream.destroy() }) }) }) diff --git a/test/commands/blpop.spec.js b/test/commands/blpop.spec.js index 542818911e..cfd75db617 100644 --- a/test/commands/blpop.spec.js +++ b/test/commands/blpop.spec.js @@ -8,7 +8,7 @@ const intercept = require('intercept-stdout') describe('The \'blpop\' method', () => { helper.allTests((ip, args) => { - describe(`using ${ip}`, () => { + describe.only(`using ${ip}`, () => { let client let bclient diff --git a/test/commands/info.spec.js b/test/commands/info.spec.js index ac0740a71a..096ff7d2b0 100644 --- a/test/commands/info.spec.js +++ b/test/commands/info.spec.js @@ -52,7 +52,7 @@ describe('The \'info\' method', () => { assert.strictEqual(err.code, 'UNCERTAIN_STATE') assert.strictEqual(err.command, 'INFO') }) - client.stream.destroy() + client._stream.destroy() return promise }) }) diff --git a/test/commands/monitor.spec.js b/test/commands/monitor.spec.js index 6c71458a96..1ef792f127 100644 --- a/test/commands/monitor.spec.js +++ b/test/commands/monitor.spec.js @@ -107,7 +107,7 @@ describe('The \'monitor\' method', () => { // End after a reconnect return done() } - client.stream.destroy() + client._stream.destroy() client.mget('hello', 'world') called = true }) @@ -127,7 +127,7 @@ describe('The \'monitor\' method', () => { // End after a reconnect return done() } - client.stream.destroy() + client._stream.destroy() client.mget('hello', 'world') called = true }) @@ -138,7 +138,7 @@ describe('The \'monitor\' method', () => { assert.strictEqual(err.code, 'UNCERTAIN_STATE') }) client.on('error', () => {}) // Ignore error here - client.stream.destroy() + client._stream.destroy() const end = helper.callFuncAfter(done, 2) client.on('monitor', (time, args, rawOutput) => { assert.strictEqual(client.monitoring, true) @@ -171,7 +171,7 @@ describe('The \'monitor\' method', () => { client.subscribe('/foo', '/bar') client.unsubscribe('/bar') setTimeout(() => { - client.stream.destroy() + client._stream.destroy() client.once('ready', () => { pub.publish('/foo', 'hello world') }) diff --git a/test/commands/select.spec.js b/test/commands/select.spec.js index 9ee1f95425..da71c15963 100644 --- a/test/commands/select.spec.js +++ b/test/commands/select.spec.js @@ -64,7 +64,7 @@ describe('The \'select\' method', () => { it('selects the appropriate database after a reconnect', (done) => { assert.strictEqual(client.selectedDb, undefined, 'default db should be undefined') client.select(3) - client.set('foo', 'bar').then(() => client.stream.destroy()) + client.set('foo', 'bar').then(() => client._stream.destroy()) client.once('ready', () => { assert.strictEqual(client.selectedDb, 3) assert(typeof client.serverInfo.db3 === 'object') diff --git a/test/connection.spec.js b/test/connection.spec.js index 9b21ebc38e..fc42963461 100644 --- a/test/connection.spec.js +++ b/test/connection.spec.js @@ -2,6 +2,7 @@ const assert = require('assert') const config = require('./lib/config') +const connect = require('../lib/connect') const helper = require('./helper') const redis = config.redis const intercept = require('intercept-stdout') @@ -16,7 +17,7 @@ describe('connection tests', () => { client.end(true) }) - it('unofficially support for a private stream', () => { + it('support for a private stream', () => { // While using a private stream, reconnecting and other features are not going to work properly. // Besides that some functions also have to be monkey patched to be safe from errors in this case. // Therefore this is not officially supported! @@ -24,13 +25,13 @@ describe('connection tests', () => { client = new redis.RedisClient({ prefix: 'test' }, socket) - assert.strictEqual(client.stream, socket) - assert.strictEqual(client.stream.listeners('error').length, 1) + assert.strictEqual(client._stream, socket) + assert.strictEqual(client._stream.listeners('error').length, 1) assert.strictEqual(client.address, '"Private stream"') // Pretend a reconnect event - client.createStream() - assert.strictEqual(client.stream, socket) - assert.strictEqual(client.stream.listeners('error').length, 1) + connect(client) + assert.strictEqual(client._stream, socket) + assert.strictEqual(client._stream.listeners('error').length, 1) }) describe('quit on lost connections', () => { @@ -127,7 +128,7 @@ describe('connection tests', () => { client.set('foo', 'bar').catch(helper.isError()) client.quit().then(() => done()) process.nextTick(() => { - client.stream.destroy() + client._stream.destroy() }) }) }) @@ -228,7 +229,7 @@ describe('connection tests', () => { } }) setTimeout(() => { - client.stream.destroy() + client._stream.destroy() }, 50) }) }) @@ -245,7 +246,7 @@ describe('connection tests', () => { return 5000 } }) - process.nextTick(() => assert.strictEqual(client.stream.listeners('timeout').length, 1)) + process.nextTick(() => assert.strictEqual(client._stream.listeners('timeout').length, 1)) assert.strictEqual(client.address, '10.255.255.1:6379') assert.strictEqual(client.connectionOptions.family, 4) @@ -277,7 +278,7 @@ describe('connection tests', () => { assert.strictEqual(client.address, '2001:db8::ff00:42:8329:6379') assert.strictEqual(client.connectionOptions.family, 6) process.nextTick(() => { - assert.strictEqual(client.stream.listeners('timeout').length, 0) + assert.strictEqual(client._stream.listeners('timeout').length, 0) done() }) client.end(true) @@ -288,11 +289,11 @@ describe('connection tests', () => { connectTimeout: 1000 }) process.nextTick(() => { - assert.strictEqual(client.stream._idleTimeout, 1000) + assert.strictEqual(client._stream._idleTimeout, 1000) }) client.on('connect', () => { - assert.strictEqual(client.stream._idleTimeout, -1) - assert.strictEqual(client.stream.listeners('timeout').length, 0) + assert.strictEqual(client._stream._idleTimeout, -1) + assert.strictEqual(client._stream.listeners('timeout').length, 0) client.on('ready', done) }) }) @@ -415,24 +416,6 @@ describe('connection tests', () => { }) }) - it('fake the stream to mock redis', () => { - // This is needed for libraries that want to mock the stream like fakeredis - const temp = redis.RedisClient.prototype.createStream - const createStreamString = String(temp) - redis.RedisClient.prototype.createStream = function () { - this.connected = true - this.ready = true - } - client = new redis.RedisClient() - assert.strictEqual(client.stream, undefined) - assert.strictEqual(client.ready, true) - assert.strictEqual(client.connected, true) - client.end = function () {} - assert(createStreamString !== String(redis.RedisClient.prototype.createStream)) - redis.RedisClient.prototype.createStream = temp - assert(createStreamString === String(redis.RedisClient.prototype.createStream)) - }) - if (ip === 'IPv4') { it('allows connecting with the redis url to the default host and port, select db 3 and warn about duplicate db option', (done) => { client = redis.createClient('redis:///3?db=3') diff --git a/test/helper.js b/test/helper.js index 3165e96bf5..330da521f7 100644 --- a/test/helper.js +++ b/test/helper.js @@ -207,6 +207,6 @@ module.exports = { family: 4 } client.address = '127.0.0.1:65535' - client.stream.destroy() + client._stream.destroy() } } diff --git a/test/lib/good-traces.js b/test/lib/good-traces.js index 348b83c23d..832f5a85bd 100644 --- a/test/lib/good-traces.js +++ b/test/lib/good-traces.js @@ -15,6 +15,6 @@ client.set('foo').catch((err) => { }) }) process.nextTick(() => { - client.stream.destroy() + client._stream.destroy() }) }) diff --git a/test/multi.spec.js b/test/multi.spec.js index 6ae31deea9..d9cc2b91ad 100644 --- a/test/multi.spec.js +++ b/test/multi.spec.js @@ -173,7 +173,7 @@ describe('The \'multi\' method', () => { it('executes a pipelined multi properly after a reconnect in combination with the offline queue', (done) => { client.once('ready', () => { - client.stream.destroy() + client._stream.destroy() let called = false const multi1 = client.multi() multi1.set('m1', '123') @@ -472,7 +472,7 @@ describe('The \'multi\' method', () => { }) it('works properly after a reconnect. issue #897', (done) => { - client.stream.destroy() + client._stream.destroy() client.on('error', (err) => { assert.strictEqual(err.code, 'ECONNREFUSED') }) diff --git a/test/node_redis.spec.js b/test/node_redis.spec.js index 539eea43ad..517cb60ab4 100644 --- a/test/node_redis.spec.js +++ b/test/node_redis.spec.js @@ -21,8 +21,8 @@ describe('The nodeRedis client', () => { // Check that every entry RedisClient entry has a correspondent Multi entry assert.strictEqual(clientPrototype.filter((entry) => { return !multiPrototype.includes(entry.replace('RedisClient', 'Multi')) - }).length, 2) // multi and batch are included too - assert.strictEqual(clientPrototype.length, multiPrototype.length + 2) + }).length, 0) + assert.strictEqual(clientPrototype.length, multiPrototype.length) // Check that all entries exist only in lowercase variants assert.strictEqual(data.match(/(\n| = )RedisClient\.prototype.[a-z][a-zA-Z_]+/g).length, clientPrototype.length) done() @@ -46,14 +46,14 @@ describe('The nodeRedis client', () => { }) client.once('reconnecting', () => { process.nextTick(() => { - assert.strictEqual(client.replyParser.buffer, null) + assert.strictEqual(client._replyParser.buffer, null) done() }) }) const partialInput = Buffer.from('$100\r\nabcdef') - client.replyParser.execute(partialInput) - assert.strictEqual(client.replyParser.buffer.inspect(), partialInput.inspect()) - client.stream.destroy() + client._replyParser.execute(partialInput) + assert.strictEqual(client._replyParser.buffer.inspect(), partialInput.inspect()) + client._stream.destroy() }) helper.allTests((ip, args) => { @@ -260,7 +260,7 @@ describe('The nodeRedis client', () => { }) bclient.once('ready', () => { setTimeout(() => { - bclient.stream.destroy() + bclient._stream.destroy() client.rpush('blocking list 2', 'initial value').then(helper.isNumber(1)) }, 100) }) @@ -280,7 +280,7 @@ describe('The nodeRedis client', () => { done() }) setTimeout(() => { - bclient.stream.destroy() + bclient._stream.destroy() client.rpush('blocking list 2', 'initial value').then(helper.isNumber(1)) }, 100) }) @@ -288,29 +288,13 @@ describe('The nodeRedis client', () => { }) describe('.end', () => { - it('used without flush / flush set to false', (done) => { - let finished = false - const end = helper.callFuncAfter(() => { - if (!finished) { - done(new Error('failed')) - } - }, 20) - const cb = function (err) { - assert(/Connection forcefully ended|The connection is already closed./.test(err.message)) - assert.strictEqual(err.code, 'NR_CLOSED') - end() + it('used without flush / flush set to false', () => { + try { + client.end() + throw new Error('failed') + } catch (e) { + assert(e instanceof TypeError) } - for (let i = 0; i < 20; i++) { - if (i === 10) { - client.end() - } - client.set('foo', 'bar').then(assert, cb) - } - client.on('warning', () => {}) // Ignore deprecation message - setTimeout(() => { - finished = true - done() - }, 25) }) it('used with flush set to true', (done) => { @@ -322,7 +306,7 @@ describe('The nodeRedis client', () => { for (let i = 0; i < 20; i++) { if (i === 10) { client.end(true) - client.stream.write('foo') // Trigger an error on the closed stream that we ignore + client._stream.write('foo') // Trigger an error on the closed stream that we ignore } client.set('foo', 'bar').then(assert, cb) } @@ -364,7 +348,7 @@ describe('The nodeRedis client', () => { client.set('recon 2', 'two').then((res) => { // Do not do this in normal programs. This is to simulate the server closing on us. // For orderly shutdown in normal programs, do client.quit() - client.stream.destroy() + client._stream.destroy() }) }) @@ -385,7 +369,7 @@ describe('The nodeRedis client', () => { client.set('recon 2', 'two').then((res) => { // Do not do this in normal programs. This is to simulate the server closing on us. // For orderly shutdown in normal programs, do client.quit() - client.stream.destroy() + client._stream.destroy() }) }) }) @@ -408,7 +392,7 @@ describe('The nodeRedis client', () => { client.subscribe('recon channel').then((res) => { // Do not do this in normal programs. This is to simulate the server closing on us. // For orderly shutdown in normal programs, do client.quit() - client.stream.destroy() + client._stream.destroy() }) }) @@ -428,7 +412,7 @@ describe('The nodeRedis client', () => { client.subscribe('recon channel').then((res) => { // Do not do this in normal programs. This is to simulate the server closing on us. // For orderly shutdown in normal programs, do client.quit() - client.stream.destroy() + client._stream.destroy() }) }) }) @@ -510,7 +494,7 @@ describe('The nodeRedis client', () => { client.get('foo').then(helper.isString('bar')).then(done) }) client.once('ready', () => { - client.set('foo', 'bar').then(assert, (err) => { + client.set('foo', 'bar').then(helper.fail, (err) => { assert.strictEqual(err.message, 'Fatal error encountered. Command aborted. It might have been processed.') assert.strictEqual(err.code, 'NR_FATAL') assert(err instanceof redis.AbortError) @@ -520,7 +504,7 @@ describe('The nodeRedis client', () => { // ready is called in a reply process.nextTick(() => { // Fail the set answer. Has no corresponding command obj and will therefore land in the error handler and set - client.replyParser.execute(Buffer.from('a*1\r*1\r$1`zasd\r\na')) + client._replyParser.execute(Buffer.from('a*1\r*1\r$1`zasd\r\na')) }) }) }) @@ -632,7 +616,7 @@ describe('The nodeRedis client', () => { enableOfflineQueue: false }) client.on('ready', () => { - client.stream.destroy() + client._stream.destroy() client.set('foo', 'bar').then(assert, (err) => { assert.strictEqual(err.message, 'SET can\'t be processed. Stream not writeable.') done() diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index 255c4ccdf2..8d48776580 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -41,7 +41,7 @@ describe('publish/subscribe', () => { return done(new Error('Test failed')) } assert.strictEqual(2, count) - sub.stream.destroy() + sub._stream.destroy() } }) @@ -95,7 +95,7 @@ describe('publish/subscribe', () => { if (chnl === channel2) { assert.strictEqual(2, count) if (a) return done() - sub.stream.destroy() + sub._stream.destroy() } }) @@ -118,7 +118,7 @@ describe('publish/subscribe', () => { if (a) { return done() } - sub.stream.destroy() + sub._stream.destroy() } }) @@ -205,7 +205,7 @@ describe('publish/subscribe', () => { assert.strictEqual(channel, channels[n]) assert.strictEqual(message, msg[n]) if (count === 2) return done() - sub.stream.end() + sub._stream.end() }) sub.select(3) @@ -229,7 +229,7 @@ describe('publish/subscribe', () => { sub.mget('foo', 'bar', 'baz', 'hello', 'world').then(helper.isDeepEqual(['bar', null, null, null, null])) sub.subscribe('somechannel', 'another channel').then((res) => { end() - sub.stream.destroy() + sub._stream.destroy() }) assert(sub.ready) sub.on('ready', () => { @@ -273,7 +273,7 @@ describe('publish/subscribe', () => { sub.batch().unsubscribe(['/bar']).exec().then(() => { pub.pubsub('channels').then((res) => { helper.isDeepEqual(['/foo'])(res) - sub.stream.destroy() + sub._stream.destroy() sub.once('ready', () => { pub.pubsub('channels').then((res) => { helper.isDeepEqual(['/foo'])(res) @@ -459,7 +459,7 @@ describe('publish/subscribe', () => { const data = Array(10000).join('äüs^öéÉÉ`e') sub.set('foo', data).then(() => { sub.get('foo') - sub.stream.once('data', () => { + sub._stream.once('data', () => { assert.strictEqual(sub.messageBuffers, false) assert.strictEqual(sub.shouldBuffer, false) sub.on('pmessageBuffer', (pattern, channel, message) => { diff --git a/test/tls.spec.js b/test/tls.spec.js index 1ca30c333f..f4f202bf96 100644 --- a/test/tls.spec.js +++ b/test/tls.spec.js @@ -100,7 +100,7 @@ describe('TLS connection tests', () => { assert.strictEqual(client.connectionOptions.host, 'localhost') assert.strictEqual(client.connectionOptions.port, tlsPort) assert.strictEqual(client.address, `localhost:${tlsPort}`) - assert(client.stream.encrypted) + assert(client._stream.encrypted) client.set('foo', 'bar') return client.get('foo').then(helper.isString('bar'))