diff --git a/changelog.md b/changelog.md index 03b2ab796e..9cdca6501e 100644 --- a/changelog.md +++ b/changelog.md @@ -1,13 +1,16 @@ Changelog ========= -## v.2.x.x - xx, 2015 +## v.2.2.0 - xx, 2015 Features - Added disable_resubscribing option to prevent a client from resubscribing after reconnecting (@BridgeAR) - Added rename_commands options to handle renamed commands from the redis config (@digmxl & @BridgeAR) -- Increase performance by exchanging built in queue with [Petka Antonov's](@petkaantonov) [double-ended queue](https://github.com/petkaantonov/deque) and prevent polymorphism (@BridgeAR) +- Increased performance (@BridgeAR) + - exchanging built in queue with [Petka Antonov's](@petkaantonov) [double-ended queue](https://github.com/petkaantonov/deque) + - prevent polymorphism + - optimize statements Bugfixes diff --git a/index.js b/index.js index 3188cd42ad..b8300577de 100644 --- a/index.js +++ b/index.js @@ -55,6 +55,13 @@ function RedisClient(stream, options) { } } } + this.options.return_buffers = !!this.options.return_buffers; + this.options.detect_buffers = !!this.options.detect_buffers; + // Override the detect_buffers setting if return_buffers is active and print a warning + if (this.options.return_buffers && this.options.detect_buffers) { + console.warn('>> WARNING: You activated return_buffers and detect_buffers at the same time. The return value is always going to be a buffer.'); + this.options.detect_buffers = false; + } this.should_buffer = false; this.command_queue_high_water = options.command_queue_high_water || 1000; this.command_queue_low_water = options.command_queue_low_water || 0; @@ -433,7 +440,7 @@ RedisClient.prototype.send_offline_queue = function () { this.offline_queue = new Queue(); // Even though items were shifted off, Queue backing store still uses memory until next add, so just get a new Queue - if (!buffered_writes) { + if (buffered_writes === 0) { this.should_buffer = false; this.emit('drain'); } @@ -531,8 +538,18 @@ RedisClient.prototype.return_error = function (err) { err.code = match[1]; } + this.emit_drain_idle(queue_len); + + if (command_obj.callback) { + command_obj.callback(err); + } else { + this.emit('error', err); + } +}; + +RedisClient.prototype.emit_drain_idle = function (queue_len) { if (this.pub_sub_mode === false && queue_len === 0) { - this.command_queue = new Queue(); + this.command_queue.clear(); this.emit('idle'); } @@ -540,12 +557,6 @@ RedisClient.prototype.return_error = function (err) { this.emit('drain'); this.should_buffer = false; } - - if (command_obj.callback) { - command_obj.callback(err); - } else { - this.emit('error', err); - } }; RedisClient.prototype.return_reply = function (reply) { @@ -566,37 +577,29 @@ RedisClient.prototype.return_reply = function (reply) { queue_len = this.command_queue.length; - if (this.pub_sub_mode === false && queue_len === 0) { - this.command_queue = new Queue(); // explicitly reclaim storage from old Queue - this.emit('idle'); - } - if (this.should_buffer && queue_len <= this.command_queue_low_water) { - this.emit('drain'); - this.should_buffer = false; - } + this.emit_drain_idle(queue_len); if (command_obj && !command_obj.sub_command) { if (typeof command_obj.callback === 'function') { if ('exec' !== command_obj.command) { - if (this.options.detect_buffers && command_obj.buffer_args === false) { + if (command_obj.buffer_args === false) { // If detect_buffers option was specified, then the reply from the parser will be Buffers. // If this command did not use Buffer arguments, then convert the reply to Strings here. reply = utils.reply_to_strings(reply); } // TODO - confusing and error-prone that hgetall is special cased in two places - if (reply && 'hgetall' === command_obj.command) { + if ('hgetall' === command_obj.command) { reply = utils.reply_to_object(reply); } } - command_obj.callback(null, reply); } else { debug('No callback for reply'); } } else if (this.pub_sub_mode || command_obj && command_obj.sub_command) { if (Array.isArray(reply)) { - if (!this.options.return_buffers && (!command_obj || this.options.detect_buffers && command_obj.buffer_args === false)) { + if (!command_obj || command_obj.buffer_args === false) { reply = utils.reply_to_strings(reply); } type = reply[0].toString(); @@ -620,11 +623,9 @@ RedisClient.prototype.return_reply = function (reply) { this.emit(type, reply[1], reply[2]); // channel, count } else { this.emit('error', new Error('subscriptions are active but got unknown reply type ' + type)); - return; } } else if (!this.closing) { this.emit('error', new Error('subscriptions are active but got an invalid reply: ' + reply)); - return; } } /* istanbul ignore else: this is a safety check that we should not be able to trigger */ @@ -648,7 +649,12 @@ RedisClient.prototype.return_reply = function (reply) { }; RedisClient.prototype.send_command = function (command, args, callback) { - var arg, command_obj, i, elem_count, buffer_args, stream = this.stream, command_str = '', buffered_writes = 0, err; + var arg, command_obj, i, err, + stream = this.stream, + command_str = '', + buffered_writes = 0, + buffer_args = false, + buffer = this.options.return_buffers; if (args === undefined) { args = []; @@ -660,7 +666,7 @@ RedisClient.prototype.send_command = function (command, args, callback) { } } - if (process.domain && callback) { + if (callback && process.domain) { callback = process.domain.bind(callback); } @@ -678,15 +684,17 @@ RedisClient.prototype.send_command = function (command, args, callback) { } } - buffer_args = false; for (i = 0; i < args.length; i += 1) { if (Buffer.isBuffer(args[i])) { buffer_args = true; break; } } + if (this.options.detect_buffers) { + buffer = buffer_args; + } - command_obj = new Command(command, args, false, buffer_args, callback); + command_obj = new Command(command, args, false, buffer, callback); if (!this.ready && !this.send_anyway || !stream.writable) { if (this.closing || !this.enable_offline_queue) { @@ -725,8 +733,6 @@ RedisClient.prototype.send_command = function (command, args, callback) { this.command_queue.push(command_obj); this.commands_sent += 1; - elem_count = args.length + 1; - if (typeof this.options.rename_commands !== 'undefined' && this.options.rename_commands[command]) { command = this.options.rename_commands[command]; } @@ -734,7 +740,7 @@ RedisClient.prototype.send_command = function (command, args, callback) { // Always use 'Multi bulk commands', but if passed any Buffer args, then do multiple writes, one for each arg. // This means that using Buffers in commands is going to be slower, so use Strings if you don't already have a Buffer. - command_str = '*' + elem_count + '\r\n$' + command.length + '\r\n' + command + '\r\n'; + command_str = '*' + (args.length + 1) + '\r\n$' + command.length + '\r\n' + command + '\r\n'; if (!buffer_args) { // Build up a string and send entire command in one write for (i = 0; i < args.length; i += 1) { @@ -752,10 +758,6 @@ RedisClient.prototype.send_command = function (command, args, callback) { for (i = 0; i < args.length; i += 1) { arg = args[i]; - if (!(Buffer.isBuffer(arg) || typeof arg === 'string')) { - arg = String(arg); - } - if (Buffer.isBuffer(arg)) { if (arg.length === 0) { debug('send_command: using empty string for 0 length buffer'); @@ -767,13 +769,16 @@ RedisClient.prototype.send_command = function (command, args, callback) { debug('send_command: buffer send ' + arg.length + ' bytes'); } } else { + if (typeof arg !== 'string') { + arg = String(arg); + } debug('send_command: string send ' + Buffer.byteLength(arg) + ' bytes: ' + arg); buffered_writes += !stream.write('$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n'); } } } - debug('send_command buffered_writes: ' + buffered_writes, ' should_buffer: ' + this.should_buffer); - if (buffered_writes || this.command_queue.length >= this.command_queue_high_water) { + if (buffered_writes !== 0 || this.command_queue.length >= this.command_queue_high_water) { + debug('send_command buffered_writes: ' + buffered_writes, ' should_buffer: ' + this.should_buffer); this.should_buffer = true; } return !this.should_buffer; @@ -1077,7 +1082,7 @@ Multi.prototype.execute_callback = function (err, replies) { } replies[i].command = args[0].toUpperCase(); } else if (replies[i]) { - if (this._client.options.detect_buffers && this.wants_buffers[i + 1] === false) { + if (this.wants_buffers[i + 1] === false) { replies[i] = utils.reply_to_strings(replies[i]); } if (args[0] === 'hgetall') { diff --git a/lib/parsers/javascript.js b/lib/parsers/javascript.js index 1230401e4d..cfd89f20b6 100644 --- a/lib/parsers/javascript.js +++ b/lib/parsers/javascript.js @@ -2,9 +2,8 @@ var util = require('util'); -function ReplyParser(return_buffers) { +function ReplyParser() { this.name = exports.name; - this.return_buffers = return_buffers; this._buffer = new Buffer(0); this._offset = 0; @@ -37,10 +36,8 @@ ReplyParser.prototype._parseResult = function (type) { if (type === 45) { return new Error(this._buffer.toString(this._encoding, start, end)); - } else if (this.return_buffers) { - return this._buffer.slice(start, end); } - return this._buffer.toString(this._encoding, start, end); + return this._buffer.slice(start, end); } else if (type === 58) { // : // up to the delimiter end = this._packetEndOffset() - 1; @@ -77,15 +74,12 @@ ReplyParser.prototype._parseResult = function (type) { // set the offset to after the delimiter this._offset = end + 2; - if (this.return_buffers) { - return this._buffer.slice(start, end); - } - return this._buffer.toString(this._encoding, start, end); + return this._buffer.slice(start, end); } else if (type === 42) { // * offset = this._offset; packetHeader = this.parseHeader(); - if (packetHeader < 0) { + if (packetHeader === -1) { return null; } diff --git a/lib/utils.js b/lib/utils.js index 6f1661f1d3..3b282bc5ea 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -1,6 +1,6 @@ 'use strict'; -// hgetall converts its replies to an Object. If the reply is empty, null is returned. +// hgetall converts its replies to an Object. If the reply is empty, null is returned. function replyToObject(reply) { var obj = {}, j, jl, key, val; diff --git a/test/parser/javascript.spec.js b/test/parser/javascript.spec.js index 491a292ab1..499cfb11ca 100644 --- a/test/parser/javascript.spec.js +++ b/test/parser/javascript.spec.js @@ -3,13 +3,15 @@ var assert = require('assert'); var Parser = require("../../lib/parsers/javascript").Parser; var config = require("../lib/config"); +var utils = require("../../lib/utils"); var redis = config.redis; describe('javascript parser', function () { it('handles multi-bulk reply', function (done) { - var parser = new Parser(false); + var parser = new Parser(); var reply_count = 0; function check_reply(reply) { + reply = utils.reply_to_strings(reply); assert.deepEqual(reply, [['a']], "Expecting multi-bulk reply of [['a']]"); reply_count++; }