1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

chore: refactor client to es6 class and sort requires

This commit is contained in:
Ruben Bridgewater
2017-05-28 05:43:34 +02:00
parent 579e080ad5
commit 5fef7e246d
12 changed files with 253 additions and 253 deletions

359
index.js
View File

@@ -4,155 +4,267 @@
// We have to replace the error codes and make them coherent. // We have to replace the error codes and make them coherent.
// We also have to use InterruptError s instead of AbortError s. // We also have to use InterruptError s instead of AbortError s.
// The Error messages might be improved as well. // The Error messages might be improved as well.
// TODO: Rewrite this to classes
const net = require('net')
const util = require('util')
const utils = require('./lib/utils')
const Queue = require('denque') const Queue = require('denque')
const EventEmitter = require('events') const EventEmitter = require('events')
const Errors = require('redis-errors') const net = require('net')
const connect = require('./lib/connect')
const Commands = require('redis-commands') const Commands = require('redis-commands')
const Errors = require('redis-errors')
const Command = require('./lib/command')
const addCommand = require('./lib/commands') const addCommand = require('./lib/commands')
const connect = require('./lib/connect')
const unifyOptions = require('./lib/createClient') const unifyOptions = require('./lib/createClient')
const debug = require('./lib/debug')
const flushAndError = require('./lib/flushAndError')
const Multi = require('./lib/multi') const Multi = require('./lib/multi')
const normalizeAndWriteCommand = require('./lib/writeCommands')
const offlineCommand = require('./lib/offlineCommand') const offlineCommand = require('./lib/offlineCommand')
const utils = require('./lib/utils')
const normalizeAndWriteCommand = require('./lib/writeCommands')
const noop = function () {}
// Attention: The second parameter might be removed at will and is not officially supported. // Attention: The second parameter might be removed at will and is not officially supported.
// Do not rely on this // Do not rely on this
function RedisClient (options, stream) { class RedisClient extends EventEmitter {
// Copy the options so they are not mutated /**
options = utils.clone(options) * Creates an instance of RedisClient.
EventEmitter.call(this) * @param {object} options
const cnxOptions = {} * @param {any} [stream]
/* istanbul ignore next: travis does not work with stunnel atm. Therefore the tls tests are skipped on travis */ *
for (const tlsOption in options.tls) { * @memberof RedisClient
if (options.tls.hasOwnProperty(tlsOption)) { */
cnxOptions[tlsOption] = options.tls[tlsOption] constructor (options, stream) {
// Copy the tls options into the general options to make sure the address is set right super()
if (tlsOption === 'port' || tlsOption === 'host' || tlsOption === 'path' || tlsOption === 'family') { // Copy the options so they are not mutated
options[tlsOption] = options.tls[tlsOption] options = utils.clone(options)
const cnxOptions = {}
/* istanbul ignore next: travis does not work with stunnel atm. Therefore the tls tests are skipped on travis */
for (const tlsOption in options.tls) {
if (options.tls.hasOwnProperty(tlsOption)) {
cnxOptions[tlsOption] = options.tls[tlsOption]
// Copy the tls options into the general options to make sure the address is set right
if (tlsOption === 'port' || tlsOption === 'host' || tlsOption === 'path' || tlsOption === 'family') {
options[tlsOption] = options.tls[tlsOption]
}
} }
} }
} if (stream) {
if (stream) { // The stream from the outside is used so no connection from this side is triggered but from the server this client should talk to
// The stream from the outside is used so no connection from this side is triggered but from the server this client should talk to // Reconnect etc won't work with this. This requires monkey patching to work, so it is not officially supported
// Reconnect etc won't work with this. This requires monkey patching to work, so it is not officially supported options.stream = stream
options.stream = stream this.address = '"Private stream"'
this.address = '"Private stream"' } else if (options.path) {
} else if (options.path) { cnxOptions.path = options.path
cnxOptions.path = options.path this.address = options.path
this.address = options.path } else {
} else { cnxOptions.port = +options.port || 6379
cnxOptions.port = +options.port || 6379 cnxOptions.host = options.host || '127.0.0.1'
cnxOptions.host = options.host || '127.0.0.1' cnxOptions.family = (!options.family && net.isIP(cnxOptions.host)) || (options.family === 'IPv6' ? 6 : 4)
cnxOptions.family = (!options.family && net.isIP(cnxOptions.host)) || (options.family === 'IPv6' ? 6 : 4) this.address = `${cnxOptions.host}:${cnxOptions.port}`
this.address = `${cnxOptions.host}:${cnxOptions.port}` }
this.connectionOptions = cnxOptions
this.connectionId = RedisClient.connectionId++
this.connected = false
if (options.socketKeepalive === undefined) {
options.socketKeepalive = true
}
for (const command in options.renameCommands) {
if (options.renameCommands.hasOwnProperty(command)) {
options.renameCommands[command.toLowerCase()] = options.renameCommands[command]
}
}
options.returnBuffers = !!options.returnBuffers
options.detectBuffers = !!options.detectBuffers
// Override the detectBuffers setting if returnBuffers is active and print a warning
if (options.returnBuffers && options.detectBuffers) {
process.nextTick(
utils.warn,
this,
'WARNING: You activated returnBuffers and detectBuffers at the same time. The return value is always going to be a buffer.'
)
options.detectBuffers = false
}
this.shouldBuffer = false
this.commandQueue = new Queue() // Holds sent commands to de-pipeline them
this.offlineQueue = new Queue() // Holds commands issued but not able to be sent
this._pipelineQueue = new Queue() // Holds all pipelined commands
// Only used as timeout until redis has to be connected to redis until throwing an connection error
this.connectTimeout = +options.connectTimeout || 60000 // 60 * 1000 ms
this.enableOfflineQueue = options.enableOfflineQueue !== false
this.pubSubMode = 0
this.subscriptionSet = {}
this.monitoring = false
this.messageBuffers = false
this.closing = false
this.serverInfo = {}
this.authPass = options.authPass || options.password
this.selectedDb = options.db // Save the selected db here, used when reconnecting
this.oldState = null
this._strCache = ''
this._pipeline = false
this.subCommandsLeft = 0
this.renameCommands = options.renameCommands || {}
this.timesConnected = 0
this.buffers = options.returnBuffers || options.detectBuffers
this.options = options
this._multi = false
this.reply = 'ON' // Returning replies is the default
this.retryStrategy = options.retryStrategy || function (options) {
if (options.attempt > 100) {
return
}
// reconnect after
return Math.min(options.attempt * 100, 3000)
}
this.retryStrategyProvided = !!options.retryStrategy
this.subscribeChannels = []
utils.setReconnectDefaults(this)
// Init parser and connect
connect(this)
this.on('newListener', function (event) {
if ((event === 'messageBuffer' || event === 'pmessageBuffer') && !this.buffers && !this.messageBuffers) {
this.messageBuffers = true
this._replyParser.setReturnBuffers(true)
}
})
} }
this.connectionOptions = cnxOptions // Do not call internalSendCommand directly, if you are not absolutely certain it handles everything properly
this.connectionId = RedisClient.connectionId++ // e.g. monitor / info does not work with internalSendCommand only
this.connected = false internalSendCommand (commandObj) {
if (options.socketKeepalive === undefined) { if (this.ready === false || this._stream.writable === false) {
options.socketKeepalive = true // Handle offline commands right away
} offlineCommand(this, commandObj)
for (const command in options.renameCommands) { return commandObj.promise
if (options.renameCommands.hasOwnProperty(command)) {
options.renameCommands[command.toLowerCase()] = options.renameCommands[command]
} }
}
options.returnBuffers = !!options.returnBuffers
options.detectBuffers = !!options.detectBuffers
// Override the detectBuffers setting if returnBuffers is active and print a warning
if (options.returnBuffers && options.detectBuffers) {
process.nextTick(
utils.warn,
this,
'WARNING: You activated returnBuffers and detectBuffers at the same time. The return value is always going to be a buffer.'
)
options.detectBuffers = false
}
this.shouldBuffer = false
this.commandQueue = new Queue() // Holds sent commands to de-pipeline them
this.offlineQueue = new Queue() // Holds commands issued but not able to be sent
this._pipelineQueue = new Queue() // Holds all pipelined commands
// Only used as timeout until redis has to be connected to redis until throwing an connection error
this.connectTimeout = +options.connectTimeout || 60000 // 60 * 1000 ms
this.enableOfflineQueue = options.enableOfflineQueue !== false
this.pubSubMode = 0
this.subscriptionSet = {}
this.monitoring = false
this.messageBuffers = false
this.closing = false
this.serverInfo = {}
this.authPass = options.authPass || options.password
this.selectedDb = options.db // Save the selected db here, used when reconnecting
this.oldState = null
this._strCache = ''
this._pipeline = false
this.subCommandsLeft = 0
this.renameCommands = options.renameCommands || {}
this.timesConnected = 0
this.buffers = options.returnBuffers || options.detectBuffers
this.options = options
this._multi = false
this.reply = 'ON' // Returning replies is the default
this.retryStrategy = options.retryStrategy || function (options) {
if (options.attempt > 100) {
return
}
// reconnect after
return Math.min(options.attempt * 100, 3000)
}
this.retryStrategyProvided = !!options.retryStrategy
this.subscribeChannels = []
utils.setReconnectDefaults(this)
// Init parser and connect
connect(this)
this.on('newListener', function (event) {
if ((event === 'messageBuffer' || event === 'pmessageBuffer') && !this.buffers && !this.messageBuffers) {
this.messageBuffers = true
this._replyParser.setReturnBuffers(true)
}
})
}
util.inherits(RedisClient, EventEmitter)
RedisClient.connectionId = 0 normalizeAndWriteCommand(this, commandObj)
// Do not call internalSendCommand directly, if you are not absolutely certain it handles everything properly if (commandObj.callOnWrite) {
// e.g. monitor / info does not work with internalSendCommand only commandObj.callOnWrite()
RedisClient.prototype.internalSendCommand = function (commandObj) { }
if (this.ready === false || this._stream.writable === false) { // Handle `CLIENT REPLY ON|OFF|SKIP`
// Handle offline commands right away // This has to be checked after callOnWrite
offlineCommand(this, commandObj) /* istanbul ignore else: TODO: Remove this as soon as we test Redis 3.2 on travis */
if (this.reply === 'ON') {
this.commandQueue.push(commandObj)
} else {
// Do not expect a reply
// Does this work in combination with the pub sub mode?
utils.replyInOrder(this, commandObj.callback, null, undefined, this.commandQueue)
if (this.reply === 'SKIP') {
this.reply = 'SKIP_ONE_MORE'
} else if (this.reply === 'SKIP_ONE_MORE') {
this.reply = 'ON'
}
}
return commandObj.promise return commandObj.promise
} }
normalizeAndWriteCommand(this, commandObj) // Redirect calls to the appropriate function and use to send arbitrary / not supported commands
sendCommand (command, args) {
// Throw to fail early instead of relying in order in this case
if (typeof command !== 'string') {
throw new TypeError(`Wrong input type "${command !== null && command !== undefined ? command.constructor.name : command}" for command name`)
}
if (!Array.isArray(args)) {
if (args === undefined || args === null) {
args = []
} else {
throw new TypeError(`Wrong input type "${args.constructor.name}" for args`)
}
}
if (commandObj.callOnWrite) { // Using the raw multi command is only possible with this function
commandObj.callOnWrite() // If the command is not yet added to the client, the internal function should be called right away
// Otherwise we need to redirect the calls to make sure the internal functions don't get skipped
// The internal functions could actually be used for any non hooked function
// but this might change from time to time and at the moment there's no good way to distinguish them
// from each other, so let's just do it do it this way for the time being
if (command === 'multi' || typeof this[command] !== 'function') {
return this.internalSendCommand(new Command(command, args))
}
return this[command].apply(this, args)
} }
// Handle `CLIENT REPLY ON|OFF|SKIP`
// This has to be checked after callOnWrite end (flush) {
/* istanbul ignore else: TODO: Remove this as soon as we test Redis 3.2 on travis */ if (typeof flush !== 'boolean') {
if (this.reply === 'ON') { throw new TypeError('You must call "end" with the flush argument.')
this.commandQueue.push(commandObj) }
} else {
// Do not expect a reply // Flush queue if wanted
// Does this work in combination with the pub sub mode? if (flush) {
utils.replyInOrder(this, commandObj.callback, null, undefined, this.commandQueue) flushAndError(this, 'Connection forcefully ended and command aborted.', 'NR_CLOSED')
if (this.reply === 'SKIP') { }
this.reply = 'SKIP_ONE_MORE' // Clear retryTimer
} else if (this.reply === 'SKIP_ONE_MORE') { if (this.retryTimer) {
this.reply = 'ON' clearTimeout(this.retryTimer)
this.retryTimer = null
}
this._stream.removeAllListeners()
this._stream.on('error', noop)
this.connected = false
this.ready = false
this.closing = true
return this._stream.destroySoon()
}
unref () {
if (this.connected) {
debug('Unref\'ing the socket connection')
this._stream.unref()
} else {
debug('Not connected yet, will unref later')
this.once('connect', function () {
this.unref()
})
} }
} }
return commandObj.promise
// TODO: promisify this
duplicate (options, callback) {
if (typeof options === 'function') {
callback = options
options = null
}
const existingOptions = utils.clone(this.options)
options = utils.clone(options)
for (const elem in options) {
if (options.hasOwnProperty(elem)) {
existingOptions[elem] = options[elem]
}
}
const client = new RedisClient(existingOptions)
client.selectedDb = this.selectedDb
if (typeof callback === 'function') {
const errorListener = function (err) {
callback(err)
client.end(true)
}
const readyListener = function () {
callback(null, client)
client.removeAllListeners(errorListener)
}
client.once('ready', readyListener)
client.once('error', errorListener)
return client
}
return client
}
// Note: this overrides a native function!
multi (args) {
return new Multi(this, 'multi', args)
}
// Note: This is not a native function but is still handled as a individual command as it behaves just the same as multi
batch (args) {
return new Multi(this, 'batch', args)
}
} }
RedisClient.connectionId = 0
Commands.list.forEach((name) => addCommand(RedisClient.prototype, Multi.prototype, name)) Commands.list.forEach((name) => addCommand(RedisClient.prototype, Multi.prototype, name))
module.exports = { module.exports = {
@@ -172,4 +284,3 @@ module.exports = {
// Add all redis commands / nodeRedis api to the client // Add all redis commands / nodeRedis api to the client
// TODO: Change the way this is included... // TODO: Change the way this is included...
require('./lib/individualCommands') require('./lib/individualCommands')
require('./lib/extendedApi')

View File

@@ -1,12 +1,12 @@
'use strict' 'use strict'
const tls = require('tls')
const Parser = require('redis-parser')
const net = require('net') const net = require('net')
const onConnect = require('./readyHandler') const Parser = require('redis-parser')
const tls = require('tls')
const debug = require('./debug') const debug = require('./debug')
const replyHandler = require('./replyHandler')
const flushAndError = require('./flushAndError') const flushAndError = require('./flushAndError')
const onConnect = require('./readyHandler')
const replyHandler = require('./replyHandler')
const onResult = replyHandler.onResult const onResult = replyHandler.onResult
const onError = replyHandler.onError const onError = replyHandler.onError

View File

@@ -1,116 +0,0 @@
'use strict'
const utils = require('./utils')
const debug = require('./debug')
const RedisClient = require('../').RedisClient
const Command = require('./command')
const Multi = require('./multi')
const flushAndError = require('./flushAndError')
const noop = function () {}
/**********************************************
All documented and exposed API belongs in here
**********************************************/
// Redirect calls to the appropriate function and use to send arbitrary / not supported commands
// TODO: REMOVE sendCommand and replace it by a function to add new commands
// TODO: Add a library to add the sendCommand back in place for legacy reasons
RedisClient.prototype.sendCommand = function (command, args) {
// Throw to fail early instead of relying in order in this case
if (typeof command !== 'string') {
throw new TypeError(`Wrong input type "${command !== null && command !== undefined ? command.constructor.name : command}" for command name`)
}
if (!Array.isArray(args)) {
if (args === undefined || args === null) {
args = []
} else {
throw new TypeError(`Wrong input type "${args.constructor.name}" for args`)
}
}
// Using the raw multi command is only possible with this function
// If the command is not yet added to the client, the internal function should be called right away
// Otherwise we need to redirect the calls to make sure the internal functions don't get skipped
// The internal functions could actually be used for any non hooked function
// but this might change from time to time and at the moment there's no good way to distinguish them
// from each other, so let's just do it do it this way for the time being
if (command === 'multi' || typeof this[command] !== 'function') {
return this.internalSendCommand(new Command(command, args))
}
return this[command].apply(this, args)
}
RedisClient.prototype.end = function (flush) {
if (typeof flush !== 'boolean') {
throw new TypeError('You must call "end" with the flush argument.')
}
// Flush queue if wanted
if (flush) {
flushAndError(this, 'Connection forcefully ended and command aborted.', 'NR_CLOSED')
}
// Clear retryTimer
if (this.retryTimer) {
clearTimeout(this.retryTimer)
this.retryTimer = null
}
this._stream.removeAllListeners()
this._stream.on('error', noop)
this.connected = false
this.ready = false
this.closing = true
return this._stream.destroySoon()
}
RedisClient.prototype.unref = function () {
if (this.connected) {
debug('Unref\'ing the socket connection')
this._stream.unref()
} else {
debug('Not connected yet, will unref later')
this.once('connect', function () {
this.unref()
})
}
}
// TODO: promisify this
RedisClient.prototype.duplicate = function (options, callback) {
if (typeof options === 'function') {
callback = options
options = null
}
const existingOptions = utils.clone(this.options)
options = utils.clone(options)
for (const elem in options) {
if (options.hasOwnProperty(elem)) {
existingOptions[elem] = options[elem]
}
}
const client = new RedisClient(existingOptions)
client.selectedDb = this.selectedDb
if (typeof callback === 'function') {
const errorListener = function (err) {
callback(err)
client.end(true)
}
const readyListener = function () {
callback(null, client)
client.removeAllListeners(errorListener)
}
client.once('ready', readyListener)
client.once('error', errorListener)
return client
}
return client
}
// Note: this overrides a native function!
RedisClient.prototype.multi = function multi (args) {
return new Multi(this, 'multi', args)
}
// Note: This is not a native function but is still handled as a individual command as it behaves just the same as multi
RedisClient.prototype.batch = function batch (args) {
return new Multi(this, 'batch', args)
}

View File

@@ -1,8 +1,8 @@
'use strict' 'use strict'
const Command = require('./command')
const debug = require('./debug') const debug = require('./debug')
const Multi = require('./multi') const Multi = require('./multi')
const Command = require('./command')
const utils = require('./utils') const utils = require('./utils')
const noPasswordIsSet = /no password is set/ const noPasswordIsSet = /no password is set/
const RedisClient = require('../').RedisClient const RedisClient = require('../').RedisClient

View File

@@ -1,9 +1,9 @@
'use strict' 'use strict'
const Errors = require('redis-errors')
const Queue = require('denque') const Queue = require('denque')
const utils = require('./utils') const Errors = require('redis-errors')
const Command = require('./command') const Command = require('./command')
const utils = require('./utils')
const handleReply = utils.handleReply const handleReply = utils.handleReply
/** /**

View File

@@ -1,8 +1,8 @@
'use strict' 'use strict'
const Errors = require('redis-errors') const Errors = require('redis-errors')
const utils = require('./utils')
const debug = require('./debug') const debug = require('./debug')
const utils = require('./utils')
function offlineCommand (client, command) { function offlineCommand (client, command) {
const commandName = command.command.toUpperCase() const commandName = command.command.toUpperCase()

View File

@@ -1,7 +1,7 @@
'use strict' 'use strict'
const debug = require('./debug')
const Command = require('./command') const Command = require('./command')
const debug = require('./debug')
const utils = require('./utils') const utils = require('./utils')
function onConnect (client) { function onConnect (client) {

View File

@@ -1,9 +1,9 @@
'use strict' 'use strict'
const Errors = require('redis-errors') const Errors = require('redis-errors')
const connect = require('./connect')
const debug = require('./debug') const debug = require('./debug')
const flushAndError = require('./flushAndError') const flushAndError = require('./flushAndError')
const connect = require('./connect')
/** /**
* @description Try connecting to a server again * @description Try connecting to a server again

View File

@@ -1,8 +1,8 @@
'use strict' 'use strict'
const Buffer = require('buffer').Buffer const Buffer = require('buffer').Buffer
const utils = require('./utils')
const pubsub = require('./pubsub') const pubsub = require('./pubsub')
const utils = require('./utils')
function onError (client, err) { function onError (client, err) {
const commandObj = client.commandQueue.shift() const commandObj = client.commandQueue.shift()

View File

@@ -149,6 +149,11 @@ function handleReply (client, reply, command) {
return reply return reply
} }
/**
* @description Set default reconnect variables
*
* @param {RedisClient} client
*/
function setReconnectDefaults (client) { function setReconnectDefaults (client) {
client.retryTimer = null client.retryTimer = null
client.retryTotaltime = 0 client.retryTotaltime = 0

View File

@@ -1,8 +1,8 @@
'use strict' 'use strict'
const Commands = require('redis-commands') const Commands = require('redis-commands')
const utils = require('./utils')
const debug = require('./debug') const debug = require('./debug')
const utils = require('./utils')
// const isUint8Array = (() => { // const isUint8Array = (() => {
// try { // try {
// return process.binding('util').isUint8Array // return process.binding('util').isUint8Array

View File

@@ -23,7 +23,7 @@
"test": "nyc --cache mocha ./test/*.js ./test/commands/*.js --timeout=8000", "test": "nyc --cache mocha ./test/*.js ./test/commands/*.js --timeout=8000",
"posttest": "npm run coverage", "posttest": "npm run coverage",
"compare": "node benchmarks/diff_multi_bench_output.js beforeBench.txt afterBench.txt", "compare": "node benchmarks/diff_multi_bench_output.js beforeBench.txt afterBench.txt",
"lint": "eslint . --fix" "lint": "standard . --fix"
}, },
"dependencies": { "dependencies": {
"denque": "^1.1.1", "denque": "^1.1.1",