1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

Fix some minor issues and add more tests

Do not mutate the options object and add some more tests
This commit is contained in:
Ruben Bridgewater
2015-10-17 00:49:41 +02:00
parent 587b1c95a0
commit 2a65ee48dd
16 changed files with 633 additions and 405 deletions

128
index.js
View File

@@ -20,6 +20,8 @@ var debug = function(msg) {
}
};
function noop () {}
exports.debug_mode = /\bredis\b/i.test(process.env.NODE_DEBUG);
// hiredis might not be installed
@@ -33,24 +35,24 @@ try {
parsers.push(require('./lib/parsers/javascript'));
function RedisClient(stream, options) {
options = options || {};
// Copy the options so they are not mutated
options = JSON.parse(JSON.stringify(options));
var self = this;
if (!stream.cork) {
this.pipeline = 0;
this.cork = function noop (len) {};
this.cork = noop;
this.once('ready', function () {
self.cork = function (len) {
self.pipeline = len;
self.pipeline_queue = new Queue(len);
};
});
stream.uncork = function noop() {};
stream.uncork = noop;
this.write = this.writeStream;
}
this.stream = stream;
this.options = options;
this.connection_id = ++connection_id;
this.connected = false;
this.ready = false;
@@ -62,26 +64,23 @@ function RedisClient(stream, options) {
options.socket_keepalive = true;
}
if (options.rename_commands) {
for (var command in options.rename_commands) {
if (options.rename_commands.hasOwnProperty(command)) {
options.rename_commands[command.toLowerCase()] = options.rename_commands[command];
}
for (var command in options.rename_commands) { // jshint ignore: line
options.rename_commands[command.toLowerCase()] = options.rename_commands[command];
}
}
this.options.return_buffers = !!this.options.return_buffers;
this.options.detect_buffers = !!this.options.detect_buffers;
options.return_buffers = !!options.return_buffers;
options.detect_buffers = !!options.detect_buffers;
// Override the detect_buffers setting if return_buffers is active and print a warning
if (this.options.return_buffers && this.options.detect_buffers) {
if (options.return_buffers && options.detect_buffers) {
console.warn('>> WARNING: You activated return_buffers and detect_buffers at the same time. The return value is always going to be a buffer.');
this.options.detect_buffers = false;
options.detect_buffers = false;
}
this.should_buffer = false;
this.command_queue_high_water = options.command_queue_high_water || 1000;
this.command_queue_low_water = options.command_queue_low_water || 0;
this.max_attempts = +options.max_attempts || 0;
this.command_queue_high_water = +options.command_queue_high_water || 1000;
this.command_queue_low_water = options.command_queue_low_water | 0;
this.max_attempts = options.max_attempts | 0;
this.command_queue = new Queue(); // Holds sent commands to de-pipeline them
this.offline_queue = new Queue(); // Holds commands issued but not able to be sent
this.commands_sent = 0;
this.connect_timeout = +options.connect_timeout || 86400000; // 24 * 60 * 60 * 1000 ms
this.enable_offline_queue = options.enable_offline_queue === false ? false : true;
this.retry_max_delay = +options.retry_max_delay || null;
@@ -95,6 +94,7 @@ function RedisClient(stream, options) {
this.parser_module = null;
this.selected_db = null; // Save the selected db here, used when reconnecting
this.old_state = null;
this.options = options;
this.install_stream_listeners();
events.EventEmitter.call(this);
@@ -118,17 +118,16 @@ RedisClient.prototype.install_stream_listeners = function() {
self.on_error(err);
});
this.stream.on('close', function () {
this.stream.once('close', function () {
self.connection_gone('close');
});
this.stream.on('end', function () {
this.stream.once('end', function () {
self.connection_gone('end');
});
this.stream.on('drain', function () {
self.should_buffer = false;
self.emit('drain');
self.drain();
});
};
@@ -450,11 +449,6 @@ RedisClient.prototype.send_offline_queue = function () {
}
// Even though items were shifted off, Queue backing store still uses memory until next add, so just get a new Queue
this.offline_queue = new Queue();
if (buffered_writes === 0) {
this.should_buffer = false;
this.emit('drain');
}
};
var retry_connection = function (self) {
@@ -558,6 +552,11 @@ RedisClient.prototype.return_error = function (err) {
}
};
RedisClient.prototype.drain = function () {
this.emit('drain');
this.should_buffer = false;
};
RedisClient.prototype.emit_drain_idle = function (queue_len) {
if (this.pub_sub_mode === false && queue_len === 0) {
// Free the queue capacity memory by using a new queue
@@ -566,8 +565,7 @@ RedisClient.prototype.emit_drain_idle = function (queue_len) {
}
if (this.should_buffer && queue_len <= this.command_queue_low_water) {
this.emit('drain');
this.should_buffer = false;
this.drain();
}
};
@@ -688,11 +686,7 @@ RedisClient.prototype.send_command = function (command, args, callback) {
command = command.toUpperCase();
err = new Error('send_command: ' + command + ' value must not be undefined or null');
err.command = command;
if (callback) {
callback(err);
} else {
this.emit('error', err);
}
this.callback_emit_error(callback, err);
// Singal no buffering
return true;
}
@@ -714,16 +708,15 @@ RedisClient.prototype.send_command = function (command, args, callback) {
if (this.closing || !this.enable_offline_queue) {
command = command.toUpperCase();
if (!this.closing) {
err = new Error(command + " can't be processed. Stream not writeable and enable_offline_queue is deactivated.");
var msg = !stream.writable ?
'Stream not writeable.' :
'The connection is not yet established and the offline queue is deactivated.';
err = new Error(command + " can't be processed. " + msg);
} else {
err = new Error(command + " can't be processed. The connection has already been closed.");
}
err.command = command;
if (callback) {
callback(err);
} else {
this.emit('error', err);
}
this.callback_emit_error(callback, err);
} else {
debug('Queueing ' + command + ' for next server connection.');
this.offline_queue.push(command_obj);
@@ -739,14 +732,8 @@ RedisClient.prototype.send_command = function (command, args, callback) {
this.monitoring = true;
} else if (command === 'quit') {
this.closing = true;
} else if (this.pub_sub_mode === true) {
err = new Error('Connection in subscriber mode, only subscriber commands may be used');
err.command = command.toUpperCase();
this.emit('error', err);
return true;
}
this.command_queue.push(command_obj);
this.commands_sent += 1;
if (typeof this.options.rename_commands !== 'undefined' && this.options.rename_commands[command]) {
command = this.options.rename_commands[command];
@@ -864,7 +851,7 @@ RedisClient.prototype.end = function (flush) {
clearTimeout(this.retry_timer);
this.retry_timer = null;
}
this.stream.on('error', function noop(){});
this.stream.on('error', noop);
// Flush queue if wanted
if (flush) {
@@ -882,9 +869,9 @@ function Multi(client, args) {
this.queue = new Queue();
var command, tmp_args;
if (Array.isArray(args)) {
while (tmp_args = args.shift()) {
command = tmp_args[0];
tmp_args = tmp_args.slice(1);
for (var i = 0; i < args.length; i++) {
command = args[i][0];
tmp_args = args[i].slice(1);
if (Array.isArray(command)) {
this[command[0]].apply(this, command.slice(1).concat(tmp_args));
} else {
@@ -972,18 +959,22 @@ RedisClient.prototype.select = RedisClient.prototype.SELECT = function (db, call
});
};
RedisClient.prototype.callback_emit_error = function (callback, err) {
if (callback) {
setImmediate(function () {
callback(err);
});
} else {
this.emit('error', err);
}
};
// Stash auth for connect and reconnect. Send immediately if already connected.
RedisClient.prototype.auth = RedisClient.prototype.AUTH = function (pass, callback) {
if (typeof pass !== 'string') {
var err = new Error('The password has to be of type "string"');
err.command = 'AUTH';
if (callback) {
setImmediate(function () {
callback(err);
});
} else {
this.emit('error', err);
}
this.callback_emit_error(callback, err);
return true;
}
this.auth_pass = pass;
@@ -1121,14 +1112,13 @@ Multi.prototype.execute_callback = function (err, replies) {
var i = 0, args;
if (err) {
if (err.code !== 'CONNECTION_BROKEN') {
err.errors = this.errors;
if (this.callback) {
this.callback(err);
} else {
// Exclude CONNECTION_BROKEN so that error won't be emitted twice
this._client.emit('error', err);
}
// The errors would be circular
err.errors = err.code !== 'CONNECTION_BROKEN' ? this.errors : [];
if (this.callback) {
this.callback(err);
} else if (err.code !== 'CONNECTION_BROKEN') {
// Exclude CONNECTION_BROKEN so that error won't be emitted twice
this._client.emit('error', err);
}
return;
}
@@ -1169,7 +1159,7 @@ Multi.prototype.execute_callback = function (err, replies) {
}
};
Multi.prototype.callback = function (cb, command, i) {
Multi.prototype.callback = function (cb, i) {
var self = this;
return function (err, res) {
if (err) {
@@ -1210,9 +1200,9 @@ Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = funct
var command = args.shift();
var cb;
if (typeof args[args.length - 1] === 'function') {
cb = this.callback(args.pop(), command, index);
cb = this.callback(args.pop(), index);
} else {
cb = this.callback(undefined, command, index);
cb = this.callback(undefined, index);
}
if (callback && index === len - 1) {
cb = lastCallback(cb);
@@ -1241,7 +1231,7 @@ var createClient_tcp = function (port_arg, host_arg, options) {
var cnxOptions = {
port : port_arg || default_port,
host : host_arg || default_host,
family : options && options.family === 'IPv6' ? 6 : 4
family : options.family === 'IPv6' ? 6 : 4
};
var net_client = net.createConnection(cnxOptions);
var redis_client = new RedisClient(net_client, options);
@@ -1258,10 +1248,10 @@ var createClient = function (port_arg, host_arg, options) {
return createClient_tcp(+options.port, options.host, options);
}
if (typeof port_arg === 'number' || typeof port_arg === 'string' && /^\d+$/.test(port_arg)) {
return createClient_tcp(port_arg, host_arg, options);
return createClient_tcp(port_arg, host_arg, options || {});
}
if (typeof port_arg === 'string') {
options = host_arg || {};
options = host_arg || options || {};
var parsed = URL.parse(port_arg, true, true);
if (parsed.hostname) {