You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-04 15:02:09 +03:00
fix: always emit an error when the connection drops
This commit is contained in:
@@ -28,6 +28,7 @@ Features
|
|||||||
- Native promise support
|
- Native promise support
|
||||||
- Auto pipelining
|
- Auto pipelining
|
||||||
- The client is now exported directly and be instantiated directly
|
- The client is now exported directly and be instantiated directly
|
||||||
|
- `client.duplicate` will now also transition into pub sub or monitor mode
|
||||||
|
|
||||||
Breaking Changes
|
Breaking Changes
|
||||||
|
|
||||||
|
25
index.js
25
index.js
@@ -59,17 +59,7 @@ class RedisClient extends EventEmitter {
|
|||||||
cnxOptions.family = (!options.family && net.isIP(cnxOptions.host)) || (options.family === 'IPv6' ? 6 : 4)
|
cnxOptions.family = (!options.family && net.isIP(cnxOptions.host)) || (options.family === 'IPv6' ? 6 : 4)
|
||||||
this.address = `${cnxOptions.host}:${cnxOptions.port}`
|
this.address = `${cnxOptions.host}:${cnxOptions.port}`
|
||||||
}
|
}
|
||||||
|
// TODO: Properly fix typo
|
||||||
// Public Variables
|
|
||||||
this.connected = false
|
|
||||||
this.shouldBuffer = false
|
|
||||||
this.commandQueue = new Queue() // Holds sent commands to de-pipeline them
|
|
||||||
this.offlineQueue = new Queue() // Holds commands issued but not able to be sent
|
|
||||||
this.serverInfo = {}
|
|
||||||
|
|
||||||
// Private Variables
|
|
||||||
this._connectionOptions = cnxOptions
|
|
||||||
this.connectionId = RedisClient.connectionId++
|
|
||||||
if (options.socketKeepalive === undefined) {
|
if (options.socketKeepalive === undefined) {
|
||||||
options.socketKeepalive = true
|
options.socketKeepalive = true
|
||||||
}
|
}
|
||||||
@@ -109,6 +99,14 @@ class RedisClient extends EventEmitter {
|
|||||||
}
|
}
|
||||||
options.returnBuffers = !!options.returnBuffers
|
options.returnBuffers = !!options.returnBuffers
|
||||||
options.detectBuffers = !!options.detectBuffers
|
options.detectBuffers = !!options.detectBuffers
|
||||||
|
|
||||||
|
// Public Variables
|
||||||
|
this.connected = false
|
||||||
|
this.shouldBuffer = false
|
||||||
|
this.commandQueue = new Queue() // Holds sent commands
|
||||||
|
this.offlineQueue = new Queue() // Holds commands issued but not able to be sent yet
|
||||||
|
this.serverInfo = {}
|
||||||
|
this.connectionId = connectionId++
|
||||||
this.selectedDb = options.db // Save the selected db here, used when reconnecting
|
this.selectedDb = options.db // Save the selected db here, used when reconnecting
|
||||||
this._strCache = ''
|
this._strCache = ''
|
||||||
this._pipeline = false
|
this._pipeline = false
|
||||||
@@ -251,7 +249,12 @@ class RedisClient extends EventEmitter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
const client = new RedisClient(existingOptions)
|
const client = new RedisClient(existingOptions)
|
||||||
|
// Return to the same state as the other client
|
||||||
|
// by also selecting the db / returning to pub sub
|
||||||
|
// mode or into monitor mode.
|
||||||
client.selectedDb = this.selectedDb
|
client.selectedDb = this.selectedDb
|
||||||
|
client._subscriptionSet = this._subscriptionSet
|
||||||
|
client._monitoring = this._monitoring
|
||||||
if (typeof callback === 'function') {
|
if (typeof callback === 'function') {
|
||||||
const errorListener = (err) => {
|
const errorListener = (err) => {
|
||||||
callback(err)
|
callback(err)
|
||||||
|
@@ -102,7 +102,8 @@ function connect (client) {
|
|||||||
const stream = client._stream
|
const stream = client._stream
|
||||||
|
|
||||||
if (options.connectTimeout) {
|
if (options.connectTimeout) {
|
||||||
// TODO: Investigate why this is not properly triggered
|
// TODO: Investigate why this is not properly triggered.
|
||||||
|
// TODO: Check if this works with tls.
|
||||||
stream.setTimeout(client._connectTimeout, () => {
|
stream.setTimeout(client._connectTimeout, () => {
|
||||||
// Note: This is only tested if a internet connection is established
|
// Note: This is only tested if a internet connection is established
|
||||||
lazyReconnect(client, 'timeout')
|
lazyReconnect(client, 'timeout')
|
||||||
|
@@ -2,10 +2,11 @@
|
|||||||
|
|
||||||
const Errors = require('redis-errors')
|
const Errors = require('redis-errors')
|
||||||
|
|
||||||
// Flush provided queues, erroring any items with a callback first
|
// Flush provided queues, erroring out all items
|
||||||
function flushAndError (client, message, code, options) {
|
function flushAndError (client, message, code, options) {
|
||||||
options = options || {}
|
options = options || {}
|
||||||
const queueNames = options.queues || ['commandQueue', 'offlineQueue'] // Flush the commandQueue first to keep the order intact
|
// Flush the commandQueue first to keep the order intact
|
||||||
|
const queueNames = options.queues || ['commandQueue', 'offlineQueue']
|
||||||
for (var i = 0; i < queueNames.length; i++) {
|
for (var i = 0; i < queueNames.length; i++) {
|
||||||
// If the command was fired it might have been processed so far
|
// If the command was fired it might have been processed so far
|
||||||
const ErrorClass = queueNames[i] === 'commandQueue'
|
const ErrorClass = queueNames[i] === 'commandQueue'
|
||||||
|
@@ -78,17 +78,20 @@ function reconnect (client, why, error) {
|
|||||||
timesConnected: client.timesConnected
|
timesConnected: client.timesConnected
|
||||||
})
|
})
|
||||||
if (typeof client.retryDelay !== 'number') {
|
if (typeof client.retryDelay !== 'number') {
|
||||||
// Pass individual error through
|
var err
|
||||||
if (client.retryDelay instanceof Error) {
|
if (client.retryDelay instanceof Error) {
|
||||||
error = client.retryDelay
|
// Pass individual error through
|
||||||
}
|
err = client.retryDelay
|
||||||
flushAndError(client, 'Stream connection ended and command aborted.', 'NR_CLOSED', {
|
flushAndError(client, 'Stream connection ended and command aborted.', 'NR_CLOSED', { error: err })
|
||||||
error
|
} else {
|
||||||
})
|
flushAndError(client, 'Stream connection ended and command aborted.', 'NR_CLOSED', { error })
|
||||||
// TODO: Check if client is so smart
|
err = new Errors.RedisError('Redis connection ended.')
|
||||||
|
err.code = 'NR_CLOSED'
|
||||||
if (error) {
|
if (error) {
|
||||||
client.emit('error', error)
|
err.origin = error
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
client.emit('error', err)
|
||||||
client.end(false)
|
client.end(false)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@@ -153,7 +153,7 @@ describe('connection tests', () => {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
it.skip('can not connect with wrong host / port in the options object', (done) => {
|
it('can not connect with wrong host / port in the options object', (done) => {
|
||||||
const options = {
|
const options = {
|
||||||
host: 'somewhere',
|
host: 'somewhere',
|
||||||
port: 6379,
|
port: 6379,
|
||||||
@@ -163,11 +163,10 @@ describe('connection tests', () => {
|
|||||||
client = Redis.createClient(options)
|
client = Redis.createClient(options)
|
||||||
assert.strictEqual(client._connectionOptions.family, ip === 'IPv6' ? 6 : 4)
|
assert.strictEqual(client._connectionOptions.family, ip === 'IPv6' ? 6 : 4)
|
||||||
assert.strictEqual(Object.keys(options).length, 4)
|
assert.strictEqual(Object.keys(options).length, 4)
|
||||||
const end = helper.callFuncAfter(done, 2)
|
|
||||||
|
|
||||||
client.on('error', (err) => {
|
client.on('error', (err) => {
|
||||||
assert(/CONNECTION_BROKEN|ENOTFOUND|EAI_AGAIN/.test(err.code))
|
assert(/NR_CLOSED/.test(err.code))
|
||||||
end()
|
done()
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -206,7 +205,7 @@ describe('connection tests', () => {
|
|||||||
},
|
},
|
||||||
port: 9999
|
port: 9999
|
||||||
})
|
})
|
||||||
client.on('error', helper.isError(/Redis connection to 127\.0\.0\.1:9999 failed/))
|
client.on('error', helper.isError(/Redis connection ended/))
|
||||||
})
|
})
|
||||||
|
|
||||||
it('retryStrategy used to reconnect with defaults', (done) => {
|
it('retryStrategy used to reconnect with defaults', (done) => {
|
||||||
@@ -230,6 +229,7 @@ describe('connection tests', () => {
|
|||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
client._stream.destroy()
|
client._stream.destroy()
|
||||||
}, 50)
|
}, 50)
|
||||||
|
client.on('error', helper.isError(/Redis connection ended/))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@@ -194,7 +194,7 @@ describe('The \'multi\' method', () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
describe('when connection is broken', () => {
|
describe('when connection is broken', () => {
|
||||||
it.skip('return an error even if connection is in broken mode', (done) => {
|
it('return an error even if connection is in broken mode', (done) => {
|
||||||
client = redis.createClient({
|
client = redis.createClient({
|
||||||
host: 'somewhere',
|
host: 'somewhere',
|
||||||
port: 6379,
|
port: 6379,
|
||||||
@@ -202,13 +202,12 @@ describe('The \'multi\' method', () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
client.on('error', (err) => {
|
client.on('error', (err) => {
|
||||||
if (/Redis connection in broken state/.test(err.message)) {
|
assert.strictEqual(err.code, 'NR_CLOSED')
|
||||||
done()
|
done()
|
||||||
}
|
|
||||||
})
|
})
|
||||||
|
|
||||||
client.multi([['set', 'foo', 'bar'], ['get', 'foo']]).exec().catch((err) => {
|
client.multi([['set', 'foo', 'bar'], ['get', 'foo']]).exec().catch((err) => {
|
||||||
// assert(/Redis connection in broken state/.test(err.message));
|
assert(/Stream connection ended and command aborted/.test(err.message))
|
||||||
assert.strictEqual(err.errors.length, 2)
|
assert.strictEqual(err.errors.length, 2)
|
||||||
assert.strictEqual(err.errors[0].args.length, 2)
|
assert.strictEqual(err.errors[0].args.length, 2)
|
||||||
})
|
})
|
||||||
|
@@ -507,8 +507,7 @@ describe('The nodeRedis client', () => {
|
|||||||
}, 50)
|
}, 50)
|
||||||
})
|
})
|
||||||
|
|
||||||
// TODO: Fix this by adding the CONNECTION_BROKEN back in
|
it('enqueues operation and keep the queue while trying to reconnect', (done) => {
|
||||||
it.skip('enqueues operation and keep the queue while trying to reconnect', (done) => {
|
|
||||||
client = redis.createClient(9999, null, {
|
client = redis.createClient(9999, null, {
|
||||||
retryStrategy (options) {
|
retryStrategy (options) {
|
||||||
if (options.attempt < 4) {
|
if (options.attempt < 4) {
|
||||||
@@ -519,7 +518,6 @@ describe('The nodeRedis client', () => {
|
|||||||
let i = 0
|
let i = 0
|
||||||
|
|
||||||
client.on('error', (err) => {
|
client.on('error', (err) => {
|
||||||
console.log(err)
|
|
||||||
if (err.code === 'NR_CLOSED') {
|
if (err.code === 'NR_CLOSED') {
|
||||||
assert(i, 3)
|
assert(i, 3)
|
||||||
assert.strictEqual(client.offlineQueue.length, 0)
|
assert.strictEqual(client.offlineQueue.length, 0)
|
||||||
|
Reference in New Issue
Block a user