1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

Add .batch with better pipeline implementation

This commit is contained in:
Ruben Bridgewater
2015-10-05 23:30:21 +02:00
parent 146d88154c
commit f8c245e04f
5 changed files with 188 additions and 22 deletions

132
index.js
View File

@@ -85,6 +85,8 @@ function RedisClient(stream, options) {
this.parser_module = null;
this.selected_db = null; // save the selected db here, used when reconnecting
this.old_state = null;
this.pipeline = 0;
this.pipeline_queue = new Queue();
this.install_stream_listeners();
events.EventEmitter.call(this);
@@ -648,6 +650,26 @@ RedisClient.prototype.return_reply = function (reply) {
}
};
RedisClient.prototype.writeStream = function (data) {
var stream = this.stream;
var nr = 0;
// Do not use a pipeline
if (this.pipeline === 0) {
return !stream.write(data);
}
this.pipeline--;
this.pipeline_queue.push(data);
if (this.pipeline === 0) {
var len = this.pipeline_queue.length;
while (len--) {
nr += !stream.write(this.pipeline_queue.shift());
}
return !nr;
}
return true;
};
RedisClient.prototype.send_command = function (command, args, callback) {
var arg, command_obj, i, err,
stream = this.stream,
@@ -753,21 +775,21 @@ RedisClient.prototype.send_command = function (command, args, callback) {
command_str += '$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n';
}
debug('Send ' + this.address + ' id ' + this.connection_id + ': ' + command_str);
buffered_writes += !stream.write(command_str);
buffered_writes += !this.writeStream(command_str);
} else {
debug('Send command (' + command_str + ') has Buffer arguments');
buffered_writes += !stream.write(command_str);
buffered_writes += !this.writeStream(command_str);
for (i = 0; i < args.length; i += 1) {
arg = args[i];
if (Buffer.isBuffer(arg)) {
if (arg.length === 0) {
debug('send_command: using empty string for 0 length buffer');
buffered_writes += !stream.write('$0\r\n\r\n');
buffered_writes += !this.writeStream('$0\r\n\r\n');
} else {
buffered_writes += !stream.write('$' + arg.length + '\r\n');
buffered_writes += !stream.write(arg);
buffered_writes += !stream.write('\r\n');
buffered_writes += !this.writeStream('$' + arg.length + '\r\n');
buffered_writes += !this.writeStream(arg);
buffered_writes += !this.writeStream('\r\n');
debug('send_command: buffer send ' + arg.length + ' bytes');
}
} else {
@@ -775,7 +797,7 @@ RedisClient.prototype.send_command = function (command, args, callback) {
arg = String(arg);
}
debug('send_command: string send ' + Buffer.byteLength(arg) + ' bytes: ' + arg);
buffered_writes += !stream.write('$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n');
buffered_writes += !this.writeStream('$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n');
}
}
}
@@ -839,9 +861,15 @@ RedisClient.prototype.end = function (flush) {
return this.stream.destroySoon();
};
function Multi(client, args) {
function Multi(client, args, transaction) {
this._client = client;
this.queue = [['multi']];
this.queue = [];
if (transaction) {
this.exec = this.exec_transaction;
this.EXEC = this.exec_transaction;
this.queue.push(['multi']);
}
this._client.pipeline_queue.clear();
var command, tmp_args;
if (Array.isArray(args)) {
while (tmp_args = args.shift()) {
@@ -857,7 +885,11 @@ function Multi(client, args) {
}
RedisClient.prototype.multi = RedisClient.prototype.MULTI = function (args) {
return new Multi(this, args);
return new Multi(this, args, true);
};
RedisClient.prototype.batch = RedisClient.prototype.BATCH = function (args) {
return new Multi(this, args, false);
};
commands.forEach(function (fullCommand) {
@@ -1025,25 +1057,35 @@ Multi.prototype.send_command = function (command, args, index, cb) {
});
};
Multi.prototype.exec = Multi.prototype.EXEC = function (callback) {
Multi.prototype.exec_transaction = function (callback) {
var self = this;
var len = this.queue.length;
var cb;
this.errors = [];
this.callback = callback;
this.wants_buffers = new Array(this.queue.length);
this._client.pipeline = len;
this.wants_buffers = new Array(len);
// drain queue, callback will catch 'QUEUED' or error
for (var index = 0; index < this.queue.length; index++) {
for (var index = 0; index < len; index++) {
var args = this.queue[index].slice(0);
var command = args.shift();
var cb;
if (typeof args[args.length - 1] === 'function') {
cb = args.pop();
} else {
cb = undefined;
}
// Keep track of who wants buffer responses:
this.wants_buffers[index] = false;
for (var i = 0; i < args.length; i += 1) {
if (Buffer.isBuffer(args[i])) {
this.wants_buffers[index] = true;
break;
if (this._client.options.return_buffers) {
this.wants_buffers[index] = true;
} else if (!this._client.options.detect_buffers) {
this.wants_buffers[index] = false;
} else {
this.wants_buffers[index] = false;
for (var i = 0; i < args.length; i += 1) {
if (Buffer.isBuffer(args[i])) {
this.wants_buffers[index] = true;
break;
}
}
}
this.send_command(command, args, index, cb);
@@ -1107,6 +1149,58 @@ Multi.prototype.execute_callback = function (err, replies) {
}
};
Multi.prototype.callback = function (cb, command, i) {
var self = this;
return function (err, res) {
if (err) {
self.results[i] = err;
} else {
self.results[i] = res;
}
if (cb) {
cb(err, res);
}
// Do not emit an error here. Otherwise each error would result in one emit.
// The errors will be returned in the result anyway
};
};
Multi.prototype.exec = Multi.prototype.EXEC = function (callback) {
var len = this.queue.length;
var self = this;
var index = 0;
var args;
if (len === 0) {
if (callback) {
callback(null, []);
}
return false;
}
this.results = new Array(len);
this._client.pipeline = len;
var lastCallback = function (cb) {
return function (err, res) {
cb(err, res);
callback(null, self.results);
};
};
while (args = this.queue.shift()) {
var command = args.shift();
var cb;
if (typeof args[args.length - 1] === 'function') {
cb = this.callback(args.pop(), command, index);
} else {
cb = this.callback(undefined, command, index);
}
if (callback && index === len - 1) {
cb = lastCallback(cb);
}
this._client.send_command(command, args, cb);
index++;
}
return this._client.should_buffer;
};
var createClient_unix = function (path, options){
var cnxOptions = {
path: path