diff --git a/index.js b/index.js index 2a94473178..f1e1b12da3 100644 --- a/index.js +++ b/index.js @@ -1,12 +1,10 @@ 'use strict' -// TODO: Replace all for in loops! // TODO: Replace all `Error` with `RedisError` and improve errors in general // We have to replace the error codes and make them coherent. // We also have to use InterruptError s instead of AbortError s. // The Error messages might be improved as well. // TODO: Rewrite this to classes -const Buffer = require('buffer').Buffer const net = require('net') const util = require('util') const utils = require('./lib/utils') @@ -22,12 +20,7 @@ const addCommand = require('./lib/commands') const unifyOptions = require('./lib/createClient') const Multi = require('./lib/multi') const normalizeAndWriteCommand = require('./lib/writeCommands') -const SUBSCRIBE_COMMANDS = { - subscribe: true, - unsubscribe: true, - psubscribe: true, - punsubscribe: true -} +const offlineCommand = require('./lib/offlineCommand') function noop () {} @@ -198,186 +191,12 @@ RedisClient.prototype.onError = function (err) { reconnect(this, 'error', err) } -RedisClient.prototype.sendOfflineQueue = function () { - for (var commandObj = this.offlineQueue.shift(); commandObj; commandObj = this.offlineQueue.shift()) { - debug('Sending offline command: %s', commandObj.command) - this.internalSendCommand(commandObj) - } -} - -RedisClient.prototype.returnError = function (err) { - const commandObj = this.commandQueue.shift() - if (commandObj.error) { - err.stack = commandObj.error.stack.replace(/^Error.*?\n/, `ReplyError: ${err.message}\n`) - } - err.command = commandObj.command.toUpperCase() - if (commandObj.args && commandObj.args.length) { - err.args = commandObj.args - } - - // Count down pub sub mode if in entering modus - if (this.pubSubMode > 1) { - this.pubSubMode-- - } - - const match = err.message.match(utils.errCode) - // LUA script could return user errors that don't behave like all other errors! - if (match) { - err.code = match[1] - } - - commandObj.callback(err) -} - -function normalReply (client, reply) { - const command = client.commandQueue.shift() - if (client._multi === false) { - reply = utils.handleReply(client, reply, command) - } - command.callback(null, reply) -} - -function subscribeUnsubscribe (self, reply, type) { - // Subscribe commands take an optional callback and also emit an event, but only the Last_ response is included in the callback - // The pub sub commands return each argument in a separate return value and have to be handled that way - const commandObj = self.commandQueue.get(0) - const buffer = self.options.returnBuffers || self.options.detectBuffers && commandObj.bufferArgs - const channel = (buffer || reply[1] === null) ? reply[1] : reply[1].toString() - const count = +reply[2] // Return the channel counter as number no matter if `stringNumbers` is activated or not - debug(type, channel) - - // Emit first, then return the callback - if (channel !== null) { // Do not emit or "unsubscribe" something if there was no channel to unsubscribe from - if (type === 'subscribe' || type === 'psubscribe') { - self.subscriptionSet[`${type}_${channel}`] = channel - } else { - const innerType = type === 'unsubscribe' ? 'subscribe' : 'psubscribe' // Make types consistent - delete self.subscriptionSet[`${innerType}_${channel}`] - } - self.emit(type, channel, count) - self.subscribeChannels.push(channel) - } - - if (commandObj.argsLength === 1 || self.subCommandsLeft === 1 || commandObj.argsLength === 0 && (count === 0 || channel === null)) { - if (count === 0) { // unsubscribed from all channels - var runningCommand - var i = 1 - self.pubSubMode = 0 // Deactivating pub sub mode - // This should be a rare case and therefore handling it this way should be good performance wise for the general case - for (runningCommand = self.commandQueue.get(i); runningCommand !== undefined; runningCommand = self.commandQueue.get(i)) { - if (SUBSCRIBE_COMMANDS[runningCommand.command]) { - self.pubSubMode = i // Entering pub sub mode again - break - } - i++ - } - } - self.commandQueue.shift() - commandObj.callback(null, [count, self.subscribeChannels]) - self.subscribeChannels = [] - self.subCommandsLeft = 0 - } else { - if (self.subCommandsLeft !== 0) { - self.subCommandsLeft-- - } else { - self.subCommandsLeft = commandObj.argsLength ? commandObj.argsLength - 1 : count - } - } -} - -function returnPubSub (self, reply) { - const type = reply[0].toString() - if (type === 'message') { // channel, message - if (!self.options.returnBuffers || self.messageBuffers) { // backwards compatible. Refactor this in v.3 to always return a string on the normal emitter - self.emit('message', reply[1].toString(), reply[2].toString()) - self.emit('messageBuffer', reply[1], reply[2]) - } else { - self.emit('message', reply[1], reply[2]) - } - } else if (type === 'pmessage') { // pattern, channel, message - if (!self.options.returnBuffers || self.messageBuffers) { // backwards compatible. Refactor this in v.3 to always return a string on the normal emitter - self.emit('pmessage', reply[1].toString(), reply[2].toString(), reply[3].toString()) - self.emit('pmessageBuffer', reply[1], reply[2], reply[3]) - } else { - self.emit('pmessage', reply[1], reply[2], reply[3]) - } - } else { - subscribeUnsubscribe(self, reply, type) - } -} - -RedisClient.prototype.returnReply = function (reply) { - // If in monitor mode, all normal commands are still working and we only want to emit the streamlined commands - // As this is not the average use case and monitor is expensive anyway, let's change the code here, to improve - // the average performance of all other commands in case of no monitor mode - if (this.monitoring) { - var replyStr - if (this.buffers && Buffer.isBuffer(reply)) { - replyStr = reply.toString() - } else { - replyStr = reply - } - // While reconnecting the redis server does not recognize the client as in monitor mode anymore - // Therefore the monitor command has to finish before it catches further commands - if (typeof replyStr === 'string' && utils.monitorRegex.test(replyStr)) { - const timestamp = replyStr.slice(0, replyStr.indexOf(' ')) - const args = replyStr.slice(replyStr.indexOf('"') + 1, -1).split('" "').map((elem) => { - return elem.replace(/\\"/g, '"') - }) - this.emit('monitor', timestamp, args, replyStr) - return - } - } - if (this.pubSubMode === 0) { - normalReply(this, reply) - } else if (this.pubSubMode !== 1) { - this.pubSubMode-- - normalReply(this, reply) - } else if (!(reply instanceof Array) || reply.length <= 2) { - // Only PING and QUIT are allowed in this context besides the pub sub commands - // Ping replies with ['pong', null|value] and quit with 'OK' - normalReply(this, reply) - } else { - returnPubSub(this, reply) - } -} - -function handleOfflineCommand (self, commandObj) { - var command = commandObj.command - var err, msg - if (self.closing || !self.enableOfflineQueue) { - command = command.toUpperCase() - if (!self.closing) { - if (self._stream.writable) { - msg = 'The connection is not yet established and the offline queue is deactivated.' - } else { - msg = 'Stream not writeable.' - } - } else { - msg = 'The connection is already closed.' - } - err = new errorClasses.AbortError({ - message: `${command} can't be processed. ${msg}`, - code: 'NR_CLOSED', - command - }) - if (commandObj.args.length) { - err.args = commandObj.args - } - utils.replyInOrder(self, commandObj.callback, err) - } else { - debug('Queueing %s for next server connection.', command) - self.offlineQueue.push(commandObj) - } - self.shouldBuffer = true -} - // Do not call internalSendCommand directly, if you are not absolutely certain it handles everything properly // e.g. monitor / info does not work with internalSendCommand only RedisClient.prototype.internalSendCommand = function (commandObj) { if (this.ready === false || this._stream.writable === false) { // Handle offline commands right away - handleOfflineCommand(this, commandObj) + offlineCommand(this, commandObj) return commandObj.promise } diff --git a/lib/connect.js b/lib/connect.js index 71d5209163..3a47f94a61 100644 --- a/lib/connect.js +++ b/lib/connect.js @@ -6,6 +6,9 @@ const net = require('net') const reconnect = require('./reconnect') const onConnect = require('./readyHandler') const debug = require('./debug') +const replyHandler = require('./replyHandler') +const onResult = replyHandler.onResult +const onError = replyHandler.onError /** * @description Create a new Parser instance and pass all the necessary options to it @@ -16,10 +19,10 @@ const debug = require('./debug') function createParser (client) { return new Parser({ returnReply (data) { - client.returnReply(data) + onResult(client, data) }, returnError (err) { - client.returnError(err) + onError(client, err) }, returnFatalError (err) { // Error out all fired commands. Otherwise they might rely on faulty data. We have to reconnect to get in a working state again diff --git a/lib/createClient.js b/lib/createClient.js index dd16e95b57..ccf1f95781 100644 --- a/lib/createClient.js +++ b/lib/createClient.js @@ -40,9 +40,8 @@ module.exports = function createClient (portArg, hostArg, options) { options.port = parsed.port } if (parsed.search !== '') { - var elem - for (elem in parsed.query) { - if (!parsed.query.hasOwnProperty(elem)) { + for (var elem in parsed.query) { + if (!Object.prototype.hasOwnProperty.call(parsed.query, elem)) { continue } // If options are passed twice, only the parsed options will be used diff --git a/lib/offlineCommand.js b/lib/offlineCommand.js new file mode 100644 index 0000000000..8893d04097 --- /dev/null +++ b/lib/offlineCommand.js @@ -0,0 +1,27 @@ +'use strict' + +const Errors = require('redis-errors') +const utils = require('./utils') +const debug = require('./debug') + +function offlineCommand (client, command) { + const commandName = command.command.toUpperCase() + if (client.closing || !client.enableOfflineQueue) { + const msg = client.closing === true + ? 'The connection is already closed.' + : client._stream.writable === true + ? 'The connection is not yet established and the offline queue is deactivated.' + : 'Stream not writeable.' + const err = new Errors.AbortError(`${commandName} can't be processed. ${msg}`) + err.code = 'NR_CLOSED' + err.command = commandName + err.args = command.args + utils.replyInOrder(client, command.callback, err) + } else { + debug('Queueing %s for next server connection.', commandName) + client.offlineQueue.push(command) + } + client.shouldBuffer = true +} + +module.exports = offlineCommand diff --git a/lib/pubsub.js b/lib/pubsub.js new file mode 100644 index 0000000000..2b4d3ee74b --- /dev/null +++ b/lib/pubsub.js @@ -0,0 +1,80 @@ +'use strict' + +const debug = require('./debug') +const SUBSCRIBE_COMMANDS = { + subscribe: true, + unsubscribe: true, + psubscribe: true, + punsubscribe: true +} + +function subscribeUnsubscribe (client, reply, type) { + // Subscribe commands take an optional callback and also emit an event, but only the Last_ response is included in the callback + // The pub sub commands return each argument in a separate return value and have to be handled that way + const commandObj = client.commandQueue.get(0) + const buffer = client.options.returnBuffers || client.options.detectBuffers && commandObj.bufferArgs + const channel = (buffer || reply[1] === null) ? reply[1] : reply[1].toString() + const count = +reply[2] // Return the channel counter as number no matter if `stringNumbers` is activated or not + debug(type, channel) + + // Emit first, then return the callback + if (channel !== null) { // Do not emit or "unsubscribe" something if there was no channel to unsubscribe from + if (type === 'subscribe' || type === 'psubscribe') { + client.subscriptionSet[`${type}_${channel}`] = channel + } else { + const innerType = type === 'unsubscribe' ? 'subscribe' : 'psubscribe' // Make types consistent + delete client.subscriptionSet[`${innerType}_${channel}`] + } + client.emit(type, channel, count) + client.subscribeChannels.push(channel) + } + + if (commandObj.argsLength === 1 || client.subCommandsLeft === 1 || commandObj.argsLength === 0 && (count === 0 || channel === null)) { + if (count === 0) { // Unsubscribed from all channels + var runningCommand + var i = 1 + client.pubSubMode = 0 // Deactivating pub sub mode + // This should be a rare case and therefore handling it this way should be good performance wise for the general case + for (runningCommand = client.commandQueue.get(i); runningCommand !== undefined; runningCommand = client.commandQueue.get(i)) { + if (SUBSCRIBE_COMMANDS[runningCommand.command]) { + client.pubSubMode = i // Entering pub sub mode again + break + } + i++ + } + } + client.commandQueue.shift() + commandObj.callback(null, [count, client.subscribeChannels]) + client.subscribeChannels = [] + client.subCommandsLeft = 0 + } else { + if (client.subCommandsLeft !== 0) { + client.subCommandsLeft-- + } else { + client.subCommandsLeft = commandObj.argsLength ? commandObj.argsLength - 1 : count + } + } +} + +function returnPubSub (client, reply) { + const type = reply[0].toString() + if (type === 'message') { // Channel, message + if (!client.options.returnBuffers || client.messageBuffers) { // Backwards compatible. Refactor this in v.3 to always return a string on the normal emitter + client.emit('message', reply[1].toString(), reply[2].toString()) + client.emit('messageBuffer', reply[1], reply[2]) + } else { + client.emit('message', reply[1], reply[2]) + } + } else if (type === 'pmessage') { // Pattern, channel, message + if (!client.options.returnBuffers || client.messageBuffers) { // Backwards compatible. Refactor this in v.3 to always return a string on the normal emitter + client.emit('pmessage', reply[1].toString(), reply[2].toString(), reply[3].toString()) + client.emit('pmessageBuffer', reply[1], reply[2], reply[3]) + } else { + client.emit('pmessage', reply[1], reply[2], reply[3]) + } + } else { + subscribeUnsubscribe(client, reply, type) + } +} + +module.exports = returnPubSub diff --git a/lib/readyHandler.js b/lib/readyHandler.js index a5ec5b8226..8997575740 100644 --- a/lib/readyHandler.js +++ b/lib/readyHandler.js @@ -22,6 +22,14 @@ function onConnect (client) { } } +function sendOfflineQueue (client) { + while (client.offlineQueue.length) { + const command = client.offlineQueue.shift() + debug('Sending offline command: %s', command.command) + client.internalSendCommand(command) + } +} + function onReady (client) { debug('onReady called %s id %s', client.address, client.connectionId) client.ready = true @@ -83,7 +91,7 @@ function onReady (client) { } } } - client.sendOfflineQueue() + sendOfflineQueue(client) client.emit('ready') } diff --git a/lib/replyHandler.js b/lib/replyHandler.js new file mode 100644 index 0000000000..9fd5e77bb2 --- /dev/null +++ b/lib/replyHandler.js @@ -0,0 +1,78 @@ +'use strict' + +const Buffer = require('buffer').Buffer +const utils = require('./utils') +const pubsub = require('./pubsub') + +function onError (client, err) { + const commandObj = client.commandQueue.shift() + if (commandObj.error) { + err.stack = commandObj.error.stack.replace(/^Error.*?\n/, `ReplyError: ${err.message}\n`) + } + err.command = commandObj.command.toUpperCase() + if (commandObj.args && commandObj.args.length) { + err.args = commandObj.args + } + + // Count down pub sub mode if in entering modus + if (client.pubSubMode > 1) { + client.pubSubMode-- + } + + const match = err.message.match(utils.errCode) + // LUA script could return user errors that don't behave like all other errors! + if (match) { + err.code = match[1] + } + + commandObj.callback(err) +} + +function normalReply (client, reply) { + const command = client.commandQueue.shift() + if (client._multi === false) { + reply = utils.handleReply(client, reply, command) + } + command.callback(null, reply) +} + +function onResult (client, reply) { + // If in monitor mode, all normal commands are still working and we only want to emit the streamlined commands + // As this is not the average use case and monitor is expensive anyway, let's change the code here, to improve + // the average performance of all other commands in case of no monitor mode + if (client.monitoring) { + var replyStr + if (client.buffers && Buffer.isBuffer(reply)) { + replyStr = reply.toString() + } else { + replyStr = reply + } + // While reconnecting the redis server does not recognize the client as in monitor mode anymore + // Therefore the monitor command has to finish before it catches further commands + if (typeof replyStr === 'string' && utils.monitorRegex.test(replyStr)) { + const timestamp = replyStr.slice(0, replyStr.indexOf(' ')) + const args = replyStr.slice(replyStr.indexOf('"') + 1, -1).split('" "').map((elem) => { + return elem.replace(/\\"/g, '"') + }) + client.emit('monitor', timestamp, args, replyStr) + return + } + } + if (client.pubSubMode === 0) { + normalReply(client, reply) + } else if (client.pubSubMode !== 1) { + client.pubSubMode-- + normalReply(client, reply) + } else if (!(reply instanceof Array) || reply.length <= 2) { + // Only PING and QUIT are allowed in this context besides the pub sub commands + // Ping replies with ['pong', null|value] and quit with 'OK' + normalReply(client, reply) + } else { + pubsub(client, reply) + } +} + +module.exports = { + onError, + onResult +} diff --git a/test/commands/blpop.spec.js b/test/commands/blpop.spec.js index cfd75db617..542818911e 100644 --- a/test/commands/blpop.spec.js +++ b/test/commands/blpop.spec.js @@ -8,7 +8,7 @@ const intercept = require('intercept-stdout') describe('The \'blpop\' method', () => { helper.allTests((ip, args) => { - describe.only(`using ${ip}`, () => { + describe(`using ${ip}`, () => { let client let bclient diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index 8d48776580..f96f959d8e 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -460,6 +460,7 @@ describe('publish/subscribe', () => { sub.set('foo', data).then(() => { sub.get('foo') sub._stream.once('data', () => { + // TODO: Improve this test to test if a buffer is returned for any call assert.strictEqual(sub.messageBuffers, false) assert.strictEqual(sub.shouldBuffer, false) sub.on('pmessageBuffer', (pattern, channel, message) => {