You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-04 15:02:09 +03:00
chore: refactor some code and remove obsolete variable
This commit is contained in:
37
index.js
37
index.js
@@ -54,7 +54,6 @@ function RedisClient (options, stream) {
|
|||||||
this.connectionOptions = cnxOptions
|
this.connectionOptions = cnxOptions
|
||||||
this.connectionId = RedisClient.connectionId++
|
this.connectionId = RedisClient.connectionId++
|
||||||
this.connected = false
|
this.connected = false
|
||||||
this.ready = false
|
|
||||||
if (options.socketKeepalive === undefined) {
|
if (options.socketKeepalive === undefined) {
|
||||||
options.socketKeepalive = true
|
options.socketKeepalive = true
|
||||||
}
|
}
|
||||||
@@ -81,7 +80,6 @@ function RedisClient (options, stream) {
|
|||||||
// Only used as timeout until redis has to be connected to redis until throwing an connection error
|
// Only used as timeout until redis has to be connected to redis until throwing an connection error
|
||||||
this.connectTimeout = +options.connectTimeout || 60000 // 60 * 1000 ms
|
this.connectTimeout = +options.connectTimeout || 60000 // 60 * 1000 ms
|
||||||
this.enableOfflineQueue = options.enableOfflineQueue !== false
|
this.enableOfflineQueue = options.enableOfflineQueue !== false
|
||||||
this.initializeRetryVars()
|
|
||||||
this.pubSubMode = 0
|
this.pubSubMode = 0
|
||||||
this.subscriptionSet = {}
|
this.subscriptionSet = {}
|
||||||
this.monitoring = false
|
this.monitoring = false
|
||||||
@@ -109,6 +107,7 @@ function RedisClient (options, stream) {
|
|||||||
}
|
}
|
||||||
this.retryStrategyProvided = !!options.retryStrategy
|
this.retryStrategyProvided = !!options.retryStrategy
|
||||||
this.subscribeChannels = []
|
this.subscribeChannels = []
|
||||||
|
utils.setReconnectDefaults(this)
|
||||||
// Init parser and connect
|
// Init parser and connect
|
||||||
connect(this)
|
connect(this)
|
||||||
this.on('newListener', function (event) {
|
this.on('newListener', function (event) {
|
||||||
@@ -122,40 +121,6 @@ util.inherits(RedisClient, EventEmitter)
|
|||||||
|
|
||||||
RedisClient.connectionId = 0
|
RedisClient.connectionId = 0
|
||||||
|
|
||||||
RedisClient.prototype.initializeRetryVars = function () {
|
|
||||||
this.retryTimer = null
|
|
||||||
this.retryTotaltime = 0
|
|
||||||
this.retryDelay = 100
|
|
||||||
this.attempts = 1
|
|
||||||
}
|
|
||||||
|
|
||||||
// Flush provided queues, erroring any items with a callback first
|
|
||||||
RedisClient.prototype.flushAndError = function (message, code, options) {
|
|
||||||
options = options || {}
|
|
||||||
const queueNames = options.queues || ['commandQueue', 'offlineQueue'] // Flush the commandQueue first to keep the order intact
|
|
||||||
for (var i = 0; i < queueNames.length; i++) {
|
|
||||||
// If the command was fired it might have been processed so far
|
|
||||||
const ErrorClass = queueNames[i] === 'commandQueue'
|
|
||||||
? Errors.InterruptError
|
|
||||||
: Errors.AbortError
|
|
||||||
|
|
||||||
while (this[queueNames[i]].length) {
|
|
||||||
const command = this[queueNames[i]].shift()
|
|
||||||
const err = new ErrorClass(message)
|
|
||||||
err.code = code
|
|
||||||
err.command = command.command.toUpperCase()
|
|
||||||
err.args = command.args
|
|
||||||
if (command.error) {
|
|
||||||
err.stack = err.stack + command.error.stack.replace(/^Error.*?\n/, '\n')
|
|
||||||
}
|
|
||||||
if (options.error) {
|
|
||||||
err.origin = options.error
|
|
||||||
}
|
|
||||||
command.callback(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Do not call internalSendCommand directly, if you are not absolutely certain it handles everything properly
|
// Do not call internalSendCommand directly, if you are not absolutely certain it handles everything properly
|
||||||
// e.g. monitor / info does not work with internalSendCommand only
|
// e.g. monitor / info does not work with internalSendCommand only
|
||||||
RedisClient.prototype.internalSendCommand = function (commandObj) {
|
RedisClient.prototype.internalSendCommand = function (commandObj) {
|
||||||
|
@@ -3,13 +3,18 @@
|
|||||||
const tls = require('tls')
|
const tls = require('tls')
|
||||||
const Parser = require('redis-parser')
|
const Parser = require('redis-parser')
|
||||||
const net = require('net')
|
const net = require('net')
|
||||||
const reconnect = require('./reconnect')
|
|
||||||
const onConnect = require('./readyHandler')
|
const onConnect = require('./readyHandler')
|
||||||
const debug = require('./debug')
|
const debug = require('./debug')
|
||||||
const replyHandler = require('./replyHandler')
|
const replyHandler = require('./replyHandler')
|
||||||
|
const flushAndError = require('./flushAndError')
|
||||||
const onResult = replyHandler.onResult
|
const onResult = replyHandler.onResult
|
||||||
const onError = replyHandler.onError
|
const onError = replyHandler.onError
|
||||||
|
|
||||||
|
var reconnect = function (client, why, err) {
|
||||||
|
reconnect = require('./reconnect')
|
||||||
|
reconnect(client, why, err)
|
||||||
|
}
|
||||||
|
|
||||||
function onStreamError (client, err) {
|
function onStreamError (client, err) {
|
||||||
if (client.closing) {
|
if (client.closing) {
|
||||||
return
|
return
|
||||||
@@ -48,7 +53,7 @@ function createParser (client) {
|
|||||||
// Note: the execution order is important. First flush and emit, then create the stream
|
// Note: the execution order is important. First flush and emit, then create the stream
|
||||||
err.message += '. Please report this.'
|
err.message += '. Please report this.'
|
||||||
client.ready = false
|
client.ready = false
|
||||||
client.flushAndError('Fatal error encountered. Command aborted.', 'NR_FATAL', {
|
flushAndError(client, 'Fatal error encountered. Command aborted.', 'NR_FATAL', {
|
||||||
error: err,
|
error: err,
|
||||||
queues: ['commandQueue']
|
queues: ['commandQueue']
|
||||||
})
|
})
|
||||||
|
@@ -5,6 +5,7 @@ const debug = require('./debug')
|
|||||||
const RedisClient = require('../').RedisClient
|
const RedisClient = require('../').RedisClient
|
||||||
const Command = require('./command')
|
const Command = require('./command')
|
||||||
const Multi = require('./multi')
|
const Multi = require('./multi')
|
||||||
|
const flushAndError = require('./flushAndError')
|
||||||
const noop = function () {}
|
const noop = function () {}
|
||||||
|
|
||||||
/**********************************************
|
/**********************************************
|
||||||
@@ -46,7 +47,7 @@ RedisClient.prototype.end = function (flush) {
|
|||||||
|
|
||||||
// Flush queue if wanted
|
// Flush queue if wanted
|
||||||
if (flush) {
|
if (flush) {
|
||||||
this.flushAndError('Connection forcefully ended and command aborted.', 'NR_CLOSED')
|
flushAndError(this, 'Connection forcefully ended and command aborted.', 'NR_CLOSED')
|
||||||
}
|
}
|
||||||
// Clear retryTimer
|
// Clear retryTimer
|
||||||
if (this.retryTimer) {
|
if (this.retryTimer) {
|
||||||
|
32
lib/flushAndError.js
Normal file
32
lib/flushAndError.js
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
'use strict'
|
||||||
|
|
||||||
|
const Errors = require('redis-errors')
|
||||||
|
|
||||||
|
// Flush provided queues, erroring any items with a callback first
|
||||||
|
function flushAndError (client, message, code, options) {
|
||||||
|
options = options || {}
|
||||||
|
const queueNames = options.queues || ['commandQueue', 'offlineQueue'] // Flush the commandQueue first to keep the order intact
|
||||||
|
for (var i = 0; i < queueNames.length; i++) {
|
||||||
|
// If the command was fired it might have been processed so far
|
||||||
|
const ErrorClass = queueNames[i] === 'commandQueue'
|
||||||
|
? Errors.InterruptError
|
||||||
|
: Errors.AbortError
|
||||||
|
|
||||||
|
while (client[queueNames[i]].length) {
|
||||||
|
const command = client[queueNames[i]].shift()
|
||||||
|
const err = new ErrorClass(message)
|
||||||
|
err.code = code
|
||||||
|
err.command = command.command.toUpperCase()
|
||||||
|
err.args = command.args
|
||||||
|
if (command.error) {
|
||||||
|
err.stack = err.stack + command.error.stack.replace(/^Error.*?\n/, '\n')
|
||||||
|
}
|
||||||
|
if (options.error) {
|
||||||
|
err.origin = options.error
|
||||||
|
}
|
||||||
|
command.callback(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = flushAndError
|
@@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
const debug = require('./debug')
|
const debug = require('./debug')
|
||||||
const Command = require('./command')
|
const Command = require('./command')
|
||||||
|
const utils = require('./utils')
|
||||||
|
|
||||||
function onConnect (client) {
|
function onConnect (client) {
|
||||||
debug('Stream connected %s id %s', client.address, client.connectionId)
|
debug('Stream connected %s id %s', client.address, client.connectionId)
|
||||||
@@ -10,14 +11,12 @@ function onConnect (client) {
|
|||||||
// fast properties. If that's not the case, make them fast properties
|
// fast properties. If that's not the case, make them fast properties
|
||||||
// again!
|
// again!
|
||||||
client.connected = true
|
client.connected = true
|
||||||
client.ready = false
|
|
||||||
client.emittedEnd = false
|
|
||||||
client._stream.setKeepAlive(client.options.socketKeepalive)
|
client._stream.setKeepAlive(client.options.socketKeepalive)
|
||||||
client._stream.setTimeout(0)
|
client._stream.setTimeout(0)
|
||||||
|
|
||||||
// TODO: Deprecate the connect event.
|
// TODO: Deprecate the connect event.
|
||||||
client.emit('connect')
|
client.emit('connect')
|
||||||
client.initializeRetryVars()
|
utils.setReconnectDefaults(client)
|
||||||
|
|
||||||
if (client.options.noReadyCheck) {
|
if (client.options.noReadyCheck) {
|
||||||
readyHandler(client)
|
readyHandler(client)
|
||||||
|
@@ -2,10 +2,8 @@
|
|||||||
|
|
||||||
const Errors = require('redis-errors')
|
const Errors = require('redis-errors')
|
||||||
const debug = require('./debug')
|
const debug = require('./debug')
|
||||||
var lazyConnect = function (client) {
|
const flushAndError = require('./flushAndError')
|
||||||
lazyConnect = require('./connect')
|
const connect = require('./connect')
|
||||||
lazyConnect(client)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @description Try connecting to a server again
|
* @description Try connecting to a server again
|
||||||
@@ -27,7 +25,7 @@ function retryConnection (client, error) {
|
|||||||
|
|
||||||
client.retryTotaltime += client.retryDelay
|
client.retryTotaltime += client.retryDelay
|
||||||
client.attempts += 1
|
client.attempts += 1
|
||||||
lazyConnect(client)
|
connect(client)
|
||||||
client.retryTimer = null
|
client.retryTimer = null
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -51,18 +49,14 @@ function reconnect (client, why, error) {
|
|||||||
client.ready = false
|
client.ready = false
|
||||||
client.pubSubMode = 0
|
client.pubSubMode = 0
|
||||||
|
|
||||||
// since we are collapsing end and close, users don't expect to be called twice
|
|
||||||
if (!client.emittedEnd) {
|
|
||||||
client.emit('end')
|
client.emit('end')
|
||||||
client.emittedEnd = true
|
|
||||||
}
|
|
||||||
|
|
||||||
if (why === 'timeout') {
|
if (why === 'timeout') {
|
||||||
var message = 'Redis connection in broken state: connection timeout exceeded.'
|
var message = 'Redis connection in broken state: connection timeout exceeded.'
|
||||||
const err = new Errors.RedisError(message)
|
const err = new Errors.RedisError(message)
|
||||||
// TODO: Find better error codes...
|
// TODO: Find better error codes...
|
||||||
err.code = 'CONNECTION_BROKEN'
|
err.code = 'CONNECTION_BROKEN'
|
||||||
client.flushAndError(message, 'CONNECTION_BROKEN')
|
flushAndError(client, message, 'CONNECTION_BROKEN')
|
||||||
client.emit('error', err)
|
client.emit('error', err)
|
||||||
client.end(false)
|
client.end(false)
|
||||||
return
|
return
|
||||||
@@ -71,7 +65,7 @@ function reconnect (client, why, error) {
|
|||||||
// If client is a requested shutdown, then don't retry
|
// If client is a requested shutdown, then don't retry
|
||||||
if (client.closing) {
|
if (client.closing) {
|
||||||
debug('Connection ended by quit / end command, not retrying.')
|
debug('Connection ended by quit / end command, not retrying.')
|
||||||
client.flushAndError('Stream connection ended and command aborted.', 'NR_CLOSED', {
|
flushAndError(client, 'Stream connection ended and command aborted.', 'NR_CLOSED', {
|
||||||
error
|
error
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
@@ -88,7 +82,7 @@ function reconnect (client, why, error) {
|
|||||||
if (client.retryDelay instanceof Error) {
|
if (client.retryDelay instanceof Error) {
|
||||||
error = client.retryDelay
|
error = client.retryDelay
|
||||||
}
|
}
|
||||||
client.flushAndError('Stream connection ended and command aborted.', 'NR_CLOSED', {
|
flushAndError(client, 'Stream connection ended and command aborted.', 'NR_CLOSED', {
|
||||||
error
|
error
|
||||||
})
|
})
|
||||||
// TODO: Check if client is so smart
|
// TODO: Check if client is so smart
|
||||||
@@ -107,7 +101,7 @@ function reconnect (client, why, error) {
|
|||||||
// We could postpone writing to the stream until we connected again and fire the commands.
|
// We could postpone writing to the stream until we connected again and fire the commands.
|
||||||
// The commands in the pipelineQueue are also not uncertain. They never left the client.
|
// The commands in the pipelineQueue are also not uncertain. They never left the client.
|
||||||
} else if (client.commandQueue.length !== 0 || client._pipelineQueue.length !== 0) {
|
} else if (client.commandQueue.length !== 0 || client._pipelineQueue.length !== 0) {
|
||||||
client.flushAndError('Redis connection lost and command aborted.', 'UNCERTAIN_STATE', {
|
flushAndError(client, 'Redis connection lost and command aborted.', 'UNCERTAIN_STATE', {
|
||||||
error,
|
error,
|
||||||
queues: ['commandQueue', '_pipelineQueue']
|
queues: ['commandQueue', '_pipelineQueue']
|
||||||
})
|
})
|
||||||
|
11
lib/utils.js
11
lib/utils.js
@@ -149,6 +149,14 @@ function handleReply (client, reply, command) {
|
|||||||
return reply
|
return reply
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function setReconnectDefaults (client) {
|
||||||
|
client.retryTimer = null
|
||||||
|
client.retryTotaltime = 0
|
||||||
|
client.retryDelay = 100
|
||||||
|
client.attempts = 1
|
||||||
|
client.ready = false
|
||||||
|
}
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
replyToStrings,
|
replyToStrings,
|
||||||
replyToObject,
|
replyToObject,
|
||||||
@@ -157,5 +165,6 @@ module.exports = {
|
|||||||
clone: convenienceClone,
|
clone: convenienceClone,
|
||||||
replyInOrder,
|
replyInOrder,
|
||||||
warn,
|
warn,
|
||||||
handleReply
|
handleReply,
|
||||||
|
setReconnectDefaults
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user