diff --git a/index.js b/index.js index e6e565c59b..ac2c0fc517 100644 --- a/index.js +++ b/index.js @@ -122,6 +122,7 @@ function RedisClient (options, stream) { } this.command_queue = new Queue(); // Holds sent commands to de-pipeline them this.offline_queue = new Queue(); // Holds commands issued but not able to be sent + this.pipeline_queue = new Queue(); // Holds all pipelined commands // ATTENTION: connect_timeout should change in v.3.0 so it does not count towards ending reconnection attempts after x seconds // This should be done by the retry_strategy. Instead it should only be the timeout for connecting to redis this.connect_timeout = +options.connect_timeout || 3600000; // 60 * 60 * 1000 ms @@ -144,8 +145,8 @@ function RedisClient (options, stream) { this.auth_pass = options.auth_pass || options.password; this.selected_db = options.db; // Save the selected db here, used when reconnecting this.old_state = null; - this.send_anyway = false; - this.pipeline = 0; + this.fire_strings = true; // Determine if strings or buffers should be written to the stream + this.pipeline = false; this.times_connected = 0; this.options = options; this.buffers = options.return_buffers || options.detect_buffers; @@ -374,23 +375,25 @@ RedisClient.prototype.on_ready = function () { debug('on_ready called ' + this.address + ' id ' + this.connection_id); this.ready = true; - var cork; - if (!this.stream.cork) { - cork = function (len) { - self.pipeline = len; - self.pipeline_queue = new Queue(len); - }; - } else { - cork = function (len) { - self.pipeline = len; - self.pipeline_queue = new Queue(len); + this.cork = function () { + self.pipeline = true; + if (self.stream.cork) { self.stream.cork(); - }; - this.uncork = function () { + } + }; + this.uncork = function () { + if (self.fire_strings) { + self.write_strings(); + } else { + self.write_buffers(); + } + self.pipeline = false; + self.fire_strings = true; + if (self.stream.uncork) { + // TODO: Consider using next tick here. See https://github.com/NodeRedis/node_redis/issues/1033 self.stream.uncork(); - }; - } - this.cork = cork; + } + }; // Restore modal commands from previous connection. The order of the commands is important if (this.selected_db !== undefined) { @@ -523,7 +526,8 @@ RedisClient.prototype.connection_gone = function (why, error) { this.ready = false; // Deactivate cork to work with the offline queue this.cork = noop; - this.pipeline = 0; + this.uncork = noop; + this.pipeline = false; var state = { monitoring: this.monitoring, @@ -792,10 +796,6 @@ RedisClient.prototype.internal_send_command = function (command, args, callback) if (args[i].length > 30000) { big_data = true; args_copy[i] = new Buffer(args[i], 'utf8'); - if (this.pipeline !== 0) { - this.pipeline += 2; - this.writeDefault = this.writeBuffers; - } } else { args_copy[i] = args[i]; } @@ -813,10 +813,6 @@ RedisClient.prototype.internal_send_command = function (command, args, callback) args_copy[i] = args[i]; buffer_args = true; big_data = true; - if (this.pipeline !== 0) { - this.pipeline += 2; - this.writeDefault = this.writeBuffers; - } } else { this.warn( 'Deprecated: The ' + command.toUpperCase() + ' command contains a argument of type ' + args[i].constructor.name + '.\n' + @@ -870,6 +866,7 @@ RedisClient.prototype.internal_send_command = function (command, args, callback) this.write(command_str); } else { debug('Send command (' + command_str + ') has Buffer arguments'); + this.fire_strings = false; this.write(command_str); for (i = 0; i < len; i += 1) { @@ -887,40 +884,33 @@ RedisClient.prototype.internal_send_command = function (command, args, callback) return !this.should_buffer; }; -RedisClient.prototype.writeDefault = RedisClient.prototype.writeStrings = function (data) { +RedisClient.prototype.write_strings = function () { var str = ''; for (var command = this.pipeline_queue.shift(); command; command = this.pipeline_queue.shift()) { // Write to stream if the string is bigger than 4mb. The biggest string may be Math.pow(2, 28) - 15 chars long if (str.length + command.length > 4 * 1024 * 1024) { - this.stream.write(str); + this.should_buffer = !this.stream.write(str); str = ''; } str += command; } - this.should_buffer = !this.stream.write(str + data); + if (str !== '') { + this.should_buffer = !this.stream.write(str); + } }; -RedisClient.prototype.writeBuffers = function (data) { +RedisClient.prototype.write_buffers = function () { for (var command = this.pipeline_queue.shift(); command; command = this.pipeline_queue.shift()) { - this.stream.write(command); + this.should_buffer = !this.stream.write(command); } - this.should_buffer = !this.stream.write(data); }; RedisClient.prototype.write = function (data) { - if (this.pipeline === 0) { + if (this.pipeline === false) { this.should_buffer = !this.stream.write(data); return; } - - this.pipeline--; - if (this.pipeline === 0) { - this.writeDefault(data); - return; - } - this.pipeline_queue.push(data); - return; }; Object.defineProperty(exports, 'debugMode', { diff --git a/lib/multi.js b/lib/multi.js index eaa6dd618e..bb2173cd2e 100644 --- a/lib/multi.js +++ b/lib/multi.js @@ -126,7 +126,7 @@ Multi.prototype.exec_transaction = function exec_transaction (callback) { var len = self.queue.length; self.errors = []; self.callback = callback; - self._client.cork(len + 2); + self._client.cork(); self.wants_buffers = new Array(len); pipeline_transaction_command(self, 'multi', []); // Drain queue, callback will catch 'QUEUED' or error @@ -151,7 +151,6 @@ Multi.prototype.exec_transaction = function exec_transaction (callback) { multi_callback(self, err, replies); }); self._client.uncork(); - self._client.writeDefault = self._client.writeStrings; return !self._client.should_buffer; }; @@ -198,7 +197,7 @@ Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = funct return true; } self.results = []; - self._client.cork(len); + self._client.cork(); while (args = self.queue.shift()) { var command = args[0]; var cb; @@ -213,9 +212,7 @@ Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = funct self._client.internal_send_command(command, args[1], cb); index++; } - self.queue = new Queue(); self._client.uncork(); - self._client.writeDefault = self._client.writeStrings; return !self._client.should_buffer; }; diff --git a/test/multi.spec.js b/test/multi.spec.js index a969bfbbd5..2ad9a23051 100644 --- a/test/multi.spec.js +++ b/test/multi.spec.js @@ -125,9 +125,8 @@ describe("The 'multi' method", function () { describe('when connected', function () { - beforeEach(function (done) { + beforeEach(function () { client = redis.createClient.apply(null, args); - client.once('connect', done); }); it('executes a pipelined multi properly in combination with the offline queue', function (done) { @@ -135,6 +134,7 @@ describe("The 'multi' method", function () { multi1.set('m1', '123'); multi1.get('m1'); multi1.exec(done); + assert.strictEqual(client.offline_queue.length, 4); }); it('executes a pipelined multi properly after a reconnect in combination with the offline queue', function (done) { @@ -612,11 +612,17 @@ describe("The 'multi' method", function () { }); it('emits error once if reconnecting after multi has been executed but not yet returned without callback', function (done) { + // NOTE: If uncork is called async by postponing it to the next tick, this behavior is going to change. + // The command won't be processed anymore two errors are returned instead of one client.on('error', function (err) { assert.strictEqual(err.code, 'UNCERTAIN_STATE'); - done(); + client.get('foo', function (err, res) { + assert.strictEqual(res, 'bar'); + done(); + }); }); + // The commands should still be fired, no matter that the socket is destroyed on the same tick client.multi().set('foo', 'bar').get('foo').exec(); // Abort connection before the value returned client.stream.destroy(); diff --git a/test/node_redis.spec.js b/test/node_redis.spec.js index 02a10f7c95..fdfc503d53 100644 --- a/test/node_redis.spec.js +++ b/test/node_redis.spec.js @@ -121,12 +121,12 @@ describe('The node_redis client', function () { str += str; } var called = false; - var temp = client.writeBuffers.bind(client); - assert(String(client.writeBuffers) !== String(client.writeDefault)); - client.writeBuffers = function (data) { + var temp = client.write_buffers.bind(client); + assert(client.fire_strings); + client.write_buffers = function (data) { called = true; // To increase write performance for strings the value is converted to a buffer - assert(String(client.writeBuffers) === String(client.writeDefault)); + assert(!client.fire_strings); temp(data); }; client.multi().set('foo', str).get('foo', function (err, res) { @@ -136,7 +136,7 @@ describe('The node_redis client', function () { assert.strictEqual(res[1], str); done(); }); - assert(String(client.writeBuffers) !== String(client.writeDefault)); + assert(client.fire_strings); }); });