diff --git a/index.js b/index.js index ebe44d969a..2975d9cc58 100644 --- a/index.js +++ b/index.js @@ -39,18 +39,24 @@ function RedisClient(stream, options) { options = JSON.parse(JSON.stringify(options || {})); var self = this; + this.pipeline = 0; + var cork; if (!stream.cork) { - this.pipeline = 0; - this.cork = noop; - this.once('ready', function () { - self.cork = function (len) { - self.pipeline = len; - self.pipeline_queue = new Queue(len); - }; - }); - stream.uncork = noop; - this.write = this.writeStream; + cork = function (len) { + self.pipeline = len; + self.pipeline_queue = new Queue(len); + }; + this.uncork = noop; + } else { + cork = function (len) { + self.pipeline = len; + self.pipeline_queue = new Queue(len); + self.stream.cork(); + }; } + this.once('ready', function () { + self.cork = cork; + }); this.stream = stream; this.connection_id = ++connection_id; @@ -131,8 +137,9 @@ RedisClient.prototype.install_stream_listeners = function() { }); }; -RedisClient.prototype.cork = function (len) { - this.stream.cork(); +RedisClient.prototype.cork = noop; +RedisClient.prototype.uncork = function () { + this.stream.uncork(); }; RedisClient.prototype.initialize_retry_vars = function () { @@ -377,7 +384,6 @@ RedisClient.prototype.on_info_cmd = function (err, res) { return; } - var self = this; var obj = {}; var lines = res.toString().split('\r\n'); var i = 0; @@ -422,9 +428,9 @@ RedisClient.prototype.on_info_cmd = function (err, res) { retry_time = 1000; } debug('Redis server still loading, trying again in ' + retry_time); - setTimeout(function () { + setTimeout(function (self) { self.ready_check(); - }, retry_time); + }, retry_time, this); } }; @@ -441,12 +447,13 @@ RedisClient.prototype.ready_check = function () { }; RedisClient.prototype.send_offline_queue = function () { - var command_obj, buffered_writes = 0; + var command_obj; while (command_obj = this.offline_queue.shift()) { debug('Sending offline command: ' + command_obj.command); - buffered_writes += !this.send_command(command_obj.command, command_obj.args, command_obj.callback); + this.send_command(command_obj.command, command_obj.args, command_obj.callback); } + this.drain(); // Even though items were shifted off, Queue backing store still uses memory until next add, so just get a new Queue this.offline_queue = new Queue(); }; @@ -543,7 +550,7 @@ RedisClient.prototype.return_error = function (err) { err.code = match[1]; } - this.emit_drain_idle(queue_len); + this.emit_idle(queue_len); if (command_obj.callback) { command_obj.callback(err); @@ -557,16 +564,12 @@ RedisClient.prototype.drain = function () { this.should_buffer = false; }; -RedisClient.prototype.emit_drain_idle = function (queue_len) { +RedisClient.prototype.emit_idle = function (queue_len) { if (this.pub_sub_mode === false && queue_len === 0) { // Free the queue capacity memory by using a new queue this.command_queue = new Queue(); this.emit('idle'); } - - if (this.should_buffer && queue_len <= this.command_queue_low_water) { - this.drain(); - } }; RedisClient.prototype.return_reply = function (reply) { @@ -587,7 +590,7 @@ RedisClient.prototype.return_reply = function (reply) { queue_len = this.command_queue.length; - this.emit_drain_idle(queue_len); + this.emit_idle(queue_len); if (command_obj && !command_obj.sub_command) { if (typeof command_obj.callback === 'function') { @@ -640,7 +643,7 @@ RedisClient.prototype.return_reply = function (reply) { } /* istanbul ignore else: this is a safety check that we should not be able to trigger */ else if (this.monitoring) { - if (Buffer.isBuffer(reply)) { + if (typeof reply !== 'string') { reply = reply.toString(); } // If in monitoring mode only two commands are valid ones: AUTH and MONITOR wich reply with OK @@ -662,8 +665,8 @@ RedisClient.prototype.send_command = function (command, args, callback) { var arg, command_obj, i, err, stream = this.stream, command_str = '', - buffered_writes = 0, buffer_args = false, + big_data = false, buffer = this.options.return_buffers; if (args === undefined) { @@ -695,7 +698,12 @@ RedisClient.prototype.send_command = function (command, args, callback) { for (i = 0; i < args.length; i += 1) { if (Buffer.isBuffer(args[i])) { buffer_args = true; - break; + } else if (typeof args[i] !== 'string') { + arg = String(arg); + // 30000 seemed to be a good value to switch to buffers after testing this with and checking the pros and cons + } else if (args[i].length > 30000) { + big_data = true; + args[i] = new Buffer(args[i]); } } if (this.options.detect_buffers) { @@ -741,74 +749,53 @@ RedisClient.prototype.send_command = function (command, args, callback) { // Always use 'Multi bulk commands', but if passed any Buffer args, then do multiple writes, one for each arg. // This means that using Buffers in commands is going to be slower, so use Strings if you don't already have a Buffer. - command_str = '*' + (args.length + 1) + '\r\n$' + command.length + '\r\n' + command + '\r\n'; - if (!buffer_args) { // Build up a string and send entire command in one write + if (!buffer_args && !big_data) { // Build up a string and send entire command in one write for (i = 0; i < args.length; i += 1) { - arg = args[i]; - if (typeof arg !== 'string') { - arg = String(arg); - } + arg = String(args[i]); command_str += '$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n'; } debug('Send ' + this.address + ' id ' + this.connection_id + ': ' + command_str); - buffered_writes += !this.write(command_str); + this.write(command_str); } else { debug('Send command (' + command_str + ') has Buffer arguments'); - buffered_writes += !this.write(command_str); + this.write(command_str); for (i = 0; i < args.length; i += 1) { arg = args[i]; - if (Buffer.isBuffer(arg)) { - if (arg.length === 0) { - debug('send_command: using empty string for 0 length buffer'); - buffered_writes += !this.write('$0\r\n\r\n'); - } else { - buffered_writes += !this.write('$' + arg.length + '\r\n'); - buffered_writes += !this.write(arg); - buffered_writes += !this.write('\r\n'); - debug('send_command: buffer send ' + arg.length + ' bytes'); - } + if (!Buffer.isBuffer(arg)) { + arg = String(arg); + this.write('$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n'); } else { - if (typeof arg !== 'string') { - arg = String(arg); - } - debug('send_command: string send ' + Buffer.byteLength(arg) + ' bytes: ' + arg); - buffered_writes += !this.write('$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n'); + this.write('$' + arg.length + '\r\n'); + this.write(arg); + this.write('\r\n'); } + debug('send_command: buffer send ' + arg.length + ' bytes'); } } - if (buffered_writes !== 0 || this.command_queue.length >= this.command_queue_high_water) { - debug('send_command buffered_writes: ' + buffered_writes, ' should_buffer: ' + this.should_buffer); - this.should_buffer = true; - } return !this.should_buffer; }; RedisClient.prototype.write = function (data) { - return this.stream.write(data); -}; - -RedisClient.prototype.writeStream = function (data) { - var nr = 0; - if (this.pipeline === 0) { - return this.stream.write(data); + this.should_buffer = !this.stream.write(data); + return; } this.pipeline--; if (this.pipeline === 0) { - var command; + var command, str = ''; while (command = this.pipeline_queue.shift()) { - nr += !this.stream.write(command); + str += command; } - nr += !this.stream.write(data); - return !nr; + this.should_buffer = !this.stream.write(str + data); + return; } this.pipeline_queue.push(data); - return true; + return; }; RedisClient.prototype.pub_sub_command = function (command_obj) { @@ -1102,7 +1089,7 @@ Multi.prototype.exec_transaction = function (callback) { this.send_command(command, args, index, cb); } - this._client.stream.uncork(); + this._client.uncork(); return this._client.send_command('exec', [], function(err, replies) { self.execute_callback(err, replies); }); @@ -1210,7 +1197,7 @@ Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = funct this._client.send_command(command, args, cb); index++; } - this._client.stream.uncork(); + this._client.uncork(); return this._client.should_buffer; }; diff --git a/test/batch.spec.js b/test/batch.spec.js index 27b45ad3f0..9afc7973e9 100644 --- a/test/batch.spec.js +++ b/test/batch.spec.js @@ -162,7 +162,7 @@ describe("The 'batch' method", function () { }); }); - it('handles batchple operations being applied to a set', function (done) { + it('handles multiple operations being applied to a set', function (done) { client.sadd("some set", "mem 1"); client.sadd(["some set", "mem 2"]); client.sadd("some set", "mem 3"); @@ -189,7 +189,7 @@ describe("The 'batch' method", function () { }); }); - it('allows batchple operations to be performed using constructor with all kinds of syntax', function (done) { + it('allows multiple operations to be performed using constructor with all kinds of syntax', function (done) { var now = Date.now(); var arr = ["batchhmset", "batchbar", "batchbaz"]; var arr2 = ['some manner of key', 'otherTypes']; @@ -253,7 +253,7 @@ describe("The 'batch' method", function () { assert.strictEqual(buffering, true); }); - it('allows batchple operations to be performed using a chaining API', function (done) { + it('allows multiple operations to be performed using a chaining API', function (done) { client.batch() .mset('some', '10', 'keys', '20') .incr('some') @@ -270,7 +270,7 @@ describe("The 'batch' method", function () { }); }); - it('allows batchple commands to work the same as normal to be performed using a chaining API', function (done) { + it('allows multiple commands to work the same as normal to be performed using a chaining API', function (done) { client.batch() .mset(['some', '10', 'keys', '20']) .incr(['some', helper.isNumber(11)]) @@ -287,7 +287,7 @@ describe("The 'batch' method", function () { }); }); - it('allows batchple commands to work the same as normal to be performed using a chaining API promisified', function () { + it('allows multiple commands to work the same as normal to be performed using a chaining API promisified', function () { return client.batch() .mset(['some', '10', 'keys', '20']) .incr(['some', helper.isNumber(11)]) @@ -303,7 +303,7 @@ describe("The 'batch' method", function () { }); }); - it('allows an array to be provided indicating batchple operations to perform', function (done) { + it('allows an array to be provided indicating multiple operations to perform', function (done) { // test nested batch-bulk replies with nulls. client.batch([ ["mget", ["batchfoo", "some", "random value", "keys"]], diff --git a/test/node_redis.spec.js b/test/node_redis.spec.js index e89cddb3d3..84ef29b3e9 100644 --- a/test/node_redis.spec.js +++ b/test/node_redis.spec.js @@ -43,6 +43,22 @@ describe("The node_redis client", function () { }); }); + describe('big data', function () { + + // Check if the fast mode for big strings is working correct + it('safe strings that are bigger than 30000 characters', function(done) { + var str = 'foo ಠ_ಠ bar '; + while (str.length < 111111) { + str += str; + } + client.set('foo', str); + client.get('foo', function (err, res) { + assert.strictEqual(res, str); + done(); + }); + }); + }); + describe("send_command", function () { it("omitting args should be fine in some cases", function (done) { @@ -470,20 +486,6 @@ describe("The node_redis client", function () { describe('enable_offline_queue', function () { describe('true', function () { - it("should emit drain after info command and nothing to buffer", function (done) { - client = redis.createClient({ - parser: parser - }); - client.set('foo', 'bar'); - client.get('foo', function () { - assert(!client.should_buffer); - setTimeout(done, 25); - }); - client.on('drain', function() { - assert(client.offline_queue.length === 2); - }); - }); - it("should emit drain if offline queue is flushed and nothing to buffer", function (done) { client = redis.createClient({ parser: parser,