You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-06 02:15:48 +03:00
chore: minor refactoring
This commit is contained in:
13
index.js
13
index.js
@@ -78,8 +78,6 @@ class RedisClient extends EventEmitter {
|
|||||||
options.renameCommands[command.toLowerCase()] = options.renameCommands[command]
|
options.renameCommands[command.toLowerCase()] = options.renameCommands[command]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
options.returnBuffers = !!options.returnBuffers
|
|
||||||
options.detectBuffers = !!options.detectBuffers
|
|
||||||
if (typeof options.enableOfflineQueue !== 'boolean') {
|
if (typeof options.enableOfflineQueue !== 'boolean') {
|
||||||
if (options.enableOfflineQueue !== undefined) {
|
if (options.enableOfflineQueue !== undefined) {
|
||||||
throw new TypeError('enableOfflineQueue must be a boolean')
|
throw new TypeError('enableOfflineQueue must be a boolean')
|
||||||
@@ -109,6 +107,8 @@ class RedisClient extends EventEmitter {
|
|||||||
}
|
}
|
||||||
options.password = options.authPass
|
options.password = options.authPass
|
||||||
}
|
}
|
||||||
|
options.returnBuffers = !!options.returnBuffers
|
||||||
|
options.detectBuffers = !!options.detectBuffers
|
||||||
this.selectedDb = options.db // Save the selected db here, used when reconnecting
|
this.selectedDb = options.db // Save the selected db here, used when reconnecting
|
||||||
this._strCache = ''
|
this._strCache = ''
|
||||||
this._pipeline = false
|
this._pipeline = false
|
||||||
@@ -131,9 +131,12 @@ class RedisClient extends EventEmitter {
|
|||||||
// Init parser and connect
|
// Init parser and connect
|
||||||
connect(this)
|
connect(this)
|
||||||
this.on('newListener', function (event) {
|
this.on('newListener', function (event) {
|
||||||
if ((event === 'messageBuffer' || event === 'pmessageBuffer') && !this.buffers && !this.messageBuffers) {
|
if ((event === 'messageBuffer' || event === 'pmessageBuffer') && this._messageBuffers === false) {
|
||||||
this.messageBuffers = true
|
this._messageBuffers = true
|
||||||
this._replyParser.setReturnBuffers(true)
|
if (this._parserReturningBuffers === false) {
|
||||||
|
this._parserReturningBuffers = true
|
||||||
|
this._replyParser.setReturnBuffers(true)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@@ -76,14 +76,15 @@ function createParser (client) {
|
|||||||
function connect (client) {
|
function connect (client) {
|
||||||
// Init parser
|
// Init parser
|
||||||
const parser = createParser(client)
|
const parser = createParser(client)
|
||||||
|
const options = client._options
|
||||||
client._replyParser = parser
|
client._replyParser = parser
|
||||||
|
|
||||||
if (client._options.stream) {
|
if (options.stream) {
|
||||||
// Only add the listeners once in case of a reconnect try (that won't work)
|
// Only add the listeners once in case of a reconnect try (that won't work)
|
||||||
if (client._stream) {
|
if (client._stream) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
client._stream = client._options.stream
|
client._stream = options.stream
|
||||||
} else {
|
} else {
|
||||||
// On a reconnect destroy the former stream and retry
|
// On a reconnect destroy the former stream and retry
|
||||||
if (client._stream) {
|
if (client._stream) {
|
||||||
@@ -91,19 +92,8 @@ function connect (client) {
|
|||||||
client._stream.destroy()
|
client._stream.destroy()
|
||||||
}
|
}
|
||||||
|
|
||||||
if (client._options.tls) {
|
if (options.tls) {
|
||||||
client._stream = tls.connect(client._connectionOptions)
|
client._stream = tls.connect(client._connectionOptions)
|
||||||
|
|
||||||
// Whenever a handshake times out.
|
|
||||||
// Older Node.js versions use "clientError", newer versions use tlsClientError.
|
|
||||||
stream.once('clientError', (err) => {
|
|
||||||
debug('clientError occurred')
|
|
||||||
onStreamError(client, err)
|
|
||||||
})
|
|
||||||
stream.once('tlsClientError', (err) => {
|
|
||||||
debug('clientError occurred')
|
|
||||||
onStreamError(client, err)
|
|
||||||
})
|
|
||||||
} else {
|
} else {
|
||||||
client._stream = net.createConnection(client._connectionOptions)
|
client._stream = net.createConnection(client._connectionOptions)
|
||||||
}
|
}
|
||||||
@@ -111,18 +101,18 @@ function connect (client) {
|
|||||||
|
|
||||||
const stream = client._stream
|
const stream = client._stream
|
||||||
|
|
||||||
if (client._options.connectTimeout) {
|
if (options.connectTimeout) {
|
||||||
// TODO: Investigate why this is not properly triggered
|
// TODO: Investigate why this is not properly triggered
|
||||||
stream.setTimeout(client._options.connectTimeout, () => {
|
stream.setTimeout(client._connectTimeout, () => {
|
||||||
// Note: This is only tested if a internet connection is established
|
// Note: This is only tested if a internet connection is established
|
||||||
lazyReconnect(client, 'timeout')
|
lazyReconnect(client, 'timeout')
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
const connectEvent = client._options.tls ? 'secureConnect' : 'connect'
|
const connectEvent = options.tls ? 'secureConnect' : 'connect'
|
||||||
stream.once(connectEvent, () => {
|
stream.once(connectEvent, () => {
|
||||||
stream.removeAllListeners('timeout')
|
stream.removeAllListeners('timeout')
|
||||||
client.timesConnected++
|
client._timesConnected++
|
||||||
onConnect(client)
|
onConnect(client)
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -143,14 +133,27 @@ function connect (client) {
|
|||||||
lazyReconnect(client, 'end')
|
lazyReconnect(client, 'end')
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if (options.tls) {
|
||||||
|
// Whenever a handshake times out.
|
||||||
|
// Older Node.js versions use "clientError", newer versions use tlsClientError.
|
||||||
|
stream.once('clientError', (err) => {
|
||||||
|
debug('clientError occurred')
|
||||||
|
onStreamError(client, err)
|
||||||
|
})
|
||||||
|
stream.once('tlsClientError', (err) => {
|
||||||
|
debug('clientError occurred')
|
||||||
|
onStreamError(client, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
stream.setNoDelay()
|
stream.setNoDelay()
|
||||||
|
|
||||||
// Fire the command before redis is connected to be sure it's the first fired command.
|
// Fire the command before redis is connected to be sure it's the first fired command.
|
||||||
// TODO: Consider calling the ready check before Redis is connected as well.
|
// TODO: Consider calling the ready check before Redis is connected as well.
|
||||||
// That could improve the ready performance. Measure the rough time difference!
|
// That could improve the ready performance. Measure the rough time difference!
|
||||||
if (client._options.password !== undefined) {
|
if (options.password !== undefined) {
|
||||||
client.ready = true
|
client.ready = true
|
||||||
client.auth(client._options.password).catch((err) => {
|
client.auth(options.password).catch((err) => {
|
||||||
client._closing = true
|
client._closing = true
|
||||||
process.nextTick(() => {
|
process.nextTick(() => {
|
||||||
client.emit('error', err)
|
client.emit('error', err)
|
||||||
|
@@ -70,7 +70,7 @@ Multi.prototype.monitor = function monitor () {
|
|||||||
function quitCallback (client) {
|
function quitCallback (client) {
|
||||||
return function (err, res) {
|
return function (err, res) {
|
||||||
if (client._stream.writable) {
|
if (client._stream.writable) {
|
||||||
// If the socket is still alive, kill it. This could happen if quit got a NR_CLOSED error code
|
// If the socket is still alive, destroy it. This could happen if quit got a NR_CLOSED error code
|
||||||
client._stream.destroy()
|
client._stream.destroy()
|
||||||
}
|
}
|
||||||
if (err && err.code === 'NR_CLOSED') {
|
if (err && err.code === 'NR_CLOSED') {
|
||||||
|
@@ -51,16 +51,16 @@ function reconnect (client, why, error) {
|
|||||||
|
|
||||||
client.emit('end')
|
client.emit('end')
|
||||||
|
|
||||||
if (why === 'timeout') {
|
// if (why === 'timeout') {
|
||||||
var message = 'Redis connection in broken state: connection timeout exceeded.'
|
// var message = 'Redis connection in broken state: connection timeout exceeded.'
|
||||||
const err = new Errors.RedisError(message)
|
// const err = new Errors.RedisError(message)
|
||||||
// TODO: Find better error codes...
|
// // TODO: Find better error codes...
|
||||||
err.code = 'CONNECTION_BROKEN'
|
// err.code = 'CONNECTION_BROKEN'
|
||||||
flushAndError(client, message, 'CONNECTION_BROKEN')
|
// flushAndError(client, message, 'CONNECTION_BROKEN')
|
||||||
client.emit('error', err)
|
// client.emit('error', err)
|
||||||
client.end(false)
|
// client.end(false)
|
||||||
return
|
// return
|
||||||
}
|
// }
|
||||||
|
|
||||||
// If client is a requested shutdown, then don't retry
|
// If client is a requested shutdown, then don't retry
|
||||||
if (client._closing) {
|
if (client._closing) {
|
||||||
|
@@ -520,7 +520,7 @@ describe('The nodeRedis client', () => {
|
|||||||
|
|
||||||
client.on('error', (err) => {
|
client.on('error', (err) => {
|
||||||
console.log(err)
|
console.log(err)
|
||||||
if (err.code === 'CONNECTION_BROKEN') {
|
if (err.code === 'NR_CLOSED') {
|
||||||
assert(i, 3)
|
assert(i, 3)
|
||||||
assert.strictEqual(client.offlineQueue.length, 0)
|
assert.strictEqual(client.offlineQueue.length, 0)
|
||||||
assert.strictEqual(err.origin.code, 'ECONNREFUSED')
|
assert.strictEqual(err.origin.code, 'ECONNREFUSED')
|
||||||
|
@@ -458,21 +458,25 @@ describe('publish/subscribe', () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
it('allows to listen to pmessageBuffer and pmessage', (done) => {
|
it('allows to listen to pmessageBuffer and pmessage', (done) => {
|
||||||
const end = helper.callFuncAfter(done, 4)
|
const end = helper.callFuncAfter(done, 5)
|
||||||
const data = Array(10000).join('äüs^öéÉÉ`e')
|
const data = Array(10000).join('äüs^öéÉÉ`e')
|
||||||
sub.set('foo', data).then(() => {
|
sub.set('foo', data).then(() => {
|
||||||
sub.get('foo').then((res) => assert.strictEqual(typeof res, 'string'))
|
sub.get('foo').then((res) => assert.strictEqual(typeof res, 'string'))
|
||||||
sub._stream.once('data', () => {
|
sub._stream.once('data', () => {
|
||||||
assert.strictEqual(sub.messageBuffers, false)
|
assert.strictEqual(sub._messageBuffers, false)
|
||||||
assert.strictEqual(sub.shouldBuffer, false)
|
assert.strictEqual(sub.shouldBuffer, false)
|
||||||
|
assert.strictEqual(args[2].detectBuffers, sub._parserReturningBuffers)
|
||||||
|
assert.strictEqual(sub._messageBuffers, false)
|
||||||
sub.on('messageBuffer', (channel, message, pattern) => {
|
sub.on('messageBuffer', (channel, message, pattern) => {
|
||||||
if (pattern) {
|
if (pattern) {
|
||||||
assert.strictEqual(pattern.inspect(), Buffer.from('*').inspect())
|
assert.strictEqual(pattern.inspect(), Buffer.from('*').inspect())
|
||||||
|
end()
|
||||||
}
|
}
|
||||||
assert.strictEqual(channel.inspect(), Buffer.from('/foo').inspect())
|
assert.strictEqual(channel.inspect(), Buffer.from('/foo').inspect())
|
||||||
end()
|
end()
|
||||||
})
|
})
|
||||||
assert.notStrictEqual(sub.messageBuffers, sub.buffers)
|
assert.strictEqual(sub._messageBuffers, sub._parserReturningBuffers)
|
||||||
|
assert.strictEqual(sub._messageBuffers, true)
|
||||||
})
|
})
|
||||||
const batch = sub.batch()
|
const batch = sub.batch()
|
||||||
batch.psubscribe('*')
|
batch.psubscribe('*')
|
||||||
|
Reference in New Issue
Block a user