'use strict'

const Queue = require('denque')
const EventEmitter = require('events')
const net = require('net')
const { Command } = require('./command')
const connect = require('./connect')
const debug = require('./debug')
const flushAndError = require('./flushAndError')
const Multi = require('./multi')
const offlineCommand = require('./offlineCommand')
const utils = require('./utils')
const normalizeAndWriteCommand = require('./writeCommand')

// Swallows late stream errors after the client has been ended
const noop = function () {}

// Monotonically increasing id handed out to every created client
let connectionId = 0

/**
 * Redis client. Normalizes the user supplied options, sets up the public and
 * private client state and starts connecting through `connect(this)`.
 *
 * @extends EventEmitter
 */
class RedisClient extends EventEmitter {
  /**
   * Creates an instance of RedisClient.
   *
   * @param {object} options Client options. They are cloned via `utils.clone`
   *   before being normalized, so the passed object is not mutated.
   * @throws {TypeError} If `enableOfflineQueue` is set to a non-boolean value
   *   or if both `password` and `authPass` are set.
   *
   * @memberof RedisClient
   */
  constructor(options) {
    super()
    // Copy the options so they are not mutated
    options = utils.clone(options)

    // TODO: Add a more restrictive options validation

    const cnxOptions = {}
    if (options.tls) {
      for (const tlsOption of Object.keys(options.tls)) {
        cnxOptions[tlsOption] = options.tls[tlsOption]
        // Copy the tls options into the general options to make sure the address is set right
        if (tlsOption === 'port' || tlsOption === 'host' || tlsOption === 'path' || tlsOption === 'family') {
          options[tlsOption] = options.tls[tlsOption]
        }
      }
    }

    if (options.stream) {
      // An externally supplied stream is used, so no connection is triggered
      // from this side. Reconnecting etc. won't work with this. This requires
      // monkey patching to work, so it is not officially supported
      this.address = '"Private stream"'
    } else if (options.path) {
      cnxOptions.path = options.path
      this.address = options.path
    } else {
      cnxOptions.port = +options.port || 6379
      cnxOptions.host = options.host || '127.0.0.1'
      // `net.isIP` returns 0, 4 or 6. Without an explicit family the version
      // of a literal IP host is used. Otherwise 6 is only picked if a host was
      // passed together with `family: 'IPv6'`; everything else falls back to 4
      cnxOptions.family = (!options.family && net.isIP(cnxOptions.host)) ||
        (options.host && options.family === 'IPv6' ? 6 : 4)
      this.address = `${cnxOptions.host}:${cnxOptions.port}`
    }

    // TODO: Properly fix typo
    if (options.socketKeepalive === undefined) {
      options.socketKeepalive = true
    }

    if (options.renameCommands) {
      // Add a lower cased alias for every renamed command. The key list is a
      // snapshot, so the aliases added here are not visited again
      for (const command of Object.keys(options.renameCommands)) {
        options.renameCommands[command.toLowerCase()] = options.renameCommands[command]
      }
    }

    if (typeof options.enableOfflineQueue !== 'boolean') {
      if (options.enableOfflineQueue !== undefined) {
        throw new TypeError('enableOfflineQueue must be a boolean')
      }
      options.enableOfflineQueue = true
    }

    // Override the detectBuffers setting if returnBuffers is active and print a warning
    if (options.returnBuffers && options.detectBuffers) {
      process.nextTick(
        utils.warn,
        this,
        'WARNING: You activated returnBuffers and detectBuffers at the same time. The return value is always going to be a buffer.'
      )
      options.detectBuffers = false
    }

    if (options.authPass) {
      if (options.password) {
        throw new TypeError('The "password" and "authPass" option may not both be set at the same time.')
      }
      options.password = options.authPass
    }

    options.returnBuffers = !!options.returnBuffers
    options.detectBuffers = !!options.detectBuffers

    // Public Variables
    this.connected = false
    this.shouldBuffer = false
    this.commandQueue = new Queue() // Holds sent commands
    this.offlineQueue = new Queue() // Holds commands issued but not able to be sent yet
    this.serverInfo = {}
    this.connectionId = connectionId++
    this.selectedDb = options.db // Save the selected db here, used when reconnecting

    // Private Variables
    // Pipelining
    this._pipeline = false
    this._strCache = ''
    this._pipelineQueue = new Queue()
    // Pub sub mode
    this._subCommandsLeft = 0
    this._pubSubMode = 0
    this._subscriptionSet = {}
    this._subscribeChannels = []
    this._messageBuffers = false
    // Others
    this._multi = false
    this._monitoring = false
    this._parserReturningBuffers = options.returnBuffers || options.detectBuffers
    this._options = options
    this._reply = 'ON'
    this._retryStrategyProvided = !!options.retryStrategy
    this._closing = false
    this._timesConnected = 0
    this._connectionOptions = cnxOptions
    // Timeout in ms for establishing the connection; defaults to one minute.
    // NOTE(review): presumably a connection error is raised once it elapsed -
    // the code acting on it is not part of this file
    this._connectTimeout = +options.connectTimeout || 60 * 1000 // ms
    this._retryStrategy = options.retryStrategy || function (retryOptions) {
      // TODO: Find better defaults
      if (retryOptions.attempt > 100) {
        return
      }
      // Reconnect after `attempt * 100` ms, capped at three seconds
      return Math.min(retryOptions.attempt * 100, 3000)
    }
    utils.setReconnectDefaults(this)
    // Init parser and connect
    connect(this)
    // Switch the parser to return buffers as soon as someone listens for
    // buffer based pub sub messages
    this.on('newListener', function (event) {
      if ((event === 'messageBuffer' || event === 'pmessageBuffer') && this._messageBuffers === false) {
        this._messageBuffers = true
        if (this._parserReturningBuffers === false) {
          this._parserReturningBuffers = true
          this._replyParser.setReturnBuffers(true)
        }
      }
    })
  }

