You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-06 02:15:48 +03:00
chore: refactor parts out of the index.js file
This commit is contained in:
189
index.js
189
index.js
@@ -8,17 +8,19 @@
|
||||
// TODO: Rewrite this to classes
|
||||
const Buffer = require('buffer').Buffer
|
||||
const net = require('net')
|
||||
const tls = require('tls')
|
||||
const util = require('util')
|
||||
const utils = require('./lib/utils')
|
||||
const Command = require('./lib/command')
|
||||
const Queue = require('denque')
|
||||
const errorClasses = require('./lib/customErrors')
|
||||
const EventEmitter = require('events')
|
||||
const Parser = require('redis-parser')
|
||||
const Errors = require('redis-errors')
|
||||
const debug = require('./lib/debug')
|
||||
const connect = require('./lib/connect')
|
||||
const Commands = require('redis-commands')
|
||||
const addCommand = require('./lib/commands')
|
||||
const unifyOptions = require('./lib/createClient')
|
||||
const Multi = require('./lib/multi')
|
||||
const normalizeAndWriteCommand = require('./lib/writeCommands')
|
||||
const SUBSCRIBE_COMMANDS = {
|
||||
subscribe: true,
|
||||
@@ -42,8 +44,6 @@ function handleDetectBuffersReply (reply, command, bufferArgs) {
|
||||
return reply
|
||||
}
|
||||
|
||||
exports.debugMode = /\bredis\b/i.test(process.env.NODE_DEBUG)
|
||||
|
||||
// Attention: The second parameter might be removed at will and is not officially supported.
|
||||
// Do not rely on this
|
||||
function RedisClient (options, stream) {
|
||||
@@ -88,7 +88,9 @@ function RedisClient (options, stream) {
|
||||
options.detectBuffers = !!options.detectBuffers
|
||||
// Override the detectBuffers setting if returnBuffers is active and print a warning
|
||||
if (options.returnBuffers && options.detectBuffers) {
|
||||
this.warn('WARNING: You activated returnBuffers and detectBuffers at the same time. The return value is always going to be a buffer.')
|
||||
process.nextTick(() =>
|
||||
utils.warn(this, 'WARNING: You activated returnBuffers and detectBuffers at the same time. The return value is always going to be a buffer.')
|
||||
)
|
||||
options.detectBuffers = false
|
||||
}
|
||||
if (options.detectBuffers) {
|
||||
@@ -130,14 +132,13 @@ function RedisClient (options, stream) {
|
||||
}
|
||||
this.retryStrategyProvided = !!options.retryStrategy
|
||||
this.subscribeChannels = []
|
||||
// Init parser
|
||||
this.replyParser = createParser(this)
|
||||
this.createStream()
|
||||
// Init parser and connect
|
||||
connect(this)
|
||||
this.on('newListener', function (event) {
|
||||
if ((event === 'messageBuffer' || event === 'pmessageBuffer') && !this.buffers && !this.messageBuffers) {
|
||||
this.messageBuffers = true
|
||||
this.handleReply = handleDetectBuffersReply
|
||||
this.replyParser.setReturnBuffers(true)
|
||||
this._replyParser.setReturnBuffers(true)
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -145,35 +146,6 @@ util.inherits(RedisClient, EventEmitter)
|
||||
|
||||
RedisClient.connectionId = 0
|
||||
|
||||
function createParser (self) {
|
||||
return new Parser({
|
||||
returnReply (data) {
|
||||
self.returnReply(data)
|
||||
},
|
||||
returnError (err) {
|
||||
// Return a ReplyError to indicate Redis returned an error
|
||||
self.returnError(err)
|
||||
},
|
||||
returnFatalError (err) {
|
||||
// Error out all fired commands. Otherwise they might rely on faulty data. We have to reconnect to get in a working state again
|
||||
// Note: the execution order is important. First flush and emit, then create the stream
|
||||
err.message += '. Please report this.'
|
||||
self.ready = false
|
||||
self.flushAndError({
|
||||
message: 'Fatal error encountered. Command aborted.',
|
||||
code: 'NR_FATAL'
|
||||
}, {
|
||||
error: err,
|
||||
queues: ['commandQueue']
|
||||
})
|
||||
self.createStream()
|
||||
setImmediate(() => self.emit('error', err))
|
||||
},
|
||||
returnBuffers: self.buffers || self.messageBuffers,
|
||||
stringNumbers: self.options.stringNumbers || false
|
||||
})
|
||||
}
|
||||
|
||||
/******************************************************************************
|
||||
|
||||
All functions in here are internal besides the RedisClient constructor
|
||||
@@ -182,88 +154,6 @@ function createParser (self) {
|
||||
|
||||
******************************************************************************/
|
||||
|
||||
// Attention: the function name "createStream" should not be changed, as other libraries need this to mock the stream (e.g. fakeredis)
|
||||
RedisClient.prototype.createStream = function () {
|
||||
// Init parser
|
||||
this.replyParser = createParser(this)
|
||||
|
||||
if (this.options.stream) {
|
||||
// Only add the listeners once in case of a reconnect try (that won't work)
|
||||
if (this.stream) {
|
||||
return
|
||||
}
|
||||
this.stream = this.options.stream
|
||||
} else {
|
||||
// On a reconnect destroy the former stream and retry
|
||||
if (this.stream) {
|
||||
this.stream.removeAllListeners()
|
||||
this.stream.destroy()
|
||||
}
|
||||
|
||||
/* istanbul ignore if: travis does not work with stunnel atm. Therefore the tls tests are skipped on travis */
|
||||
if (this.options.tls) {
|
||||
this.stream = tls.connect(this.connectionOptions)
|
||||
} else {
|
||||
this.stream = net.createConnection(this.connectionOptions)
|
||||
}
|
||||
}
|
||||
|
||||
if (this.options.connectTimeout) {
|
||||
// TODO: Investigate why this is not properly triggered
|
||||
this.stream.setTimeout(this.connectTimeout, () => {
|
||||
// Note: This is only tested if a internet connection is established
|
||||
this.connectionGone('timeout')
|
||||
})
|
||||
}
|
||||
|
||||
/* istanbul ignore next: travis does not work with stunnel atm. Therefore the tls tests are skipped on travis */
|
||||
const connectEvent = this.options.tls ? 'secureConnect' : 'connect'
|
||||
this.stream.once(connectEvent, () => {
|
||||
this.stream.removeAllListeners('timeout')
|
||||
this.timesConnected++
|
||||
this.onConnect()
|
||||
})
|
||||
|
||||
this.stream.on('data', (bufferFromSocket) => {
|
||||
// The bufferFromSocket.toString() has a significant impact on big chunks and therefore this should only be used if necessary
|
||||
debug('Net read %s id %s', this.address, this.connectionId) // + ': ' + bufferFromSocket.toString());
|
||||
this.replyParser.execute(bufferFromSocket)
|
||||
})
|
||||
|
||||
this.stream.on('error', (err) => {
|
||||
this.onError(err)
|
||||
})
|
||||
|
||||
/* istanbul ignore next: difficult to test and not important as long as we keep this listener */
|
||||
this.stream.on('clientError', (err) => {
|
||||
debug('clientError occurred')
|
||||
this.onError(err)
|
||||
})
|
||||
|
||||
this.stream.once('close', (hadError) => {
|
||||
this.connectionGone('close')
|
||||
})
|
||||
|
||||
this.stream.once('end', () => {
|
||||
this.connectionGone('end')
|
||||
})
|
||||
|
||||
this.stream.setNoDelay()
|
||||
|
||||
// Fire the command before redis is connected to be sure it's the first fired command
|
||||
if (this.authPass !== undefined) {
|
||||
this.ready = true
|
||||
this.auth(this.authPass).catch((err) => {
|
||||
this.closing = true
|
||||
process.nextTick(() => {
|
||||
this.emit('error', err)
|
||||
this.end(true)
|
||||
})
|
||||
})
|
||||
this.ready = false
|
||||
}
|
||||
}
|
||||
|
||||
RedisClient.prototype.handleReply = function (reply, command) {
|
||||
if (command === 'hgetall') {
|
||||
reply = utils.replyToObject(reply)
|
||||
@@ -281,18 +171,6 @@ RedisClient.prototype.initializeRetryVars = function () {
|
||||
this.attempts = 1
|
||||
}
|
||||
|
||||
RedisClient.prototype.warn = function (msg) {
|
||||
// Warn on the next tick. Otherwise no event listener can be added
|
||||
// for warnings that are emitted in the redis client constructor
|
||||
process.nextTick(() => {
|
||||
if (this.listeners('warning').length !== 0) {
|
||||
this.emit('warning', msg)
|
||||
} else {
|
||||
console.warn('nodeRedis:', msg)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Flush provided queues, erroring any items with a callback first
|
||||
RedisClient.prototype.flushAndError = function (errorAttributes, options) {
|
||||
options = options || {}
|
||||
@@ -347,8 +225,8 @@ RedisClient.prototype.onConnect = function () {
|
||||
this.connected = true
|
||||
this.ready = false
|
||||
this.emittedEnd = false
|
||||
this.stream.setKeepAlive(this.options.socketKeepalive)
|
||||
this.stream.setTimeout(0)
|
||||
this._stream.setKeepAlive(this.options.socketKeepalive)
|
||||
this._stream.setTimeout(0)
|
||||
|
||||
this.emit('connect')
|
||||
this.initializeRetryVars()
|
||||
@@ -366,7 +244,7 @@ RedisClient.prototype.onReady = function () {
|
||||
|
||||
this.cork = () => {
|
||||
this.pipeline = true
|
||||
this.stream.cork()
|
||||
this._stream.cork()
|
||||
}
|
||||
this.uncork = () => {
|
||||
if (this.fireStrings) {
|
||||
@@ -377,7 +255,7 @@ RedisClient.prototype.onReady = function () {
|
||||
this.pipeline = false
|
||||
this.fireStrings = true
|
||||
// TODO: Consider using next tick here. See https://github.com/NodeRedis/nodeRedis/issues/1033
|
||||
this.stream.uncork()
|
||||
this._stream.uncork()
|
||||
}
|
||||
|
||||
// Restore modal commands from previous connection. The order of the commands is important
|
||||
@@ -498,7 +376,7 @@ const retryConnection = function (self, error) {
|
||||
|
||||
self.retryTotaltime += self.retryDelay
|
||||
self.attempts += 1
|
||||
self.createStream()
|
||||
connect(self)
|
||||
self.retryTimer = null
|
||||
}
|
||||
|
||||
@@ -737,7 +615,7 @@ function handleOfflineCommand (self, commandObj) {
|
||||
if (self.closing || !self.enableOfflineQueue) {
|
||||
command = command.toUpperCase()
|
||||
if (!self.closing) {
|
||||
if (self.stream.writable) {
|
||||
if (self._stream.writable) {
|
||||
msg = 'The connection is not yet established and the offline queue is deactivated.'
|
||||
} else {
|
||||
msg = 'Stream not writeable.'
|
||||
@@ -764,7 +642,7 @@ function handleOfflineCommand (self, commandObj) {
|
||||
// Do not call internalSendCommand directly, if you are not absolutely certain it handles everything properly
|
||||
// e.g. monitor / info does not work with internalSendCommand only
|
||||
RedisClient.prototype.internalSendCommand = function (commandObj) {
|
||||
if (this.ready === false || this.stream.writable === false) {
|
||||
if (this.ready === false || this._stream.writable === false) {
|
||||
// Handle offline commands right away
|
||||
handleOfflineCommand(this, commandObj)
|
||||
return commandObj.promise
|
||||
@@ -798,19 +676,19 @@ RedisClient.prototype.writeStrings = function () {
|
||||
for (var command = this.pipelineQueue.shift(); command; command = this.pipelineQueue.shift()) {
|
||||
// Write to stream if the string is bigger than 4mb. The biggest string may be Math.pow(2, 28) - 15 chars long
|
||||
if (str.length + command.length > 4 * 1024 * 1024) {
|
||||
this.shouldBuffer = !this.stream.write(str)
|
||||
this.shouldBuffer = !this._stream.write(str)
|
||||
str = ''
|
||||
}
|
||||
str += command
|
||||
}
|
||||
if (str !== '') {
|
||||
this.shouldBuffer = !this.stream.write(str)
|
||||
this.shouldBuffer = !this._stream.write(str)
|
||||
}
|
||||
}
|
||||
|
||||
RedisClient.prototype.writeBuffers = function () {
|
||||
for (var command = this.pipelineQueue.shift(); command; command = this.pipelineQueue.shift()) {
|
||||
this.shouldBuffer = !this.stream.write(command)
|
||||
this.shouldBuffer = !this._stream.write(command)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -820,24 +698,29 @@ RedisClient.prototype.writeBuffers = function () {
|
||||
// This can only be used for strings only though.
|
||||
RedisClient.prototype.write = function (data) {
|
||||
if (this.pipeline === false) {
|
||||
this.shouldBuffer = !this.stream.write(data)
|
||||
this.shouldBuffer = !this._stream.write(data)
|
||||
return
|
||||
}
|
||||
this.pipelineQueue.push(data)
|
||||
}
|
||||
|
||||
exports.createClient = function () {
|
||||
return new RedisClient(unifyOptions.apply(null, arguments))
|
||||
Commands.list.forEach((name) => addCommand(RedisClient.prototype, Multi.prototype, name))
|
||||
|
||||
module.exports = {
|
||||
debugMode: /\bredis\b/i.test(process.env.NODE_DEBUG),
|
||||
RedisClient,
|
||||
Multi,
|
||||
AbortError: errorClasses.AbortError,
|
||||
ParserError: Errors.ParserError,
|
||||
RedisError: Errors.RedisError,
|
||||
ReplyError: Errors.ReplyError,
|
||||
InterruptError: Errors.InterruptError,
|
||||
createClient () {
|
||||
return new RedisClient(unifyOptions.apply(null, arguments))
|
||||
}
|
||||
}
|
||||
exports.RedisClient = RedisClient
|
||||
exports.Multi = require('./lib/multi')
|
||||
exports.AbortError = errorClasses.AbortError
|
||||
exports.RedisError = Errors.RedisError
|
||||
exports.ParserError = Errors.ParserError
|
||||
exports.ReplyError = Errors.ReplyError
|
||||
exports.InterruptError = Errors.InterruptError
|
||||
|
||||
// Add all redis commands / nodeRedis api to the client
|
||||
// TODO: Change the way this is included...
|
||||
require('./lib/individualCommands')
|
||||
require('./lib/extendedApi')
|
||||
require('./lib/commands')
|
||||
|
Reference in New Issue
Block a user