1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

chore: refactor main code into smaller parts

This commit is contained in:
Ruben Bridgewater
2017-05-27 03:41:27 +02:00
parent be3976e8ba
commit 2aa3b68fc6
9 changed files with 205 additions and 190 deletions

185
index.js
View File

@@ -1,12 +1,10 @@
'use strict' 'use strict'
// TODO: Replace all for in loops!
// TODO: Replace all `Error` with `RedisError` and improve errors in general // TODO: Replace all `Error` with `RedisError` and improve errors in general
// We have to replace the error codes and make them coherent. // We have to replace the error codes and make them coherent.
// We also have to use InterruptError s instead of AbortError s. // We also have to use InterruptError s instead of AbortError s.
// The Error messages might be improved as well. // The Error messages might be improved as well.
// TODO: Rewrite this to classes // TODO: Rewrite this to classes
const Buffer = require('buffer').Buffer
const net = require('net') const net = require('net')
const util = require('util') const util = require('util')
const utils = require('./lib/utils') const utils = require('./lib/utils')
@@ -22,12 +20,7 @@ const addCommand = require('./lib/commands')
const unifyOptions = require('./lib/createClient') const unifyOptions = require('./lib/createClient')
const Multi = require('./lib/multi') const Multi = require('./lib/multi')
const normalizeAndWriteCommand = require('./lib/writeCommands') const normalizeAndWriteCommand = require('./lib/writeCommands')
const SUBSCRIBE_COMMANDS = { const offlineCommand = require('./lib/offlineCommand')
subscribe: true,
unsubscribe: true,
psubscribe: true,
punsubscribe: true
}
function noop () {} function noop () {}
@@ -198,186 +191,12 @@ RedisClient.prototype.onError = function (err) {
reconnect(this, 'error', err) reconnect(this, 'error', err)
} }
RedisClient.prototype.sendOfflineQueue = function () {
for (var commandObj = this.offlineQueue.shift(); commandObj; commandObj = this.offlineQueue.shift()) {
debug('Sending offline command: %s', commandObj.command)
this.internalSendCommand(commandObj)
}
}
RedisClient.prototype.returnError = function (err) {
const commandObj = this.commandQueue.shift()
if (commandObj.error) {
err.stack = commandObj.error.stack.replace(/^Error.*?\n/, `ReplyError: ${err.message}\n`)
}
err.command = commandObj.command.toUpperCase()
if (commandObj.args && commandObj.args.length) {
err.args = commandObj.args
}
// Count down pub sub mode if in entering modus
if (this.pubSubMode > 1) {
this.pubSubMode--
}
const match = err.message.match(utils.errCode)
// LUA script could return user errors that don't behave like all other errors!
if (match) {
err.code = match[1]
}
commandObj.callback(err)
}
function normalReply (client, reply) {
const command = client.commandQueue.shift()
if (client._multi === false) {
reply = utils.handleReply(client, reply, command)
}
command.callback(null, reply)
}
function subscribeUnsubscribe (self, reply, type) {
// Subscribe commands take an optional callback and also emit an event, but only the Last_ response is included in the callback
// The pub sub commands return each argument in a separate return value and have to be handled that way
const commandObj = self.commandQueue.get(0)
const buffer = self.options.returnBuffers || self.options.detectBuffers && commandObj.bufferArgs
const channel = (buffer || reply[1] === null) ? reply[1] : reply[1].toString()
const count = +reply[2] // Return the channel counter as number no matter if `stringNumbers` is activated or not
debug(type, channel)
// Emit first, then return the callback
if (channel !== null) { // Do not emit or "unsubscribe" something if there was no channel to unsubscribe from
if (type === 'subscribe' || type === 'psubscribe') {
self.subscriptionSet[`${type}_${channel}`] = channel
} else {
const innerType = type === 'unsubscribe' ? 'subscribe' : 'psubscribe' // Make types consistent
delete self.subscriptionSet[`${innerType}_${channel}`]
}
self.emit(type, channel, count)
self.subscribeChannels.push(channel)
}
if (commandObj.argsLength === 1 || self.subCommandsLeft === 1 || commandObj.argsLength === 0 && (count === 0 || channel === null)) {
if (count === 0) { // unsubscribed from all channels
var runningCommand
var i = 1
self.pubSubMode = 0 // Deactivating pub sub mode
// This should be a rare case and therefore handling it this way should be good performance wise for the general case
for (runningCommand = self.commandQueue.get(i); runningCommand !== undefined; runningCommand = self.commandQueue.get(i)) {
if (SUBSCRIBE_COMMANDS[runningCommand.command]) {
self.pubSubMode = i // Entering pub sub mode again
break
}
i++
}
}
self.commandQueue.shift()
commandObj.callback(null, [count, self.subscribeChannels])
self.subscribeChannels = []
self.subCommandsLeft = 0
} else {
if (self.subCommandsLeft !== 0) {
self.subCommandsLeft--
} else {
self.subCommandsLeft = commandObj.argsLength ? commandObj.argsLength - 1 : count
}
}
}
function returnPubSub (self, reply) {
const type = reply[0].toString()
if (type === 'message') { // channel, message
if (!self.options.returnBuffers || self.messageBuffers) { // backwards compatible. Refactor this in v.3 to always return a string on the normal emitter
self.emit('message', reply[1].toString(), reply[2].toString())
self.emit('messageBuffer', reply[1], reply[2])
} else {
self.emit('message', reply[1], reply[2])
}
} else if (type === 'pmessage') { // pattern, channel, message
if (!self.options.returnBuffers || self.messageBuffers) { // backwards compatible. Refactor this in v.3 to always return a string on the normal emitter
self.emit('pmessage', reply[1].toString(), reply[2].toString(), reply[3].toString())
self.emit('pmessageBuffer', reply[1], reply[2], reply[3])
} else {
self.emit('pmessage', reply[1], reply[2], reply[3])
}
} else {
subscribeUnsubscribe(self, reply, type)
}
}
RedisClient.prototype.returnReply = function (reply) {
// If in monitor mode, all normal commands are still working and we only want to emit the streamlined commands
// As this is not the average use case and monitor is expensive anyway, let's change the code here, to improve
// the average performance of all other commands in case of no monitor mode
if (this.monitoring) {
var replyStr
if (this.buffers && Buffer.isBuffer(reply)) {
replyStr = reply.toString()
} else {
replyStr = reply
}
// While reconnecting the redis server does not recognize the client as in monitor mode anymore
// Therefore the monitor command has to finish before it catches further commands
if (typeof replyStr === 'string' && utils.monitorRegex.test(replyStr)) {
const timestamp = replyStr.slice(0, replyStr.indexOf(' '))
const args = replyStr.slice(replyStr.indexOf('"') + 1, -1).split('" "').map((elem) => {
return elem.replace(/\\"/g, '"')
})
this.emit('monitor', timestamp, args, replyStr)
return
}
}
if (this.pubSubMode === 0) {
normalReply(this, reply)
} else if (this.pubSubMode !== 1) {
this.pubSubMode--
normalReply(this, reply)
} else if (!(reply instanceof Array) || reply.length <= 2) {
// Only PING and QUIT are allowed in this context besides the pub sub commands
// Ping replies with ['pong', null|value] and quit with 'OK'
normalReply(this, reply)
} else {
returnPubSub(this, reply)
}
}
function handleOfflineCommand (self, commandObj) {
var command = commandObj.command
var err, msg
if (self.closing || !self.enableOfflineQueue) {
command = command.toUpperCase()
if (!self.closing) {
if (self._stream.writable) {
msg = 'The connection is not yet established and the offline queue is deactivated.'
} else {
msg = 'Stream not writeable.'
}
} else {
msg = 'The connection is already closed.'
}
err = new errorClasses.AbortError({
message: `${command} can't be processed. ${msg}`,
code: 'NR_CLOSED',
command
})
if (commandObj.args.length) {
err.args = commandObj.args
}
utils.replyInOrder(self, commandObj.callback, err)
} else {
debug('Queueing %s for next server connection.', command)
self.offlineQueue.push(commandObj)
}
self.shouldBuffer = true
}
// Do not call internalSendCommand directly, if you are not absolutely certain it handles everything properly // Do not call internalSendCommand directly, if you are not absolutely certain it handles everything properly
// e.g. monitor / info does not work with internalSendCommand only // e.g. monitor / info does not work with internalSendCommand only
RedisClient.prototype.internalSendCommand = function (commandObj) { RedisClient.prototype.internalSendCommand = function (commandObj) {
if (this.ready === false || this._stream.writable === false) { if (this.ready === false || this._stream.writable === false) {
// Handle offline commands right away // Handle offline commands right away
handleOfflineCommand(this, commandObj) offlineCommand(this, commandObj)
return commandObj.promise return commandObj.promise
} }

View File

@@ -6,6 +6,9 @@ const net = require('net')
const reconnect = require('./reconnect') const reconnect = require('./reconnect')
const onConnect = require('./readyHandler') const onConnect = require('./readyHandler')
const debug = require('./debug') const debug = require('./debug')
const replyHandler = require('./replyHandler')
const onResult = replyHandler.onResult
const onError = replyHandler.onError
/** /**
* @description Create a new Parser instance and pass all the necessary options to it * @description Create a new Parser instance and pass all the necessary options to it
@@ -16,10 +19,10 @@ const debug = require('./debug')
function createParser (client) { function createParser (client) {
return new Parser({ return new Parser({
returnReply (data) { returnReply (data) {
client.returnReply(data) onResult(client, data)
}, },
returnError (err) { returnError (err) {
client.returnError(err) onError(client, err)
}, },
returnFatalError (err) { returnFatalError (err) {
// Error out all fired commands. Otherwise they might rely on faulty data. We have to reconnect to get in a working state again // Error out all fired commands. Otherwise they might rely on faulty data. We have to reconnect to get in a working state again

View File

@@ -40,9 +40,8 @@ module.exports = function createClient (portArg, hostArg, options) {
options.port = parsed.port options.port = parsed.port
} }
if (parsed.search !== '') { if (parsed.search !== '') {
var elem for (var elem in parsed.query) {
for (elem in parsed.query) { if (!Object.prototype.hasOwnProperty.call(parsed.query, elem)) {
if (!parsed.query.hasOwnProperty(elem)) {
continue continue
} }
// If options are passed twice, only the parsed options will be used // If options are passed twice, only the parsed options will be used

27
lib/offlineCommand.js Normal file
View File

@@ -0,0 +1,27 @@
'use strict'
const Errors = require('redis-errors')
const utils = require('./utils')
const debug = require('./debug')
function offlineCommand (client, command) {
const commandName = command.command.toUpperCase()
if (client.closing || !client.enableOfflineQueue) {
const msg = client.closing === true
? 'The connection is already closed.'
: client._stream.writable === true
? 'The connection is not yet established and the offline queue is deactivated.'
: 'Stream not writeable.'
const err = new Errors.AbortError(`${commandName} can't be processed. ${msg}`)
err.code = 'NR_CLOSED'
err.command = commandName
err.args = command.args
utils.replyInOrder(client, command.callback, err)
} else {
debug('Queueing %s for next server connection.', commandName)
client.offlineQueue.push(command)
}
client.shouldBuffer = true
}
module.exports = offlineCommand

80
lib/pubsub.js Normal file
View File

@@ -0,0 +1,80 @@
'use strict'
const debug = require('./debug')
const SUBSCRIBE_COMMANDS = {
subscribe: true,
unsubscribe: true,
psubscribe: true,
punsubscribe: true
}
function subscribeUnsubscribe (client, reply, type) {
// Subscribe commands take an optional callback and also emit an event, but only the Last_ response is included in the callback
// The pub sub commands return each argument in a separate return value and have to be handled that way
const commandObj = client.commandQueue.get(0)
const buffer = client.options.returnBuffers || client.options.detectBuffers && commandObj.bufferArgs
const channel = (buffer || reply[1] === null) ? reply[1] : reply[1].toString()
const count = +reply[2] // Return the channel counter as number no matter if `stringNumbers` is activated or not
debug(type, channel)
// Emit first, then return the callback
if (channel !== null) { // Do not emit or "unsubscribe" something if there was no channel to unsubscribe from
if (type === 'subscribe' || type === 'psubscribe') {
client.subscriptionSet[`${type}_${channel}`] = channel
} else {
const innerType = type === 'unsubscribe' ? 'subscribe' : 'psubscribe' // Make types consistent
delete client.subscriptionSet[`${innerType}_${channel}`]
}
client.emit(type, channel, count)
client.subscribeChannels.push(channel)
}
if (commandObj.argsLength === 1 || client.subCommandsLeft === 1 || commandObj.argsLength === 0 && (count === 0 || channel === null)) {
if (count === 0) { // Unsubscribed from all channels
var runningCommand
var i = 1
client.pubSubMode = 0 // Deactivating pub sub mode
// This should be a rare case and therefore handling it this way should be good performance wise for the general case
for (runningCommand = client.commandQueue.get(i); runningCommand !== undefined; runningCommand = client.commandQueue.get(i)) {
if (SUBSCRIBE_COMMANDS[runningCommand.command]) {
client.pubSubMode = i // Entering pub sub mode again
break
}
i++
}
}
client.commandQueue.shift()
commandObj.callback(null, [count, client.subscribeChannels])
client.subscribeChannels = []
client.subCommandsLeft = 0
} else {
if (client.subCommandsLeft !== 0) {
client.subCommandsLeft--
} else {
client.subCommandsLeft = commandObj.argsLength ? commandObj.argsLength - 1 : count
}
}
}
function returnPubSub (client, reply) {
const type = reply[0].toString()
if (type === 'message') { // Channel, message
if (!client.options.returnBuffers || client.messageBuffers) { // Backwards compatible. Refactor this in v.3 to always return a string on the normal emitter
client.emit('message', reply[1].toString(), reply[2].toString())
client.emit('messageBuffer', reply[1], reply[2])
} else {
client.emit('message', reply[1], reply[2])
}
} else if (type === 'pmessage') { // Pattern, channel, message
if (!client.options.returnBuffers || client.messageBuffers) { // Backwards compatible. Refactor this in v.3 to always return a string on the normal emitter
client.emit('pmessage', reply[1].toString(), reply[2].toString(), reply[3].toString())
client.emit('pmessageBuffer', reply[1], reply[2], reply[3])
} else {
client.emit('pmessage', reply[1], reply[2], reply[3])
}
} else {
subscribeUnsubscribe(client, reply, type)
}
}
module.exports = returnPubSub

View File

@@ -22,6 +22,14 @@ function onConnect (client) {
} }
} }
function sendOfflineQueue (client) {
while (client.offlineQueue.length) {
const command = client.offlineQueue.shift()
debug('Sending offline command: %s', command.command)
client.internalSendCommand(command)
}
}
function onReady (client) { function onReady (client) {
debug('onReady called %s id %s', client.address, client.connectionId) debug('onReady called %s id %s', client.address, client.connectionId)
client.ready = true client.ready = true
@@ -83,7 +91,7 @@ function onReady (client) {
} }
} }
} }
client.sendOfflineQueue() sendOfflineQueue(client)
client.emit('ready') client.emit('ready')
} }

78
lib/replyHandler.js Normal file
View File

@@ -0,0 +1,78 @@
'use strict'
const Buffer = require('buffer').Buffer
const utils = require('./utils')
const pubsub = require('./pubsub')
function onError (client, err) {
const commandObj = client.commandQueue.shift()
if (commandObj.error) {
err.stack = commandObj.error.stack.replace(/^Error.*?\n/, `ReplyError: ${err.message}\n`)
}
err.command = commandObj.command.toUpperCase()
if (commandObj.args && commandObj.args.length) {
err.args = commandObj.args
}
// Count down pub sub mode if in entering modus
if (client.pubSubMode > 1) {
client.pubSubMode--
}
const match = err.message.match(utils.errCode)
// LUA script could return user errors that don't behave like all other errors!
if (match) {
err.code = match[1]
}
commandObj.callback(err)
}
function normalReply (client, reply) {
const command = client.commandQueue.shift()
if (client._multi === false) {
reply = utils.handleReply(client, reply, command)
}
command.callback(null, reply)
}
function onResult (client, reply) {
// If in monitor mode, all normal commands are still working and we only want to emit the streamlined commands
// As this is not the average use case and monitor is expensive anyway, let's change the code here, to improve
// the average performance of all other commands in case of no monitor mode
if (client.monitoring) {
var replyStr
if (client.buffers && Buffer.isBuffer(reply)) {
replyStr = reply.toString()
} else {
replyStr = reply
}
// While reconnecting the redis server does not recognize the client as in monitor mode anymore
// Therefore the monitor command has to finish before it catches further commands
if (typeof replyStr === 'string' && utils.monitorRegex.test(replyStr)) {
const timestamp = replyStr.slice(0, replyStr.indexOf(' '))
const args = replyStr.slice(replyStr.indexOf('"') + 1, -1).split('" "').map((elem) => {
return elem.replace(/\\"/g, '"')
})
client.emit('monitor', timestamp, args, replyStr)
return
}
}
if (client.pubSubMode === 0) {
normalReply(client, reply)
} else if (client.pubSubMode !== 1) {
client.pubSubMode--
normalReply(client, reply)
} else if (!(reply instanceof Array) || reply.length <= 2) {
// Only PING and QUIT are allowed in this context besides the pub sub commands
// Ping replies with ['pong', null|value] and quit with 'OK'
normalReply(client, reply)
} else {
pubsub(client, reply)
}
}
module.exports = {
onError,
onResult
}

View File

@@ -8,7 +8,7 @@ const intercept = require('intercept-stdout')
describe('The \'blpop\' method', () => { describe('The \'blpop\' method', () => {
helper.allTests((ip, args) => { helper.allTests((ip, args) => {
describe.only(`using ${ip}`, () => { describe(`using ${ip}`, () => {
let client let client
let bclient let bclient

View File

@@ -460,6 +460,7 @@ describe('publish/subscribe', () => {
sub.set('foo', data).then(() => { sub.set('foo', data).then(() => {
sub.get('foo') sub.get('foo')
sub._stream.once('data', () => { sub._stream.once('data', () => {
// TODO: Improve this test to test if a buffer is returned for any call
assert.strictEqual(sub.messageBuffers, false) assert.strictEqual(sub.messageBuffers, false)
assert.strictEqual(sub.shouldBuffer, false) assert.strictEqual(sub.shouldBuffer, false)
sub.on('pmessageBuffer', (pattern, channel, message) => { sub.on('pmessageBuffer', (pattern, channel, message) => {