  // Do not call internalSendCommand directly, if you are not absolutely certain
  // it handles everything properly e.g. monitor / info does not work with
  // internalSendCommand only
  //
  // TODO: Move this function out of the client as a private function
  //
  // TODO: Check how others can intercept (monkey patch) essential parts (e.g.
  // opbeat) after making this private.
  /**
   * Writes a command to the stream or hands it to `offlineCommand` if the
   * client is not ready or the stream is not writable.
   *
   * @param {Command} commandObj The command to send
   * @returns {*} `commandObj.promise`
   */
  internalSendCommand(commandObj) {
    if (this.ready === false || this._stream.writable === false) {
      // Handle offline commands right away
      offlineCommand(this, commandObj)
      return commandObj.promise
    }

    normalizeAndWriteCommand(this, commandObj)

    if (commandObj.callOnWrite) {
      commandObj.callOnWrite()
    }
    // Handle `CLIENT REPLY ON|OFF|SKIP`
    // This has to be checked after callOnWrite
    if (this._reply === 'ON') {
      this.commandQueue.push(commandObj)
    } else {
      // Do not expect a reply
      // Does this work in combination with the pub sub mode?
      utils.replyInOrder(this, commandObj.callback, null, undefined, this.commandQueue)
      // Advance the skip state: SKIP -> SKIP_ONE_MORE -> ON. Any other state
      // (e.g. OFF) is kept as is
      if (this._reply === 'SKIP') {
        this._reply = 'SKIP_ONE_MORE'
      } else if (this._reply === 'SKIP_ONE_MORE') {
        this._reply = 'ON'
      }
    }
    return commandObj.promise
  }

  /**
   * Redirects calls to the appropriate function. Can be used to send
   * arbitrary / not supported commands.
   *
   * @param {string} command The command name
   * @param {Array} [args] The command arguments; `undefined` and `null` are
   *   treated as an empty list
   * @returns {*} The return value of the client method with that name or of
   *   `internalSendCommand`
   * @throws {TypeError} If `command` is not a string or `args` is neither an
   *   array nor `undefined` / `null`
   */
  sendCommand(command, args) {
    // Throw to fail early instead of relying in order in this case
    if (typeof command !== 'string') {
      throw new TypeError(`Wrong input type "${command !== null && command !== undefined ? command.constructor.name : command}" for command name`)
    }
    if (!Array.isArray(args)) {
      if (args === undefined || args === null) {
        args = []
      } else {
        throw new TypeError(`Wrong input type "${args.constructor.name}" for args`)
      }
    }

    // Using the raw multi command is only possible with this function. If the
    // command is not yet added to the client, the internal function should be
    // called right away. Otherwise we need to redirect the calls to make sure
    // the internal functions don't get skipped. The internal functions could
    // actually be used for any non hooked function, but this might change from
    // time to time and at the moment there's no good way to distinguish them
    // from each other, so let's just do it this way for the time being
    if (command === 'multi' || typeof this[command] !== 'function') {
      return this.internalSendCommand(new Command(command, args))
    }
    return this[command](...args)
  }

