diff --git a/index.js b/index.js index 4263786382..b59e69d5e9 100644 --- a/index.js +++ b/index.js @@ -2,20 +2,22 @@ var net = require('net'); var tls = require('tls'); -var URL = require('url'); var util = require('util'); var utils = require('./lib/utils'); var Queue = require('double-ended-queue'); var Command = require('./lib/command'); -var events = require('events'); +var EventEmitter = require('events'); var Parser = require('redis-parser'); var commands = require('redis-commands'); -var connection_id = 0; -var default_port = 6379; -var default_host = '127.0.0.1'; +var debug = require('./lib/debug'); +var unifyOptions = require('./lib/createClient'); + +// Newer Node.js versions > 0.10 return the EventEmitter right away and using .EventEmitter was deprecated +if (typeof EventEmitter !== 'function') { + EventEmitter = EventEmitter.EventEmitter; +} function noop () {} -function debug (msg) { if (exports.debug_mode) { console.error(msg); } } function handle_detect_buffers_reply (reply, command, buffer_args) { if (buffer_args === false) { @@ -35,15 +37,15 @@ exports.debug_mode = /\bredis\b/i.test(process.env.NODE_DEBUG); function RedisClient (options) { // Copy the options so they are not mutated options = utils.clone(options); - events.EventEmitter.call(this); + EventEmitter.call(this); var cnx_options = {}; var self = this; if (options.path) { cnx_options.path = options.path; this.address = options.path; } else { - cnx_options.port = +options.port || default_port; - cnx_options.host = options.host || default_host; + cnx_options.port = +options.port || 6379; + cnx_options.host = options.host || '127.0.0.1'; cnx_options.family = (!options.family && net.isIP(cnx_options.host)) || (options.family === 'IPv6' ? 6 : 4); this.address = cnx_options.host + ':' + cnx_options.port; } @@ -459,9 +461,12 @@ RedisClient.prototype.on_info_cmd = function (err, res) { RedisClient.prototype.ready_check = function () { var self = this; debug('Checking server ready state...'); + // Always fire this info command as first command even if other commands are already queued up + this.ready = true; this.info(function (err, res) { self.on_info_cmd(err, res); }); + this.ready = false; }; RedisClient.prototype.send_offline_queue = function () { @@ -599,11 +604,7 @@ RedisClient.prototype.return_error = function (err) { this.emit_idle(queue_len); - if (command_obj && command_obj.callback) { - command_obj.callback(err); - } else { - this.emit('error', err); - } + utils.callback_or_emit(this, command_obj && command_obj.callback, err); }; RedisClient.prototype.drain = function () { @@ -612,13 +613,78 @@ RedisClient.prototype.drain = function () { }; RedisClient.prototype.emit_idle = function (queue_len) { - if (this.pub_sub_mode === false && queue_len === 0) { + if (queue_len === 0 && this.pub_sub_mode === false) { this.emit('idle'); } }; +/* istanbul ignore next: this is a safety check that we should not be able to trigger */ +function queue_state_error (self, command_obj) { + var err = new Error('node_redis command queue state error. If you can reproduce this, please report it.'); + err.command_obj = command_obj; + self.emit('error', err); +} + +function monitor (self, reply) { + if (typeof reply !== 'string') { + reply = reply.toString(); + } + // If in monitoring mode only two commands are valid ones: AUTH and MONITOR wich reply with OK + var len = reply.indexOf(' '); + var timestamp = reply.slice(0, len); + var argindex = reply.indexOf('"'); + var args = reply.slice(argindex + 1, -1).split('" "').map(function (elem) { + return elem.replace(/\\"/g, '"'); + }); + self.emit('monitor', timestamp, args); +} + +function normal_reply (self, reply, command_obj) { + if (typeof command_obj.callback === 'function') { + if ('exec' !== command_obj.command) { + reply = self.handle_reply(reply, command_obj.command, command_obj.buffer_args); + } + command_obj.callback(null, reply); + } else { + debug('No callback for reply'); + } +} + +function return_pub_sub (self, reply, command_obj) { + if (reply instanceof Array) { + if ((!command_obj || command_obj.buffer_args === false) && !self.options.return_buffers) { + reply = utils.reply_to_strings(reply); + } + var type = reply[0].toString(); + + // TODO: Add buffer emiters (we have to get all pubsub messages as buffers back in that case) + if (type === 'message') { + self.emit('message', reply[1], reply[2]); // channcel, message + } else if (type === 'pmessage') { + self.emit('pmessage', reply[1], reply[2], reply[3]); // pattern, channcel, message + } else if (type === 'subscribe' || type === 'unsubscribe' || type === 'psubscribe' || type === 'punsubscribe') { + if (reply[2].toString() === '0') { + self.pub_sub_mode = false; + debug('All subscriptions removed, exiting pub/sub mode'); + } else { + self.pub_sub_mode = true; + } + // Subscribe commands take an optional callback and also emit an event, but only the first response is included in the callback + // TODO - document this or fix it so it works in a more obvious way + if (command_obj && typeof command_obj.callback === 'function') { + command_obj.callback(null, reply[1]); + } + self.emit(type, reply[1], reply[2]); // channcel, count + } else { + self.emit('error', new Error('subscriptions are active but got unknown reply type ' + type)); + } + } else if (!self.closing) { + self.emit('error', new Error('subscriptions are active but got an invalid reply: ' + reply)); + } +} + RedisClient.prototype.return_reply = function (reply) { - var command_obj, len, type, timestamp, argindex, args, queue_len; + var command_obj, type, queue_len; // If the 'reply' here is actually a message received asynchronously due to a // pubsub subscription, don't pop the command queue as we'll only be consuming @@ -638,84 +704,68 @@ RedisClient.prototype.return_reply = function (reply) { this.emit_idle(queue_len); if (command_obj && !command_obj.sub_command) { - if (typeof command_obj.callback === 'function') { - if ('exec' !== command_obj.command) { - reply = this.handle_reply(reply, command_obj.command, command_obj.buffer_args); - } - command_obj.callback(null, reply); - } else { - debug('No callback for reply'); - } + normal_reply(this, reply, command_obj); } else if (this.pub_sub_mode || command_obj && command_obj.sub_command) { - if (reply instanceof Array) { - if ((!command_obj || command_obj.buffer_args === false) && !this.options.return_buffers) { - reply = utils.reply_to_strings(reply); - } - type = reply[0].toString(); - - // TODO: Add buffer emiters (we have to get all pubsub messages as buffers back in that case) - if (type === 'message') { - this.emit('message', reply[1], reply[2]); // channel, message - } else if (type === 'pmessage') { - this.emit('pmessage', reply[1].toString(), reply[2], reply[3]); // pattern, channel, message - } else if (type === 'subscribe' || type === 'unsubscribe' || type === 'psubscribe' || type === 'punsubscribe') { - if (reply[2].toString() === '0') { - this.pub_sub_mode = false; - debug('All subscriptions removed, exiting pub/sub mode'); - } else { - this.pub_sub_mode = true; - } - // Subscribe commands take an optional callback and also emit an event, but only the first response is included in the callback - // TODO - document this or fix it so it works in a more obvious way - if (command_obj && typeof command_obj.callback === 'function') { - command_obj.callback(null, reply[1]); - } - this.emit(type, reply[1], reply[2]); // channel, count - } else { - this.emit('error', new Error('subscriptions are active but got unknown reply type ' + type)); - } - } else if (!this.closing) { - this.emit('error', new Error('subscriptions are active but got an invalid reply: ' + reply)); - } + return_pub_sub(this, reply, command_obj); } /* istanbul ignore else: this is a safety check that we should not be able to trigger */ else if (this.monitoring) { - if (typeof reply !== 'string') { - reply = reply.toString(); - } - // If in monitoring mode only two commands are valid ones: AUTH and MONITOR wich reply with OK - len = reply.indexOf(' '); - timestamp = reply.slice(0, len); - argindex = reply.indexOf('"'); - args = reply.slice(argindex + 1, -1).split('" "').map(function (elem) { - return elem.replace(/\\"/g, '"'); - }); - this.emit('monitor', timestamp, args); + monitor(this, reply); } else { - var err = new Error('node_redis command queue state error. If you can reproduce this, please report it.'); - err.command_obj = command_obj; - this.emit('error', err); + queue_state_error(this, command_obj); } }; -RedisClient.prototype.send_command = function (command, args, callback) { - var arg, command_obj, i, err, - stream = this.stream, - command_str = '', - buffer_args = false, - big_data = false, - prefix_keys, - len, args_copy; - - if (typeof args === 'undefined') { - args = []; +function handle_offline_command (self, command_obj) { + var command = command_obj.command; + var callback = command_obj.callback; + var err, msg; + if (self.closing || !self.enable_offline_queue) { + command = command.toUpperCase(); + if (!self.closing) { + if (self.stream.writable) { + msg = 'The connection is not yet established and the offline queue is deactivated.'; + } else { + msg = 'Stream not writeable.'; + } + } else { + msg = 'The connection has already been closed.'; + } + err = new Error(command + " can't be processed. " + msg); + err.command = command; + utils.reply_in_order(self, callback, err); + } else { + debug('Queueing ' + command + ' for next server connection.'); + self.offline_queue.push(command_obj); } - if (callback && process.domain) { + self.should_buffer = true; +} + +RedisClient.prototype.send_command = function (command, args, callback) { + var args_copy, arg, prefix_keys; + var i = 0; + var command_str = ''; + var len = 0; + var big_data = false; + + if (process.domain && callback) { callback = process.domain.bind(callback); } - len = args.length; - args_copy = new Array(len); + var command_obj = new Command(command, args, callback); + + if (this.ready === false || this.stream.writable === false) { + // Handle offline commands right away + handle_offline_command(this, command_obj); + return false; // Indicate buffering + } + + if (typeof args === 'undefined') { + args_copy = []; + } else { + len = args.length; + args_copy = new Array(len); + } for (i = 0; i < len; i += 1) { if (typeof args[i] === 'string') { @@ -742,7 +792,8 @@ RedisClient.prototype.send_command = function (command, args, callback) { args_copy[i] = 'null'; // Backwards compatible :/ } else { args_copy[i] = args[i]; - buffer_args = true; + command_obj.buffer_args = true; + big_data = true; if (this.pipeline !== 0) { this.pipeline += 2; this.writeDefault = this.writeBuffers; @@ -756,34 +807,11 @@ RedisClient.prototype.send_command = function (command, args, callback) { ); args_copy[i] = 'undefined'; // Backwards compatible :/ } else { - args_copy[i] = String(args[i]); + // Seems like numbers are converted fast using string concatenation + args_copy[i] = '' + args[i]; } } - - command_obj = new Command(command, args_copy, buffer_args, callback); - - // TODO: Replace send_anyway with `commands.hasFlag(command, 'loading') === false` as soon as pub_sub is handled in the result handler - if (this.ready === false && this.send_anyway === false || !stream.writable) { - if (this.closing || !this.enable_offline_queue) { - command = command.toUpperCase(); - if (!this.closing) { - var msg = !stream.writable ? - 'Stream not writeable.' : - 'The connection is not yet established and the offline queue is deactivated.'; - err = new Error(command + " can't be processed. " + msg); - } else { - err = new Error(command + " can't be processed. The connection has already been closed."); - } - err.command = command; - this.callback_emit_error(callback, err); - } else { - debug('Queueing ' + command + ' for next server connection.'); - this.offline_queue.push(command_obj); - this.should_buffer = true; - } - // Return false to signal buffering - return !this.should_buffer; - } + args = null; if (command === 'subscribe' || command === 'psubscribe' || command === 'unsubscribe' || command === 'punsubscribe') { this.pub_sub_command(command_obj); // TODO: This has to be moved to the result handler @@ -807,7 +835,7 @@ RedisClient.prototype.send_command = function (command, args, callback) { // This means that using Buffers in commands is going to be slower, so use Strings if you don't already have a Buffer. command_str = '*' + (len + 1) + '\r\n$' + command.length + '\r\n' + command + '\r\n'; - if (buffer_args === false && big_data === false) { // Build up a string and send entire command in one write + if (big_data === false) { // Build up a string and send entire command in one write for (i = 0; i < len; i += 1) { arg = args_copy[i]; command_str += '$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n'; @@ -820,7 +848,7 @@ RedisClient.prototype.send_command = function (command, args, callback) { for (i = 0; i < len; i += 1) { arg = args_copy[i]; - if (typeof arg !== 'object') { // string; number; boolean + if (typeof arg === 'string') { this.write('$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n'); } else { // buffer this.write('$' + arg.length + '\r\n'); @@ -911,497 +939,26 @@ RedisClient.prototype.end = function (flush) { 'Please check the doku (https://github.com/NodeRedis/node_redis) and explictly use flush.' ); } - - this.stream._events = {}; - // Clear retry_timer if (this.retry_timer){ clearTimeout(this.retry_timer); this.retry_timer = null; } + this.stream.removeAllListeners(); this.stream.on('error', noop); - this.connected = false; this.ready = false; this.closing = true; return this.stream.destroySoon(); }; -function Multi(client, args) { - this._client = client; - this.queue = new Queue(); - var command, tmp_args; - if (args) { // Either undefined or an array. Fail hard if it's not an array - for (var i = 0; i < args.length; i++) { - command = args[i][0]; - tmp_args = args[i].slice(1); - if (Array.isArray(command)) { - this[command[0]].apply(this, command.slice(1).concat(tmp_args)); - } else { - this[command].apply(this, tmp_args); - } - } - } -} - -commands.list.forEach(function (command) { - - RedisClient.prototype[command.toUpperCase()] = RedisClient.prototype[command] = function () { - var arr, - len = 0, - callback, - i = 0; - if (arguments[0] instanceof Array) { - arr = arguments[0]; - callback = arguments[1]; // It does not matter if it exists or not - } else if (arguments[1] instanceof Array) { - len = arguments[1].length; - arr = new Array(len + 1); - arr[0] = arguments[0]; - for (; i < len; i += 1) { - arr[i + 1] = arguments[1][i]; - } - callback = arguments[2]; - } else { - len = arguments.length; - arr = new Array(len); - for (; i < len; i += 1) { - arr[i] = arguments[i]; - } - // The later should not be the average use case - if (typeof arr[i - 1] === 'function' || typeof arr[i - 1] === 'undefined') { - callback = arr.pop(); - } - } - return this.send_command(command, arr, callback); - }; - - Multi.prototype[command.toUpperCase()] = Multi.prototype[command] = function () { - var arr, - len = 0, - callback, - i = 0; - if (arguments[0] instanceof Array) { - arr = arguments[0]; - callback = arguments[1]; - } else if (arguments[1] instanceof Array) { - len = arguments[1].length; - arr = new Array(len + 1); - arr[0] = arguments[0]; - for (; i < len; i += 1) { - arr[i + 1] = arguments[1][i]; - } - callback = arguments[2]; - } else { - len = arguments.length; - arr = new Array(len); - for (; i < len; i += 1) { - arr[i] = arguments[i]; - } - // The later should not be the average use case - if (typeof arr[i - 1] === 'function' || typeof arr[i - 1] === 'undefined') { - callback = arr.pop(); - } - } - this.queue.push([command, arr, callback]); - return this; - }; -}); - -RedisClient.prototype.multi = RedisClient.prototype.MULTI = function (args) { - var multi = new Multi(this, args); - multi.exec = multi.EXEC = multi.exec_transaction; - return multi; +exports.createClient = function () { + return new RedisClient(unifyOptions.apply(null, arguments)); }; - -RedisClient.prototype.batch = RedisClient.prototype.BATCH = function (args) { - return new Multi(this, args); -}; - -// Store db in this.select_db to restore it on reconnect -RedisClient.prototype.select = RedisClient.prototype.SELECT = function (db, callback) { - var self = this; - return this.send_command('select', [db], function (err, res) { - if (err === null) { - self.selected_db = db; - } - if (typeof callback === 'function') { - callback(err, res); - } else if (err) { - self.emit('error', err); - } - }); -}; - -// Store info in this.server_info after each call -RedisClient.prototype.info = RedisClient.prototype.INFO = function (callback) { - var self = this; - this.send_anyway = true; - var tmp = this.send_command('info', [], function (err, res) { - if (res) { - var obj = {}; - var lines = res.toString().split('\r\n'); - var line, parts, sub_parts; - - for (var i = 0; i < lines.length; i++) { - parts = lines[i].split(':'); - if (parts[1]) { - if (parts[0].indexOf('db') === 0) { - sub_parts = parts[1].split(','); - obj[parts[0]] = {}; - while (line = sub_parts.pop()) { - line = line.split('='); - obj[parts[0]][line[0]] = +line[1]; - } - } else { - obj[parts[0]] = parts[1]; - } - } - } - obj.versions = []; - /* istanbul ignore else: some redis servers do not send the version */ - if (obj.redis_version) { - obj.redis_version.split('.').forEach(function (num) { - obj.versions.push(+num); - }); - } - // Expose info key/vals to users - self.server_info = obj; - } else { - self.server_info = {}; - } - if (typeof callback === 'function') { - callback(err, res); - } else if (err) { - self.emit('error', err); - } - }); - this.send_anyway = false; - return tmp; -}; - -RedisClient.prototype.callback_emit_error = function (callback, err) { - if (callback) { - setImmediate(function () { - callback(err); - }); - } else { - this.emit('error', err); - } -}; - -var noPasswordIsSet = /no password is set/; - -RedisClient.prototype.auth = function (pass, callback) { - var self = this; - debug('Sending auth to ' + self.address + ' id ' + self.connection_id); - - // Stash auth for connect and reconnect. - this.auth_pass = pass; - this.send_anyway = true; - var tmp = this.send_command('auth', [pass], function (err, res) { - if (err) { - if (noPasswordIsSet.test(err.message)) { - self.warn('Warning: Redis server does not require a password, but a password was supplied.'); - err = null; - res = 'OK'; - } else if (!callback) { - self.emit('error', err); - } - } - if (callback) { - callback(err, res); - } - }); - this.send_anyway = false; - return tmp; -}; - -RedisClient.prototype.hmset = RedisClient.prototype.HMSET = function () { - var arr, - len = 0, - callback, - i = 0; - if (arguments[0] instanceof Array) { - arr = arguments[0]; - callback = arguments[1]; - } else if (arguments[1] instanceof Array) { - len = arguments[1].length; - arr = new Array(len + 1); - arr[0] = arguments[0]; - for (; i < len; i += 1) { - arr[i + 1] = arguments[1][i]; - } - callback = arguments[2]; - } else if (typeof arguments[1] === 'object' && (typeof arguments[2] === 'function' || typeof arguments[2] === 'undefined')) { - arr = [arguments[0]]; - for (var field in arguments[1]) { // jshint ignore: line - arr.push(field, arguments[1][field]); - } - callback = arguments[2]; - } else { - len = arguments.length; - arr = new Array(len); - for (; i < len; i += 1) { - arr[i] = arguments[i]; - } - // The later should not be the average use case - if (typeof arr[i - 1] === 'function' || typeof arr[i - 1] === 'undefined') { - callback = arr.pop(); - } - } - return this.send_command('hmset', arr, callback); -}; - -Multi.prototype.hmset = Multi.prototype.HMSET = function () { - var arr, - len = 0, - callback, - i = 0; - if (arguments[0] instanceof Array) { - arr = arguments[0]; - callback = arguments[1]; - } else if (arguments[1] instanceof Array) { - len = arguments[1].length; - arr = new Array(len + 1); - arr[0] = arguments[0]; - for (; i < len; i += 1) { - arr[i + 1] = arguments[1][i]; - } - callback = arguments[2]; - } else if (typeof arguments[1] === 'object' && (typeof arguments[2] === 'function' || typeof arguments[2] === 'undefined')) { - arr = [arguments[0]]; - for (var field in arguments[1]) { // jshint ignore: line - arr.push(field, arguments[1][field]); - } - callback = arguments[2]; - } else { - len = arguments.length; - arr = new Array(len); - for (; i < len; i += 1) { - arr[i] = arguments[i]; - } - // The later should not be the average use case - if (typeof arr[i - 1] === 'function' || typeof arr[i - 1] === 'undefined') { - callback = arr.pop(); - } - } - this.queue.push(['hmset', arr, callback]); - return this; -}; - -Multi.prototype.send_command = function (command, args, index, cb) { - var self = this; - this._client.send_command(command, args, function (err, reply) { - if (err) { - if (cb) { - cb(err); - } - err.position = index; - self.errors.push(err); - } - }); -}; - -Multi.prototype.exec_atomic = function (callback) { - if (this.queue.length < 2) { - return this.exec_batch(callback); - } - return this.exec(callback); -}; - -Multi.prototype.exec_transaction = function (callback) { - var self = this; - var len = this.queue.length; - this.errors = []; - this.callback = callback; - this._client.cork(len + 2); - this.wants_buffers = new Array(len); - this.send_command('multi', []); - // Drain queue, callback will catch 'QUEUED' or error - for (var index = 0; index < len; index++) { - var args = this.queue.get(index); - var command = args[0]; - var cb = args[2]; - // Keep track of who wants buffer responses: - if (this._client.options.detect_buffers) { - this.wants_buffers[index] = false; - for (var i = 0; i < args[1].length; i += 1) { - if (Buffer.isBuffer(args[1][i])) { - this.wants_buffers[index] = true; - break; - } - } - } - this.send_command(command, args[1], index, cb); - } - - this._client.send_command('exec', [], function(err, replies) { - self.execute_callback(err, replies); - }); - this._client.uncork(); - this._client.writeDefault = this._client.writeStrings; - return !this._client.should_buffer; -}; - -Multi.prototype.execute_callback = function (err, replies) { - var i = 0, args; - - if (err) { - // The errors would be circular - var connection_error = ['CONNECTION_BROKEN', 'UNCERTAIN_STATE'].indexOf(err.code) !== -1; - err.errors = connection_error ? [] : this.errors; - if (this.callback) { - this.callback(err); - // Exclude connection errors so that those errors won't be emitted twice - } else if (!connection_error) { - this._client.emit('error', err); - } - return; - } - - if (replies) { - while (args = this.queue.shift()) { - if (replies[i] instanceof Error) { - var match = replies[i].message.match(utils.err_code); - // LUA script could return user errors that don't behave like all other errors! - if (match) { - replies[i].code = match[1]; - } - replies[i].command = args[0].toUpperCase(); - if (typeof args[args.length - 1] === 'function') { - args[args.length - 1](replies[i]); - } - } else { - // If we asked for strings, even in detect_buffers mode, then return strings: - replies[i] = this._client.handle_reply(replies[i], args[0], this.wants_buffers[i]); - if (typeof args[args.length - 1] === 'function') { - args[args.length - 1](null, replies[i]); - } - } - i++; - } - } - - if (this.callback) { - this.callback(null, replies); - } -}; - -Multi.prototype.callback = function (cb, i) { - var self = this; - return function (err, res) { - if (err) { - self.results[i] = err; - // Add the position to the error - self.results[i].position = i; - } else { - self.results[i] = res; - } - cb(err, res); - }; -}; - -Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = function (callback) { - var len = this.queue.length; - var self = this; - var index = 0; - var args; - var args_len = 1; - var callback_without_own_cb = function (err, res) { - if (err) { - self.results.push(err); - // Add the position to the error - var i = self.results.length - 1; - self.results[i].position = i; - } else { - self.results.push(res); - } - // Do not emit an error here. Otherwise each error would result in one emit. - // The errors will be returned in the result anyway - }; - var last_callback = function (cb) { - return function (err, res) { - cb(err, res); - callback(null, self.results); - }; - }; - if (len === 0) { - if (callback) { - // The execution order won't be obtained in this case - setImmediate(function () { - callback(null, []); - }); - } - return true; - } - this.results = []; - this._client.cork(len); - while (args = this.queue.shift()) { - var command = args[0]; - var cb; - args_len = args[1].length - 1; - if (typeof args[2] === 'function') { - cb = this.callback(args[2], index); - } else { - cb = callback_without_own_cb; - } - if (callback && index === len - 1) { - cb = last_callback(cb); - } - this._client.send_command(command, args[1], cb); - index++; - } - this._client.uncork(); - this._client.writeDefault = this._client.writeStrings; - return !this._client.should_buffer; -}; - -var createClient = function (port_arg, host_arg, options) { - if (typeof port_arg === 'number' || typeof port_arg === 'string' && /^\d+$/.test(port_arg)) { - options = utils.clone(options); - options.host = host_arg; - options.port = port_arg; - } else if (typeof port_arg === 'string' || port_arg && port_arg.url) { - options = utils.clone(port_arg.url ? port_arg : host_arg || options); - var parsed = URL.parse(port_arg.url || port_arg, true, true); - // [redis:]//[[user][:password]@][host][:port][/db-number][?db=db-number[&password=bar[&option=value]]] - if (parsed.hostname || parsed.slashes) { // The host might be an empty string - if (parsed.auth) { - options.password = parsed.auth.split(':')[1]; - } - if (!/^([a-z]+:)?\/\//i.test(parsed.href)) { - throw new Error('Connection string must use the "redis:" protocol or begin with slashes //'); - } - if (parsed.pathname && parsed.pathname !== '/') { - options.db = parsed.pathname.substr(1); - } - options.host = parsed.hostname; - options.port = parsed.port; - if (parsed.search !== '') { - var elem; - for (elem in parsed.query) { // jshint ignore: line - // If options are passed twice, only the parsed options will be used - if (options.hasOwnPropery(elem)) { - RedisClient.warn('WARNING: You passed the ' + elem + ' option twice!'); - } - options[elem] = parsed.query[elem]; - } - } - } else { - options.path = port_arg; - } - } else if (typeof port_arg === 'object' || port_arg === undefined) { - options = utils.clone(port_arg || options); - options.host = options.host || host_arg; - } - if (!options) { - throw new Error('Unknown type of connection in createClient()'); - } - return new RedisClient(options); -}; - -exports.createClient = createClient; exports.RedisClient = RedisClient; exports.print = utils.print; -exports.Multi = Multi; +exports.Multi = require('./lib/multi'); + +// Add all redis commands to the client +require('./lib/individualCommands'); +require('./lib/commands'); diff --git a/package.json b/package.json index ac010b7f32..7760c7cf87 100644 --- a/package.json +++ b/package.json @@ -25,7 +25,7 @@ "dependencies": { "double-ended-queue": "^2.1.0-0", "redis-commands": "^1.0.1", - "redis-parser": "^1.0.0" + "redis-parser": "^1.1.0" }, "engines": { "node": ">=0.10.0" @@ -33,6 +33,7 @@ "devDependencies": { "bluebird": "^3.0.2", "coveralls": "^2.11.2", + "intercept-stdout": "~0.1.2", "jshint": "^2.8.0", "metrics": "^0.1.9", "mocha": "^2.3.2", diff --git a/test/multi.spec.js b/test/multi.spec.js index 181cdf87d5..334d0ae084 100644 --- a/test/multi.spec.js +++ b/test/multi.spec.js @@ -120,7 +120,7 @@ describe("The 'multi' method", function () { assert(err.message.match(/The connection has already been closed/)); done(); }); - assert.strictEqual(notBuffering, true); + assert.strictEqual(notBuffering, false); }); it("reports an error if promisified", function () { diff --git a/test/node_redis.spec.js b/test/node_redis.spec.js index 527633e6ba..45e86a5773 100644 --- a/test/node_redis.spec.js +++ b/test/node_redis.spec.js @@ -747,7 +747,7 @@ describe("The node_redis client", function () { enable_offline_queue: false }); client.on('ready', function () { - client.stream.writable = false; + client.stream.destroy(); client.set('foo', 'bar', function (err, res) { assert.strictEqual(err.message, "SET can't be processed. Stream not writeable."); done();