1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-04 15:02:09 +03:00

Minor improvement for .batch and .multi for small values

Improve the speed by round about 5% for small values

Add Multi.exec_atomic
This commit is contained in:
Ruben Bridgewater
2015-10-10 22:58:58 +02:00
parent ed2fc95444
commit f0e28bf0f7
6 changed files with 216 additions and 135 deletions

100
index.js
View File

@@ -34,17 +34,19 @@ parsers.push(require('./lib/parsers/javascript'));
function RedisClient(stream, options) {
options = options || {};
var self = this;
this.pipeline = 0;
if (!stream.cork) {
stream.cork = function noop() {};
stream.cork = function noop() {
self.pipeline_queue = new Queue();
};
stream.uncork = function noop() {};
stream.__write = stream.write;
stream.write = this.writeStream.bind(this);
this.write = this.writeStream;
}
this.stream = stream;
this.options = options;
this.connection_id = ++connection_id;
this.connected = false;
this.ready = false;
@@ -73,14 +75,11 @@ function RedisClient(stream, options) {
this.command_queue_high_water = options.command_queue_high_water || 1000;
this.command_queue_low_water = options.command_queue_low_water || 0;
this.max_attempts = +options.max_attempts || 0;
this.command_queue = new Queue(); // holds sent commands to de-pipeline them
this.offline_queue = new Queue(); // holds commands issued but not able to be sent
this.command_queue = new Queue(); // Holds sent commands to de-pipeline them
this.offline_queue = new Queue(); // Holds commands issued but not able to be sent
this.commands_sent = 0;
this.connect_timeout = +options.connect_timeout || 86400000; // 24 * 60 * 60 * 1000 ms
this.enable_offline_queue = true;
if (options.enable_offline_queue === false) {
this.enable_offline_queue = false;
}
this.enable_offline_queue = options.enable_offline_queue === false ? false : true;
this.retry_max_delay = +options.retry_max_delay || null;
this.initialize_retry_vars();
this.pub_sub_mode = false;
@@ -90,10 +89,8 @@ function RedisClient(stream, options) {
this.server_info = {};
this.auth_pass = options.auth_pass;
this.parser_module = null;
this.selected_db = null; // save the selected db here, used when reconnecting
this.selected_db = null; // Save the selected db here, used when reconnecting
this.old_state = null;
this.pipeline = 0;
this.pipeline_queue = new Queue();
this.install_stream_listeners();
events.EventEmitter.call(this);
@@ -154,21 +151,19 @@ RedisClient.prototype.unref = function () {
// flush offline_queue and command_queue, erroring any items with a callback first
RedisClient.prototype.flush_and_error = function (error) {
var command_obj;
while (command_obj = this.offline_queue.shift()) {
if (typeof command_obj.callback === 'function') {
error.command = command_obj.command.toUpperCase();
command_obj.callback(error);
}
}
this.offline_queue = new Queue();
while (command_obj = this.command_queue.shift()) {
if (typeof command_obj.callback === 'function') {
error.command = command_obj.command.toUpperCase();
command_obj.callback(error);
}
}
this.offline_queue = new Queue();
this.command_queue = new Queue();
};
@@ -249,7 +244,6 @@ RedisClient.prototype.on_connect = function () {
this.connected = true;
this.ready = false;
this.connections += 1;
this.command_queue = new Queue();
this.emitted_end = false;
if (this.options.socket_nodelay) {
this.stream.setNoDelay();
@@ -446,8 +440,8 @@ RedisClient.prototype.send_offline_queue = function () {
debug('Sending offline command: ' + command_obj.command);
buffered_writes += !this.send_command(command_obj.command, command_obj.args, command_obj.callback);
}
this.offline_queue = new Queue();
// Even though items were shifted off, Queue backing store still uses memory until next add, so just get a new Queue
this.offline_queue = new Queue();
if (buffered_writes === 0) {
this.should_buffer = false;
@@ -558,7 +552,8 @@ RedisClient.prototype.return_error = function (err) {
RedisClient.prototype.emit_drain_idle = function (queue_len) {
if (this.pub_sub_mode === false && queue_len === 0) {
this.command_queue.clear();
// Free the queue capacity memory by using a new queue
this.command_queue = new Queue();
this.emit('idle');
}
@@ -763,21 +758,21 @@ RedisClient.prototype.send_command = function (command, args, callback) {
command_str += '$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n';
}
debug('Send ' + this.address + ' id ' + this.connection_id + ': ' + command_str);
buffered_writes += !stream.write(command_str);
buffered_writes += !this.write(command_str);
} else {
debug('Send command (' + command_str + ') has Buffer arguments');
buffered_writes += !stream.write(command_str);
buffered_writes += !this.write(command_str);
for (i = 0; i < args.length; i += 1) {
arg = args[i];
if (Buffer.isBuffer(arg)) {
if (arg.length === 0) {
debug('send_command: using empty string for 0 length buffer');
buffered_writes += !stream.write('$0\r\n\r\n');
buffered_writes += !this.write('$0\r\n\r\n');
} else {
buffered_writes += !stream.write('$' + arg.length + '\r\n');
buffered_writes += !stream.write(arg);
buffered_writes += !stream.write('\r\n');
buffered_writes += !this.write('$' + arg.length + '\r\n');
buffered_writes += !this.write(arg);
buffered_writes += !this.write('\r\n');
debug('send_command: buffer send ' + arg.length + ' bytes');
}
} else {
@@ -785,7 +780,7 @@ RedisClient.prototype.send_command = function (command, args, callback) {
arg = String(arg);
}
debug('send_command: string send ' + Buffer.byteLength(arg) + ' bytes: ' + arg);
buffered_writes += !stream.write('$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n');
buffered_writes += !this.write('$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n');
}
}
}
@@ -796,22 +791,28 @@ RedisClient.prototype.send_command = function (command, args, callback) {
return !this.should_buffer;
};
RedisClient.prototype.write = function (data) {
return this.stream.write(data);
};
RedisClient.prototype.writeStream = function (data) {
var nr = 0;
// Do not use a pipeline
if (this.pipeline === 0) {
return this.stream.__write(data);
return this.stream.write(data);
}
this.pipeline--;
this.pipeline_queue.push(data);
if (this.pipeline === 0) {
var len = this.pipeline_queue.length;
while (len--) {
nr += !this.stream.__write(this.pipeline_queue.shift());
var command;
while (command = this.pipeline_queue.shift()) {
nr += !this.stream.write(command);
}
nr += !this.stream.write(data);
return !nr;
}
this.pipeline_queue.push(data);
return true;
};
@@ -871,13 +872,11 @@ RedisClient.prototype.end = function (flush) {
function Multi(client, args, transaction) {
client.stream.cork();
this._client = client;
this.queue = [];
this.queue = new Queue();
if (transaction) {
this.exec = this.exec_transaction;
this.EXEC = this.exec_transaction;
this.queue.push(['multi']);
}
this._client.pipeline_queue.clear();
var command, tmp_args;
if (Array.isArray(args)) {
while (tmp_args = args.shift()) {
@@ -908,7 +907,7 @@ commands.forEach(function (fullCommand) {
return;
}
RedisClient.prototype[command] = function (key, arg, callback) {
RedisClient.prototype[command.toUpperCase()] = RedisClient.prototype[command] = function (key, arg, callback) {
if (Array.isArray(key)) {
return this.send_command(command, key, arg);
}
@@ -926,9 +925,8 @@ commands.forEach(function (fullCommand) {
}
return this.send_command(command, utils.to_array(arguments));
};
RedisClient.prototype[command.toUpperCase()] = RedisClient.prototype[command];
Multi.prototype[command] = function (key, arg, callback) {
Multi.prototype[command.toUpperCase()] = Multi.prototype[command] = function (key, arg, callback) {
if (Array.isArray(key)) {
if (arg) {
key = key.concat([arg]);
@@ -952,7 +950,6 @@ commands.forEach(function (fullCommand) {
}
return this;
};
Multi.prototype[command.toUpperCase()] = Multi.prototype[command];
});
// store db in this.select_db to restore it on reconnect
@@ -1061,23 +1058,31 @@ Multi.prototype.send_command = function (command, args, index, cb) {
if (cb) {
cb(err);
}
err.position = index - 1;
err.position = index;
self.errors.push(err);
}
});
};
Multi.prototype.exec_atomic = function (callback) {
if (this.queue.length < 2) {
return this.exec_batch(callback);
}
return this.exec(callback);
};
Multi.prototype.exec_transaction = function (callback) {
var self = this;
var len = this.queue.length;
var cb;
this.errors = [];
this.callback = callback;
this._client.pipeline = len;
this._client.pipeline = len + 2;
this.wants_buffers = new Array(len);
this.send_command('multi', []);
// drain queue, callback will catch 'QUEUED' or error
for (var index = 0; index < len; index++) {
var args = this.queue[index].slice(0);
var args = this.queue.get(index).slice(0);
var command = args.shift();
if (typeof args[args.length - 1] === 'function') {
cb = args.pop();
@@ -1108,7 +1113,7 @@ Multi.prototype.exec_transaction = function (callback) {
};
Multi.prototype.execute_callback = function (err, replies) {
var i, args;
var i = 0, args;
if (err) {
if (err.code !== 'CONNECTION_BROKEN') {
@@ -1124,9 +1129,7 @@ Multi.prototype.execute_callback = function (err, replies) {
}
if (replies) {
for (i = 0; i < this.queue.length - 1; i += 1) {
args = this.queue[i + 1];
while (args = this.queue.shift()) {
// If we asked for strings, even in detect_buffers mode, then return strings:
if (replies[i] instanceof Error) {
var match = replies[i].message.match(utils.errCode);
@@ -1136,7 +1139,7 @@ Multi.prototype.execute_callback = function (err, replies) {
}
replies[i].command = args[0].toUpperCase();
} else if (replies[i]) {
if (this.wants_buffers[i + 1] === false) {
if (this.wants_buffers[i] === false) {
replies[i] = utils.reply_to_strings(replies[i]);
}
if (args[0] === 'hgetall') {
@@ -1152,6 +1155,7 @@ Multi.prototype.execute_callback = function (err, replies) {
args[args.length - 1](null, replies[i]);
}
}
i++;
}
}
@@ -1176,7 +1180,7 @@ Multi.prototype.callback = function (cb, command, i) {
};
};
Multi.prototype.exec = Multi.prototype.EXEC = function (callback) {
Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = function (callback) {
var len = this.queue.length;
var self = this;
var index = 0;