1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-01 16:46:54 +03:00
Files
node-redis/lib/client.js
Ruben Bridgewater d7c31da598 chore: add callback functionality back in
This also improves the performance for multi / batch commands a lot.
The reason is that now there are only callbacks internally even if
a promise is going to be returned in the end.
2017-11-29 19:16:40 -02:00

296 lines
10 KiB
JavaScript

'use strict'
const Queue = require('denque')
const EventEmitter = require('events')
const net = require('net')
const { Command } = require('./command')
const connect = require('./connect')
const debug = require('./debug')
const flushAndError = require('./flushAndError')
const Multi = require('./multi')
const offlineCommand = require('./offlineCommand')
const utils = require('./utils')
const normalizeAndWriteCommand = require('./writeCommand')
const noop = function () {}
let connectionId = 0
// Attention: The second parameter might be removed at will and is not officially supported.
// Do not rely on this
class RedisClient extends EventEmitter {
/**
* Creates an instance of RedisClient.
* @param {object} options
*
* @memberof RedisClient
*/
constructor(options) {
var i
super()
// Copy the options so they are not mutated
options = utils.clone(options)
// TODO: Add a more restrictive options validation
const cnxOptions = {}
if (options.tls) {
const tlsKeys = Object.keys(options.tls)
for (i = 0; i < tlsKeys.length; i++) {
const tlsOption = tlsKeys[i]
cnxOptions[tlsOption] = options.tls[tlsOption]
// Copy the tls options into the general options to make sure the address is set right
if (tlsOption === 'port' || tlsOption === 'host' || tlsOption === 'path' || tlsOption === 'family') {
options[tlsOption] = options.tls[tlsOption]
}
}
}
if (options.stream) {
// The stream from the outside is used so no connection from this side is
// triggered but from the server this client should talk to Reconnect etc
// won't work with this. This requires monkey patching to work, so it is
// not officially supported
this.address = '"Private stream"'
} else if (options.path) {
cnxOptions.path = options.path
this.address = options.path
} else {
cnxOptions.port = +options.port || 6379
cnxOptions.host = options.host || '127.0.0.1'
cnxOptions.family = (!options.family && net.isIP(cnxOptions.host)) || (options.host && options.family === 'IPv6' ? 6 : 4)
this.address = `${cnxOptions.host}:${cnxOptions.port}`
}
// TODO: Properly fix typo
if (options.socketKeepalive === undefined) {
options.socketKeepalive = true
}
if (options.renameCommands) {
const renameKeys = Object.keys(options.renameCommands)
for (i = 0; i < renameKeys.length; i++) {
const command = renameKeys[i]
options.renameCommands[command.toLowerCase()] = options.renameCommands[command]
}
}
if (typeof options.enableOfflineQueue !== 'boolean') {
if (options.enableOfflineQueue !== undefined) {
throw new TypeError('enableOfflineQueue must be a boolean')
}
options.enableOfflineQueue = true
}
// Override the detectBuffers setting if returnBuffers is active and print a warning
if (options.returnBuffers && options.detectBuffers) {
process.nextTick(
utils.warn,
this,
'WARNING: You activated returnBuffers and detectBuffers at the same time. The return value is always going to be a buffer.'
)
options.detectBuffers = false
}
if (options.authPass) {
if (options.password) {
throw new TypeError('The "password" and "authPass" option may not both be set at the same time.')
}
options.password = options.authPass
}
options.returnBuffers = !!options.returnBuffers
options.detectBuffers = !!options.detectBuffers
// Public Variables
this.connected = false
this.shouldBuffer = false
this.commandQueue = new Queue() // Holds sent commands
this.offlineQueue = new Queue() // Holds commands issued but not able to be sent yet
this.serverInfo = {}
this.connectionId = connectionId++
this.selectedDb = options.db // Save the selected db here, used when reconnecting
// Private Variables
// Pipelining
this._pipeline = false
this._strCache = ''
this._pipelineQueue = new Queue()
// Pub sub mode
this._subCommandsLeft = 0
this._pubSubMode = 0
this._subscriptionSet = {}
this._subscribeChannels = []
this._messageBuffers = false
// Others
this._multi = false
this._monitoring = false
this._parserReturningBuffers = options.returnBuffers || options.detectBuffers
this._options = options
this._reply = 'ON'
this._retryStrategyProvided = !!options.retryStrategy
this._closing = false
this._timesConnected = 0
this._connectionOptions = cnxOptions
// Only used as timeout until redis has to be connected to redis until
// throwing an connection error
this._connectTimeout = +options.connectTimeout || 60 * 1000 // ms
this._retryStrategy = options.retryStrategy || function (options) {
// TODO: Find better defaults
if (options.attempt > 100) {
return
}
// reconnect after
return Math.min(options.attempt * 100, 3000)
}
utils.setReconnectDefaults(this)
// Init parser and connect
connect(this)
this.on('newListener', function (event) {
if ((event === 'messageBuffer' || event === 'pmessageBuffer') && this._messageBuffers === false) {
this._messageBuffers = true
if (this._parserReturningBuffers === false) {
this._parserReturningBuffers = true
this._replyParser.setReturnBuffers(true)
}
}
})
}
// Do not call internalSendCommand directly, if you are not absolutely certain
// it handles everything properly e.g. monitor / info does not work with
// internalSendCommand only
//
// TODO: Move this function out of the client as a private function
//
// TODO: Check how others can intercept (monkey patch) essential parts (e.g.
// opbeat) after making this private.
internalSendCommand(commandObj) {
if (this.ready === false || this._stream.writable === false) {
// Handle offline commands right away
offlineCommand(this, commandObj)
return commandObj.promise
}
normalizeAndWriteCommand(this, commandObj)
if (commandObj.callOnWrite) {
commandObj.callOnWrite()
}
// Handle `CLIENT REPLY ON|OFF|SKIP`
// This has to be checked after callOnWrite
if (this._reply === 'ON') {
this.commandQueue.push(commandObj)
} else {
// Do not expect a reply
// Does this work in combination with the pub sub mode?
utils.replyInOrder(this, commandObj.callback, null, undefined, this.commandQueue)
if (this._reply === 'SKIP') {
this._reply = 'SKIP_ONE_MORE'
} else if (this._reply === 'SKIP_ONE_MORE') {
this._reply = 'ON'
}
}
return commandObj.promise
}
// Redirect calls to the appropriate function and use to send arbitrary / not supported commands
sendCommand(command, args) {
// Throw to fail early instead of relying in order in this case
if (typeof command !== 'string') {
throw new TypeError(`Wrong input type "${command !== null && command !== undefined ? command.constructor.name : command}" for command name`)
}
if (!Array.isArray(args)) {
if (args === undefined || args === null) {
args = []
} else {
throw new TypeError(`Wrong input type "${args.constructor.name}" for args`)
}
}
// Using the raw multi command is only possible with this function If the
// command is not yet added to the client, the internal function should be
// called right away Otherwise we need to redirect the calls to make sure
// the internal functions don't get skipped The internal functions could
// actually be used for any non hooked function but this might change from
// time to time and at the moment there's no good way to distinguish them
// from each other, so let's just do it do it this way for the time being
if (command === 'multi' || typeof this[command] !== 'function') {
return this.internalSendCommand(new Command(command, args))
}
return this[command].apply(this, args)
}
end(flush) {
if (typeof flush !== 'boolean') {
throw new TypeError('You must call "end" with the flush argument.')
}
// Flush queue if wanted
if (flush) {
flushAndError(this, 'Connection forcefully ended and command aborted.', 'NR_CLOSED')
}
// Clear retryTimer
if (this.retryTimer) {
clearTimeout(this.retryTimer)
this.retryTimer = null
}
this._stream.removeAllListeners()
this._stream.on('error', noop)
this.connected = false
this.ready = false
this._closing = true
return this._stream.destroySoon()
}
unref() {
if (this.connected) {
debug('Unref\'ing the socket connection')
this._stream.unref()
} else {
debug('Not connected yet, will unref later')
this.once('connect', () => this.unref())
}
}
// TODO: promisify this
// This can not be done without removing support to return the client sync.
// This would be another BC and it should be fine to return the client sync.
// Therefore a option could be to accept a resolved promise instead of a callback
// to return a promise.
duplicate(options, callback) {
if (typeof options === 'function') {
callback = options
options = null
}
const existingOptions = utils.clone(this._options)
options = utils.clone(options)
const keys = Object.keys(options)
for (var i = 0; i < keys.length; i++) {
existingOptions[keys[i]] = options[keys[i]]
}
const client = new RedisClient(existingOptions)
// Return to the same state as the other client
// by also selecting the db / returning to pub sub
// mode or into monitor mode.
client.selectedDb = this.selectedDb
client._subscriptionSet = this._subscriptionSet
client._monitoring = this._monitoring
if (typeof callback === 'function') {
const errorListener = (err) => {
callback(err)
client.end(true)
}
const readyListener = () => {
callback(null, client)
client.removeAllListeners(errorListener)
}
client.once('ready', readyListener)
client.once('error', errorListener)
return client
}
return client
}
// Note: this overrides a native function!
multi(args) {
return new Multi(this, 'multi', args)
}
batch(args) {
return new Multi(this, 'batch', args)
}
}
module.exports = RedisClient