diff --git a/README.md b/README.md index c862017d0a..3f97f7df9b 100644 --- a/README.md +++ b/README.md @@ -403,6 +403,7 @@ channel name as `channel` and the new count of subscriptions for this client as `MULTI` commands are queued up until an `EXEC` is issued, and then all commands are run atomically by Redis. The interface in `node_redis` is to return an individual `Multi` object by calling `client.multi()`. +If any command fails to queue, all commands are rolled back and none is going to be executed (For further information look at [transactions](http://redis.io/topics/transactions)). ```js var redis = require("./index"), @@ -485,6 +486,12 @@ client.multi([ console.log(replies); }); ``` +## client.batch([commands]) + +`BATCH` commands are queued up until an `EXEC` is issued, and then all commands are run atomically by +Redis. The interface in `node_redis` is to return an individual `Batch` object by calling `client.batch()`. +The only difference between .batch and .multi is that no transaction is going to be used. +Be aware that the errors are - just like in multi statements - in the result. Otherwise both, errors and results could be returned at the same time. ## Monitor mode diff --git a/changelog.md b/changelog.md index be6f7a2b98..23efe9e276 100644 --- a/changelog.md +++ b/changelog.md @@ -1,7 +1,7 @@ Changelog ========= -## v.2.2.0 - xx, 2015 +## v.2.2.0 - 07, 2015 - The peregrino falcon Features @@ -11,6 +11,13 @@ Features - exchanging built in queue with [Petka Antonov's](@petkaantonov) [double-ended queue](https://github.com/petkaantonov/deque) - prevent polymorphism - optimize statements +- Added .batch command, similar to multi but without transaction (@BridgeAR) +- Improved pipelining to minimize the [RTT](http://redis.io/topics/pipelining) further (@BridgeAR) + +This release is mainly focusing on further speed improvements and we can proudly say that node_redis is very likely outperforming any other node redis client. + +If you do not rely on transactions but want to reduze the RTT you can use .batch from now on. It'll behave just the same as .multi but it does not have any transaction and therefor won't roll back any failed commands. +Both .multi and .batch are from now on going to fire the commands in bulk without doing any other operation in between. Bugfixes diff --git a/index.js b/index.js index 23fd75fc1e..1e1691071a 100644 --- a/index.js +++ b/index.js @@ -85,6 +85,8 @@ function RedisClient(stream, options) { this.parser_module = null; this.selected_db = null; // save the selected db here, used when reconnecting this.old_state = null; + this.pipeline = 0; + this.pipeline_queue = new Queue(); this.install_stream_listeners(); events.EventEmitter.call(this); @@ -648,6 +650,26 @@ RedisClient.prototype.return_reply = function (reply) { } }; +RedisClient.prototype.writeStream = function (data) { + var stream = this.stream; + var nr = 0; + + // Do not use a pipeline + if (this.pipeline === 0) { + return !stream.write(data); + } + this.pipeline--; + this.pipeline_queue.push(data); + if (this.pipeline === 0) { + var len = this.pipeline_queue.length; + while (len--) { + nr += !stream.write(this.pipeline_queue.shift()); + } + return !nr; + } + return true; +}; + RedisClient.prototype.send_command = function (command, args, callback) { var arg, command_obj, i, err, stream = this.stream, @@ -753,21 +775,21 @@ RedisClient.prototype.send_command = function (command, args, callback) { command_str += '$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n'; } debug('Send ' + this.address + ' id ' + this.connection_id + ': ' + command_str); - buffered_writes += !stream.write(command_str); + buffered_writes += !this.writeStream(command_str); } else { debug('Send command (' + command_str + ') has Buffer arguments'); - buffered_writes += !stream.write(command_str); + buffered_writes += !this.writeStream(command_str); for (i = 0; i < args.length; i += 1) { arg = args[i]; if (Buffer.isBuffer(arg)) { if (arg.length === 0) { debug('send_command: using empty string for 0 length buffer'); - buffered_writes += !stream.write('$0\r\n\r\n'); + buffered_writes += !this.writeStream('$0\r\n\r\n'); } else { - buffered_writes += !stream.write('$' + arg.length + '\r\n'); - buffered_writes += !stream.write(arg); - buffered_writes += !stream.write('\r\n'); + buffered_writes += !this.writeStream('$' + arg.length + '\r\n'); + buffered_writes += !this.writeStream(arg); + buffered_writes += !this.writeStream('\r\n'); debug('send_command: buffer send ' + arg.length + ' bytes'); } } else { @@ -775,7 +797,7 @@ RedisClient.prototype.send_command = function (command, args, callback) { arg = String(arg); } debug('send_command: string send ' + Buffer.byteLength(arg) + ' bytes: ' + arg); - buffered_writes += !stream.write('$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n'); + buffered_writes += !this.writeStream('$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n'); } } } @@ -839,9 +861,15 @@ RedisClient.prototype.end = function (flush) { return this.stream.destroySoon(); }; -function Multi(client, args) { +function Multi(client, args, transaction) { this._client = client; - this.queue = [['multi']]; + this.queue = []; + if (transaction) { + this.exec = this.exec_transaction; + this.EXEC = this.exec_transaction; + this.queue.push(['multi']); + } + this._client.pipeline_queue.clear(); var command, tmp_args; if (Array.isArray(args)) { while (tmp_args = args.shift()) { @@ -857,7 +885,11 @@ function Multi(client, args) { } RedisClient.prototype.multi = RedisClient.prototype.MULTI = function (args) { - return new Multi(this, args); + return new Multi(this, args, true); +}; + +RedisClient.prototype.batch = RedisClient.prototype.BATCH = function (args) { + return new Multi(this, args, false); }; commands.forEach(function (fullCommand) { @@ -1025,25 +1057,35 @@ Multi.prototype.send_command = function (command, args, index, cb) { }); }; -Multi.prototype.exec = Multi.prototype.EXEC = function (callback) { +Multi.prototype.exec_transaction = function (callback) { var self = this; + var len = this.queue.length; + var cb; this.errors = []; this.callback = callback; - this.wants_buffers = new Array(this.queue.length); + this._client.pipeline = len; + this.wants_buffers = new Array(len); // drain queue, callback will catch 'QUEUED' or error - for (var index = 0; index < this.queue.length; index++) { + for (var index = 0; index < len; index++) { var args = this.queue[index].slice(0); var command = args.shift(); - var cb; if (typeof args[args.length - 1] === 'function') { cb = args.pop(); + } else { + cb = undefined; } // Keep track of who wants buffer responses: - this.wants_buffers[index] = false; - for (var i = 0; i < args.length; i += 1) { - if (Buffer.isBuffer(args[i])) { - this.wants_buffers[index] = true; - break; + if (this._client.options.return_buffers) { + this.wants_buffers[index] = true; + } else if (!this._client.options.detect_buffers) { + this.wants_buffers[index] = false; + } else { + this.wants_buffers[index] = false; + for (var i = 0; i < args.length; i += 1) { + if (Buffer.isBuffer(args[i])) { + this.wants_buffers[index] = true; + break; + } } } this.send_command(command, args, index, cb); @@ -1107,6 +1149,58 @@ Multi.prototype.execute_callback = function (err, replies) { } }; +Multi.prototype.callback = function (cb, command, i) { + var self = this; + return function (err, res) { + if (err) { + self.results[i] = err; + } else { + self.results[i] = res; + } + if (cb) { + cb(err, res); + } + // Do not emit an error here. Otherwise each error would result in one emit. + // The errors will be returned in the result anyway + }; +}; + +Multi.prototype.exec = Multi.prototype.EXEC = function (callback) { + var len = this.queue.length; + var self = this; + var index = 0; + var args; + if (len === 0) { + if (callback) { + callback(null, []); + } + return false; + } + this.results = new Array(len); + this._client.pipeline = len; + var lastCallback = function (cb) { + return function (err, res) { + cb(err, res); + callback(null, self.results); + }; + }; + while (args = this.queue.shift()) { + var command = args.shift(); + var cb; + if (typeof args[args.length - 1] === 'function') { + cb = this.callback(args.pop(), command, index); + } else { + cb = this.callback(undefined, command, index); + } + if (callback && index === len - 1) { + cb = lastCallback(cb); + } + this._client.send_command(command, args, cb); + index++; + } + return this._client.should_buffer; +}; + var createClient_unix = function (path, options){ var cnxOptions = { path: path diff --git a/test/commands/multi.spec.js b/test/commands/multi.spec.js index 89868133b0..99e43d472f 100644 --- a/test/commands/multi.spec.js +++ b/test/commands/multi.spec.js @@ -216,7 +216,7 @@ describe("The 'multi' method", function () { .mset('some', '10', 'keys', '20') .incr('some') .incr('keys') - .mget('some', 'keys') + .mget('some', ['keys']) .exec(function (err, replies) { assert.strictEqual(null, err); assert.equal('OK', replies[0]); diff --git a/test/detect_buffers.spec.js b/test/detect_buffers.spec.js index fc9cd1aa31..747253918c 100644 --- a/test/detect_buffers.spec.js +++ b/test/detect_buffers.spec.js @@ -79,6 +79,27 @@ describe("detect_buffers", function () { }); }); + describe('batch.hget', function () { + it('can interleave string and buffer results', function (done) { + client.batch() + .hget("hash key 2", "key 1") + .hget(new Buffer("hash key 2"), "key 1") + .hget("hash key 2", new Buffer("key 2")) + .hget("hash key 2", "key 2") + .exec(function (err, reply) { + assert.strictEqual(true, Array.isArray(reply)); + assert.strictEqual(4, reply.length); + assert.strictEqual("val 1", reply[0]); + assert.strictEqual(true, Buffer.isBuffer(reply[1])); + assert.strictEqual("", reply[1].inspect()); + assert.strictEqual(true, Buffer.isBuffer(reply[2])); + assert.strictEqual("", reply[2].inspect()); + assert.strictEqual("val 2", reply[3]); + return done(err); + }); + }); + }); + describe('hmget', function () { describe('first argument is a string', function () { it('returns strings for keys requested', function (done) { @@ -149,6 +170,19 @@ describe("detect_buffers", function () { return done(err); }); }); + + it("returns buffers for keys requested in .batch", function (done) { + client.batch().hmget(new Buffer("hash key 2"), "key 1", "key 2").exec(function (err, reply) { + assert.strictEqual(true, Array.isArray(reply)); + assert.strictEqual(1, reply.length); + assert.strictEqual(2, reply[0].length); + assert.strictEqual(true, Buffer.isBuffer(reply[0][0])); + assert.strictEqual(true, Buffer.isBuffer(reply[0][1])); + assert.strictEqual("", reply[0][0].inspect()); + assert.strictEqual("", reply[0][1].inspect()); + return done(err); + }); + }); }); }); @@ -174,6 +208,17 @@ describe("detect_buffers", function () { return done(err); }); }); + + it('returns string values when executed in .batch', function (done) { + client.batch().hgetall("hash key 2").exec(function (err, reply) { + assert.strictEqual(1, reply.length); + assert.strictEqual("object", typeof reply[0]); + assert.strictEqual(2, Object.keys(reply[0]).length); + assert.strictEqual("val 1", reply[0]["key 1"]); + assert.strictEqual("val 2", reply[0]["key 2"]); + return done(err); + }); + }); }); describe('first argument is a buffer', function () { @@ -193,7 +238,20 @@ describe("detect_buffers", function () { it('returns buffer values when executed in transaction', function (done) { client.multi().hgetall(new Buffer("hash key 2")).exec(function (err, reply) { assert.strictEqual(1, reply.length); - assert.strictEqual("object", typeof reply); + assert.strictEqual("object", typeof reply[0]); + assert.strictEqual(2, Object.keys(reply[0]).length); + assert.strictEqual(true, Buffer.isBuffer(reply[0]["key 1"])); + assert.strictEqual(true, Buffer.isBuffer(reply[0]["key 2"])); + assert.strictEqual("", reply[0]["key 1"].inspect()); + assert.strictEqual("", reply[0]["key 2"].inspect()); + return done(err); + }); + }); + + it('returns buffer values when executed in .batch', function (done) { + client.batch().hgetall(new Buffer("hash key 2")).exec(function (err, reply) { + assert.strictEqual(1, reply.length); + assert.strictEqual("object", typeof reply[0]); assert.strictEqual(2, Object.keys(reply[0]).length); assert.strictEqual(true, Buffer.isBuffer(reply[0]["key 1"])); assert.strictEqual(true, Buffer.isBuffer(reply[0]["key 2"]));