  /**
   * Forcefully ends the connection without the option to reconnect through
   * this call: pending retry timers are cleared and all stream listeners are
   * removed.
   *
   * @param {boolean} flush If `true`, `flushAndError` is called with the
   *   'NR_CLOSED' code before closing
   * @returns {*} The return value of `stream.destroySoon()`
   * @throws {TypeError} If `flush` is not a boolean
   */
  end(flush) {
    if (typeof flush !== 'boolean') {
      throw new TypeError('You must call "end" with the flush argument.')
    }

    // Flush queue if wanted
    if (flush) {
      flushAndError(this, 'Connection forcefully ended and command aborted.', 'NR_CLOSED')
    }
    // Clear retryTimer
    if (this.retryTimer) {
      clearTimeout(this.retryTimer)
      this.retryTimer = null
    }
    this._stream.removeAllListeners()
    // Keep a noop error listener so late stream errors do not throw
    this._stream.on('error', noop)
    this.connected = false
    this.ready = false
    this._closing = true
    return this._stream.destroySoon()
  }

  /**
   * Calls `unref()` on the underlying stream. If the client is not connected
   * yet, this is deferred until the next 'connect' event.
   */
  unref() {
    if (this.connected) {
      debug('Unref\'ing the socket connection')
      this._stream.unref()
    } else {
      debug('Not connected yet, will unref later')
      this.once('connect', () => this.unref())
    }
  }

  // TODO: promisify this
  // This can not be done without removing support to return the client sync.
  // This would be another BC and it should be fine to return the client sync.
  // Therefore a option could be to accept a resolved promise instead of a callback
  // to return a promise.
  /**
   * Creates a new client from this client's options, optionally overridden by
   * `options`.
   *
   * @param {object|Function} [options] Options overriding the current ones,
   *   or the callback
   * @param {Function} [callback] Called exactly once: with `(null, client)`
   *   on the first 'ready' event or with `(err)` on the first 'error' event
   *   before that. In the error case the new client is ended.
   * @returns {RedisClient} The new client (returned synchronously)
   */
  duplicate(options, callback) {
    if (typeof options === 'function') {
      callback = options
      options = null
    }
    const existingOptions = utils.clone(this._options)
    options = utils.clone(options)
    for (const key of Object.keys(options)) {
      existingOptions[key] = options[key]
    }
    const client = new RedisClient(existingOptions)
    // Return to the same state as the other client
    // by also selecting the db / returning to pub sub
    // mode or into monitor mode.
    client.selectedDb = this.selectedDb
    // NOTE(review): the subscription set is shared by reference with this
    // client, not copied - confirm that this is intended
    client._subscriptionSet = this._subscriptionSet
    client._monitoring = this._monitoring
    if (typeof callback === 'function') {
      const errorListener = (err) => {
        // The client is ended below, so do not wait for 'ready' anymore
        client.removeListener('ready', readyListener)
        callback(err)
        client.end(true)
      }
      const readyListener = () => {
        callback(null, client)
        // `removeAllListeners` expects an event name, not a listener. Remove
        // the specific listener so a later error neither calls the callback a
        // second time nor ends the client that was already handed out
        client.removeListener('error', errorListener)
      }
      client.once('ready', readyListener)
      client.once('error', errorListener)
    }
    return client
  }

  // Note: this overrides a native function!
  /**
   * Creates a `Multi` instance of type 'multi' bound to this client.
   *
   * @param {Array} [args] Passed through to the `Multi` constructor
   * @returns {Multi}
   */
  multi(args) {
    return new Multi(this, 'multi', args)
  }

  /**
   * Creates a `Multi` instance of type 'batch' bound to this client.
   *
   * @param {Array} [args] Passed through to the `Multi` constructor
   * @returns {Multi}
   */
  batch(args) {
    return new Multi(this, 'batch', args)
  }
}

module.exports = RedisClient