1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

Better pipelining

Add fallback mode
This commit is contained in:
Ruben Bridgewater
2015-10-08 02:29:07 +02:00
parent 9ee1e3c764
commit 7d2bb8edec
3 changed files with 105 additions and 91 deletions

View File

@@ -35,6 +35,13 @@ parsers.push(require('./lib/parsers/javascript'));
function RedisClient(stream, options) {
options = options || {};
if (!stream.cork) {
stream.cork = function noop() {};
stream.uncork = function noop() {};
stream.__write = stream.write;
stream.write = this.writeStream.bind(this);
}
this.stream = stream;
this.options = options;
@@ -650,26 +657,6 @@ RedisClient.prototype.return_reply = function (reply) {
}
};
RedisClient.prototype.writeStream = function (data) {
var stream = this.stream;
var nr = 0;
// Do not use a pipeline
if (this.pipeline === 0) {
return !stream.write(data);
}
this.pipeline--;
this.pipeline_queue.push(data);
if (this.pipeline === 0) {
var len = this.pipeline_queue.length;
while (len--) {
nr += !stream.write(this.pipeline_queue.shift());
}
return !nr;
}
return true;
};
RedisClient.prototype.send_command = function (command, args, callback) {
var arg, command_obj, i, err,
stream = this.stream,
@@ -775,21 +762,21 @@ RedisClient.prototype.send_command = function (command, args, callback) {
command_str += '$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n';
}
debug('Send ' + this.address + ' id ' + this.connection_id + ': ' + command_str);
buffered_writes += !this.writeStream(command_str);
buffered_writes += !stream.write(command_str);
} else {
debug('Send command (' + command_str + ') has Buffer arguments');
buffered_writes += !this.writeStream(command_str);
buffered_writes += !stream.write(command_str);
for (i = 0; i < args.length; i += 1) {
arg = args[i];
if (Buffer.isBuffer(arg)) {
if (arg.length === 0) {
debug('send_command: using empty string for 0 length buffer');
buffered_writes += !this.writeStream('$0\r\n\r\n');
buffered_writes += !stream.write('$0\r\n\r\n');
} else {
buffered_writes += !this.writeStream('$' + arg.length + '\r\n');
buffered_writes += !this.writeStream(arg);
buffered_writes += !this.writeStream('\r\n');
buffered_writes += !stream.write('$' + arg.length + '\r\n');
buffered_writes += !stream.write(arg);
buffered_writes += !stream.write('\r\n');
debug('send_command: buffer send ' + arg.length + ' bytes');
}
} else {
@@ -797,7 +784,7 @@ RedisClient.prototype.send_command = function (command, args, callback) {
arg = String(arg);
}
debug('send_command: string send ' + Buffer.byteLength(arg) + ' bytes: ' + arg);
buffered_writes += !this.writeStream('$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n');
buffered_writes += !stream.write('$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n');
}
}
}
@@ -808,6 +795,25 @@ RedisClient.prototype.send_command = function (command, args, callback) {
return !this.should_buffer;
};
RedisClient.prototype.writeStream = function (data) {
var nr = 0;
// Do not use a pipeline
if (this.pipeline === 0) {
return !this.stream.__write(data);
}
this.pipeline--;
this.pipeline_queue.push(data);
if (this.pipeline === 0) {
var len = this.pipeline_queue.length;
while (len--) {
nr += !this.stream.__write(this.pipeline_queue.shift());
}
return !nr;
}
return true;
};
RedisClient.prototype.pub_sub_command = function (command_obj) {
var i, key, command, args;
@@ -862,6 +868,7 @@ RedisClient.prototype.end = function (flush) {
};
function Multi(client, args, transaction) {
client.stream.cork();
this._client = client;
this.queue = [];
if (transaction) {
@@ -1091,6 +1098,7 @@ Multi.prototype.exec_transaction = function (callback) {
this.send_command(command, args, index, cb);
}
this._client.stream.uncork();
return this._client.send_command('exec', [], function(err, replies) {
self.execute_callback(err, replies);
});
@@ -1198,6 +1206,7 @@ Multi.prototype.exec = Multi.prototype.EXEC = function (callback) {
this._client.send_command(command, args, cb);
index++;
}
this._client.stream.uncork();
return this._client.should_buffer;
};