1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-04 15:02:09 +03:00

Improve performance further

This commit is contained in:
Ruben Bridgewater
2015-10-05 10:57:57 +02:00
parent 2232a8948e
commit e47ba4a583
5 changed files with 54 additions and 50 deletions

View File

@@ -55,6 +55,13 @@ function RedisClient(stream, options) {
}
}
}
this.options.return_buffers = !!this.options.return_buffers;
this.options.detect_buffers = !!this.options.detect_buffers;
// Override the detect_buffers setting if return_buffers is active and print a warning
if (this.options.return_buffers && this.options.detect_buffers) {
console.warn('>> WARNING: You activated return_buffers and detect_buffers at the same time. The return value is always going to be a buffer.');
this.options.detect_buffers = false;
}
this.should_buffer = false;
this.command_queue_high_water = options.command_queue_high_water || 1000;
this.command_queue_low_water = options.command_queue_low_water || 0;
@@ -433,7 +440,7 @@ RedisClient.prototype.send_offline_queue = function () {
this.offline_queue = new Queue();
// Even though items were shifted off, Queue backing store still uses memory until next add, so just get a new Queue
if (!buffered_writes) {
if (buffered_writes === 0) {
this.should_buffer = false;
this.emit('drain');
}
@@ -531,8 +538,18 @@ RedisClient.prototype.return_error = function (err) {
err.code = match[1];
}
this.emit_drain_idle(queue_len);
if (command_obj.callback) {
command_obj.callback(err);
} else {
this.emit('error', err);
}
};
RedisClient.prototype.emit_drain_idle = function (queue_len) {
if (this.pub_sub_mode === false && queue_len === 0) {
this.command_queue = new Queue();
this.command_queue.clear();
this.emit('idle');
}
@@ -540,12 +557,6 @@ RedisClient.prototype.return_error = function (err) {
this.emit('drain');
this.should_buffer = false;
}
if (command_obj.callback) {
command_obj.callback(err);
} else {
this.emit('error', err);
}
};
RedisClient.prototype.return_reply = function (reply) {
@@ -566,37 +577,29 @@ RedisClient.prototype.return_reply = function (reply) {
queue_len = this.command_queue.length;
if (this.pub_sub_mode === false && queue_len === 0) {
this.command_queue = new Queue(); // explicitly reclaim storage from old Queue
this.emit('idle');
}
if (this.should_buffer && queue_len <= this.command_queue_low_water) {
this.emit('drain');
this.should_buffer = false;
}
this.emit_drain_idle(queue_len);
if (command_obj && !command_obj.sub_command) {
if (typeof command_obj.callback === 'function') {
if ('exec' !== command_obj.command) {
if (this.options.detect_buffers && command_obj.buffer_args === false) {
if (command_obj.buffer_args === false) {
// If detect_buffers option was specified, then the reply from the parser will be Buffers.
// If this command did not use Buffer arguments, then convert the reply to Strings here.
reply = utils.reply_to_strings(reply);
}
// TODO - confusing and error-prone that hgetall is special cased in two places
if (reply && 'hgetall' === command_obj.command) {
if ('hgetall' === command_obj.command) {
reply = utils.reply_to_object(reply);
}
}
command_obj.callback(null, reply);
} else {
debug('No callback for reply');
}
} else if (this.pub_sub_mode || command_obj && command_obj.sub_command) {
if (Array.isArray(reply)) {
if (!this.options.return_buffers && (!command_obj || this.options.detect_buffers && command_obj.buffer_args === false)) {
if (!command_obj || command_obj.buffer_args === false) {
reply = utils.reply_to_strings(reply);
}
type = reply[0].toString();
@@ -620,11 +623,9 @@ RedisClient.prototype.return_reply = function (reply) {
this.emit(type, reply[1], reply[2]); // channel, count
} else {
this.emit('error', new Error('subscriptions are active but got unknown reply type ' + type));
return;
}
} else if (!this.closing) {
this.emit('error', new Error('subscriptions are active but got an invalid reply: ' + reply));
return;
}
}
/* istanbul ignore else: this is a safety check that we should not be able to trigger */
@@ -648,7 +649,12 @@ RedisClient.prototype.return_reply = function (reply) {
};
RedisClient.prototype.send_command = function (command, args, callback) {
var arg, command_obj, i, elem_count, buffer_args, stream = this.stream, command_str = '', buffered_writes = 0, err;
var arg, command_obj, i, err,
stream = this.stream,
command_str = '',
buffered_writes = 0,
buffer_args = false,
buffer = this.options.return_buffers;
if (args === undefined) {
args = [];
@@ -660,7 +666,7 @@ RedisClient.prototype.send_command = function (command, args, callback) {
}
}
if (process.domain && callback) {
if (callback && process.domain) {
callback = process.domain.bind(callback);
}
@@ -678,15 +684,17 @@ RedisClient.prototype.send_command = function (command, args, callback) {
}
}
buffer_args = false;
for (i = 0; i < args.length; i += 1) {
if (Buffer.isBuffer(args[i])) {
buffer_args = true;
break;
}
}
if (this.options.detect_buffers) {
buffer = buffer_args;
}
command_obj = new Command(command, args, false, buffer_args, callback);
command_obj = new Command(command, args, false, buffer, callback);
if (!this.ready && !this.send_anyway || !stream.writable) {
if (this.closing || !this.enable_offline_queue) {
@@ -725,8 +733,6 @@ RedisClient.prototype.send_command = function (command, args, callback) {
this.command_queue.push(command_obj);
this.commands_sent += 1;
elem_count = args.length + 1;
if (typeof this.options.rename_commands !== 'undefined' && this.options.rename_commands[command]) {
command = this.options.rename_commands[command];
}
@@ -734,7 +740,7 @@ RedisClient.prototype.send_command = function (command, args, callback) {
// Always use 'Multi bulk commands', but if passed any Buffer args, then do multiple writes, one for each arg.
// This means that using Buffers in commands is going to be slower, so use Strings if you don't already have a Buffer.
command_str = '*' + elem_count + '\r\n$' + command.length + '\r\n' + command + '\r\n';
command_str = '*' + (args.length + 1) + '\r\n$' + command.length + '\r\n' + command + '\r\n';
if (!buffer_args) { // Build up a string and send entire command in one write
for (i = 0; i < args.length; i += 1) {
@@ -752,10 +758,6 @@ RedisClient.prototype.send_command = function (command, args, callback) {
for (i = 0; i < args.length; i += 1) {
arg = args[i];
if (!(Buffer.isBuffer(arg) || typeof arg === 'string')) {
arg = String(arg);
}
if (Buffer.isBuffer(arg)) {
if (arg.length === 0) {
debug('send_command: using empty string for 0 length buffer');
@@ -767,13 +769,16 @@ RedisClient.prototype.send_command = function (command, args, callback) {
debug('send_command: buffer send ' + arg.length + ' bytes');
}
} else {
if (typeof arg !== 'string') {
arg = String(arg);
}
debug('send_command: string send ' + Buffer.byteLength(arg) + ' bytes: ' + arg);
buffered_writes += !stream.write('$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n');
}
}
}
debug('send_command buffered_writes: ' + buffered_writes, ' should_buffer: ' + this.should_buffer);
if (buffered_writes || this.command_queue.length >= this.command_queue_high_water) {
if (buffered_writes !== 0 || this.command_queue.length >= this.command_queue_high_water) {
debug('send_command buffered_writes: ' + buffered_writes, ' should_buffer: ' + this.should_buffer);
this.should_buffer = true;
}
return !this.should_buffer;
@@ -1077,7 +1082,7 @@ Multi.prototype.execute_callback = function (err, replies) {
}
replies[i].command = args[0].toUpperCase();
} else if (replies[i]) {
if (this._client.options.detect_buffers && this.wants_buffers[i + 1] === false) {
if (this.wants_buffers[i + 1] === false) {
replies[i] = utils.reply_to_strings(replies[i]);
}
if (args[0] === 'hgetall') {