diff --git a/lib/command.js b/lib/command.js index d04052ff6d..0c01976bb7 100644 --- a/lib/command.js +++ b/lib/command.js @@ -2,10 +2,10 @@ // This Command constructor is ever so slightly faster than using an object literal, but more importantly, using // a named constructor helps it show up meaningfully in the V8 CPU profiler and in heap snapshots. -function Command(command, args, buffer_args, callback) { +function Command(command, args, callback) { this.command = command; this.args = args; - this.buffer_args = buffer_args; + this.buffer_args = false; this.callback = callback; } diff --git a/lib/commands.js b/lib/commands.js new file mode 100644 index 0000000000..734e7b7060 --- /dev/null +++ b/lib/commands.js @@ -0,0 +1,86 @@ +'use strict'; + +var commands = require('redis-commands'); +var Multi = require('./multi'); +var RedisClient = require('../').RedisClient; + +// TODO: Rewrite this including the invidual commands into a Commands class +// that provided a functionality to add new commands to the client + +commands.list.forEach(function (command) { + + // Do not override existing functions + if (!RedisClient.prototype[command]) { + RedisClient.prototype[command.toUpperCase()] = RedisClient.prototype[command] = function () { + var arr; + var len = arguments.length; + var callback; + var i = 0; + if (Array.isArray(arguments[0])) { + arr = arguments[0]; + if (len === 2) { + callback = arguments[1]; + } + } else if (len > 1 && Array.isArray(arguments[1])) { + if (len === 3) { + callback = arguments[2]; + } + len = arguments[1].length; + arr = new Array(len + 1); + arr[0] = arguments[0]; + for (; i < len; i += 1) { + arr[i + 1] = arguments[1][i]; + } + } else { + // The later should not be the average use case + if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) { + len--; + callback = arguments[len]; + } + arr = new Array(len); + for (; i < len; i += 1) { + arr[i] = arguments[i]; + } + } + return this.send_command(command, arr, callback); + }; + } + + // Do not override existing functions + if (!Multi.prototype[command]) { + Multi.prototype[command.toUpperCase()] = Multi.prototype[command] = function () { + var arr; + var len = arguments.length; + var callback; + var i = 0; + if (Array.isArray(arguments[0])) { + arr = arguments[0]; + if (len === 2) { + callback = arguments[1]; + } + } else if (len > 1 && Array.isArray(arguments[1])) { + if (len === 3) { + callback = arguments[2]; + } + len = arguments[1].length; + arr = new Array(len + 1); + arr[0] = arguments[0]; + for (; i < len; i += 1) { + arr[i + 1] = arguments[1][i]; + } + } else { + // The later should not be the average use case + if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) { + len--; + callback = arguments[len]; + } + arr = new Array(len); + for (; i < len; i += 1) { + arr[i] = arguments[i]; + } + } + this.queue.push([command, arr, callback]); + return this; + }; + } +}); diff --git a/lib/createClient.js b/lib/createClient.js new file mode 100644 index 0000000000..3b14ef06f3 --- /dev/null +++ b/lib/createClient.js @@ -0,0 +1,67 @@ +'use strict'; + +var utils = require('./utils'); +var URL = require('url'); + +module.exports = function createClient (port_arg, host_arg, options) { + + if (typeof port_arg === 'number' || typeof port_arg === 'string' && /^\d+$/.test(port_arg)) { + + var host; + if (typeof host_arg === 'string') { + host = host_arg; + } else { + options = options || host_arg; + } + options = utils.clone(options); + options.host = host || options.host; + options.port = port_arg; + + } else if (typeof port_arg === 'string' || port_arg && port_arg.url) { + + options = utils.clone(port_arg.url ? port_arg : host_arg || options); + var parsed = URL.parse(port_arg.url || port_arg, true, true); + + // [redis:]//[[user][:password]@][host][:port][/db-number][?db=db-number[&password=bar[&option=value]]] + if (parsed.hostname || parsed.slashes) { // The host might be an empty string + if (parsed.auth) { + options.password = parsed.auth.split(':')[1]; + } + if (!/^([a-z]+:)?\/\//i.test(parsed.href)) { + throw new Error('Connection string must use the "redis:" protocol or begin with slashes //'); + } + if (parsed.pathname && parsed.pathname !== '/') { + options.db = parsed.pathname.substr(1); + } + options.host = parsed.hostname; + options.port = parsed.port; + + if (parsed.search !== '') { + var elem; + for (elem in parsed.query) { // jshint ignore: line + // If options are passed twice, only the parsed options will be used + if (elem in options) { + if (options[elem] === parsed.query[elem]) { + console.warn('node_redis: WARNING: You passed the ' + elem + ' option twice!'); + } else { + throw new Error('The ' + elem + ' option is added twice and does not match'); + } + } + options[elem] = parsed.query[elem]; + } + } + } else { + options.path = port_arg; + } + + } else if (typeof port_arg === 'object' || port_arg === undefined) { + options = utils.clone(port_arg || options); + options.host = options.host || host_arg; + } + + if (!options) { + throw new Error('Unknown type of connection in createClient()'); + } + + return options; +}; diff --git a/lib/debug.js b/lib/debug.js new file mode 100644 index 0000000000..3f9d482bbc --- /dev/null +++ b/lib/debug.js @@ -0,0 +1,11 @@ +'use strict'; + +var index = require('../'); + +function debug (msg) { + if (index.debug_mode) { + console.error(msg); + } +} + +module.exports = debug; diff --git a/lib/individualCommands.js b/lib/individualCommands.js new file mode 100644 index 0000000000..b162238cbe --- /dev/null +++ b/lib/individualCommands.js @@ -0,0 +1,137 @@ +'use strict'; + +var utils = require('./utils'); +var debug = require('./debug'); +var Multi = require('./multi'); +var no_password_is_set = /no password is set/; +var RedisClient = require('../').RedisClient; + +/******************************** +Replace built-in redis functions +********************************/ + +RedisClient.prototype.multi = RedisClient.prototype.MULTI = function multi (args) { + var multi = new Multi(this, args); + multi.exec = multi.EXEC = multi.exec_transaction; + return multi; +}; + +// ATTENTION: This is not a native function but is still handled as a individual command as it behaves just the same as multi +RedisClient.prototype.batch = RedisClient.prototype.BATCH = function batch (args) { + return new Multi(this, args); +}; + +// Store db in this.select_db to restore it on reconnect +RedisClient.prototype.select = RedisClient.prototype.SELECT = function select (db, callback) { + var self = this; + return this.send_command('select', [db], function (err, res) { + if (err === null) { + self.selected_db = db; + } + utils.callback_or_emit(self, callback, err, res); + }); +}; + +// Store info in this.server_info after each call +RedisClient.prototype.info = RedisClient.prototype.INFO = function info (callback) { + var self = this; + var ready = this.ready; + this.ready = ready || this.offline_queue.length === 0; // keep the execution order intakt + var tmp = this.send_command('info', [], function (err, res) { + if (res) { + var obj = {}; + var lines = res.toString().split('\r\n'); + var line, parts, sub_parts; + + for (var i = 0; i < lines.length; i++) { + parts = lines[i].split(':'); + if (parts[1]) { + if (parts[0].indexOf('db') === 0) { + sub_parts = parts[1].split(','); + obj[parts[0]] = {}; + while (line = sub_parts.pop()) { + line = line.split('='); + obj[parts[0]][line[0]] = +line[1]; + } + } else { + obj[parts[0]] = parts[1]; + } + } + } + obj.versions = []; + /* istanbul ignore else: some redis servers do not send the version */ + if (obj.redis_version) { + obj.redis_version.split('.').forEach(function (num) { + obj.versions.push(+num); + }); + } + // Expose info key/vals to users + self.server_info = obj; + } else { + self.server_info = {}; + } + utils.callback_or_emit(self, callback, err, res); + }); + this.ready = ready; + return tmp; +}; + +RedisClient.prototype.auth = RedisClient.prototype.AUTH = function auth (pass, callback) { + var self = this; + var ready = this.ready; + debug('Sending auth to ' + self.address + ' id ' + self.connection_id); + + // Stash auth for connect and reconnect. + this.auth_pass = pass; + this.ready = this.offline_queue.length === 0; // keep the execution order intakt + var tmp = this.send_command('auth', [pass], function (err, res) { + if (err && no_password_is_set.test(err.message)) { + self.warn('Warning: Redis server does not require a password, but a password was supplied.'); + err = null; + res = 'OK'; + } + + utils.callback_or_emit(self, callback, err, res); + }); + this.ready = ready; + return tmp; +}; + +RedisClient.prototype.hmset = RedisClient.prototype.HMSET = function hmset () { + var arr, + len = arguments.length, + callback, + i = 0; + if (Array.isArray(arguments[0])) { + arr = arguments[0]; + callback = arguments[1]; + } else if (Array.isArray(arguments[1])) { + if (len === 3) { + callback = arguments[2]; + } + len = arguments[1].length; + arr = new Array(len + 1); + arr[0] = arguments[0]; + for (; i < len; i += 1) { + arr[i + 1] = arguments[1][i]; + } + } else if (typeof arguments[1] === 'object' && (arguments.length === 2 || arguments.length === 3 && typeof arguments[2] === 'function' || typeof arguments[2] === 'undefined')) { + arr = [arguments[0]]; + for (var field in arguments[1]) { // jshint ignore: line + arr.push(field, arguments[1][field]); + } + callback = arguments[2]; + } else { + len = arguments.length; + // The later should not be the average use case + if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) { + len--; + callback = arguments[len]; + } + arr = new Array(len); + for (; i < len; i += 1) { + arr[i] = arguments[i]; + } + } + return this.send_command('hmset', arr, callback); +}; diff --git a/lib/multi.js b/lib/multi.js new file mode 100644 index 0000000000..5a06e1096a --- /dev/null +++ b/lib/multi.js @@ -0,0 +1,224 @@ +'use strict'; + +var Queue = require('double-ended-queue'); +var utils = require('./utils'); + +function Multi(client, args) { + this._client = client; + this.queue = new Queue(); + var command, tmp_args; + if (args) { // Either undefined or an array. Fail hard if it's not an array + for (var i = 0; i < args.length; i++) { + command = args[i][0]; + tmp_args = args[i].slice(1); + if (Array.isArray(command)) { + this[command[0]].apply(this, command.slice(1).concat(tmp_args)); + } else { + this[command].apply(this, tmp_args); + } + } + } +} + +Multi.prototype.hmset = Multi.prototype.HMSET = function hmset () { + var arr, + len = 0, + callback, + i = 0; + if (Array.isArray(arguments[0])) { + arr = arguments[0]; + callback = arguments[1]; + } else if (Array.isArray(arguments[1])) { + len = arguments[1].length; + arr = new Array(len + 1); + arr[0] = arguments[0]; + for (; i < len; i += 1) { + arr[i + 1] = arguments[1][i]; + } + callback = arguments[2]; + } else if (typeof arguments[1] === 'object' && (typeof arguments[2] === 'function' || typeof arguments[2] === 'undefined')) { + arr = [arguments[0]]; + for (var field in arguments[1]) { // jshint ignore: line + arr.push(field, arguments[1][field]); + } + callback = arguments[2]; + } else { + len = arguments.length; + // The later should not be the average use case + if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) { + len--; + callback = arguments[len]; + } + arr = new Array(len); + for (; i < len; i += 1) { + arr[i] = arguments[i]; + } + } + this.queue.push(['hmset', arr, callback]); + return this; +}; + +function pipeline_transaction_command (self, command, args, index, cb) { + self._client.send_command(command, args, function (err, reply) { + if (err) { + if (cb) { + cb(err); + } + err.position = index; + self.errors.push(err); + } + }); +} + +Multi.prototype.exec_atomic = function exec_atomic (callback) { + if (this.queue.length < 2) { + return this.exec_batch(callback); + } + return this.exec(callback); +}; + +function multi_callback (self, err, replies) { + var i = 0, args; + + if (err) { + // The errors would be circular + var connection_error = ['CONNECTION_BROKEN', 'UNCERTAIN_STATE'].indexOf(err.code) !== -1; + err.errors = connection_error ? [] : self.errors; + if (self.callback) { + self.callback(err); + // Exclude connection errors so that those errors won't be emitted twice + } else if (!connection_error) { + self._client.emit('error', err); + } + return; + } + + if (replies) { + while (args = self.queue.shift()) { + if (replies[i] instanceof Error) { + var match = replies[i].message.match(utils.err_code); + // LUA script could return user errors that don't behave like all other errors! + if (match) { + replies[i].code = match[1]; + } + replies[i].command = args[0].toUpperCase(); + if (typeof args[2] === 'function') { + args[2](replies[i]); + } + } else { + // If we asked for strings, even in detect_buffers mode, then return strings: + replies[i] = self._client.handle_reply(replies[i], args[0], self.wants_buffers[i]); + if (typeof args[2] === 'function') { + args[2](null, replies[i]); + } + } + i++; + } + } + + if (self.callback) { + self.callback(null, replies); + } +} + +Multi.prototype.exec_transaction = function exec_transaction (callback) { + var self = this; + var len = self.queue.length; + self.errors = []; + self.callback = callback; + self._client.cork(len + 2); + self.wants_buffers = new Array(len); + pipeline_transaction_command(self, 'multi', []); + // Drain queue, callback will catch 'QUEUED' or error + for (var index = 0; index < len; index++) { + var args = self.queue.get(index); + var command = args[0]; + var cb = args[2]; + // Keep track of who wants buffer responses: + if (self._client.options.detect_buffers) { + self.wants_buffers[index] = false; + for (var i = 0; i < args[1].length; i += 1) { + if (args[1][i] instanceof Buffer) { + self.wants_buffers[index] = true; + break; + } + } + } + pipeline_transaction_command(self, command, args[1], index, cb); + } + + self._client.send_command('exec', [], function(err, replies) { + multi_callback(self, err, replies); + }); + self._client.uncork(); + self._client.writeDefault = self._client.writeStrings; + return !self._client.should_buffer; +}; + +function batch_callback (self, cb, i) { + return function batch_callback (err, res) { + if (err) { + self.results[i] = err; + // Add the position to the error + self.results[i].position = i; + } else { + self.results[i] = res; + } + cb(err, res); + }; +} + +Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = function exec_batch (callback) { + var self = this; + var len = self.queue.length; + var index = 0; + var args; + var args_len = 1; + var callback_without_own_cb = function (err, res) { + if (err) { + self.results.push(err); + // Add the position to the error + var i = self.results.length - 1; + self.results[i].position = i; + } else { + self.results.push(res); + } + // Do not emit an error here. Otherwise each error would result in one emit. + // The errors will be returned in the result anyway + }; + var last_callback = function (cb) { + return function (err, res) { + cb(err, res); + callback(null, self.results); + }; + }; + if (len === 0) { + if (callback) { + utils.reply_in_order(self._client, callback, null, []); + } + return true; + } + self.results = []; + self._client.cork(len); + while (args = self.queue.shift()) { + var command = args[0]; + var cb; + args_len = args[1].length - 1; + if (typeof args[2] === 'function') { + cb = batch_callback(self, args[2], index); + } else { + cb = callback_without_own_cb; + } + if (callback && index === len - 1) { + cb = last_callback(cb); + } + self._client.send_command(command, args[1], cb); + index++; + } + self.queue = new Queue(); + self._client.uncork(); + self._client.writeDefault = self._client.writeStrings; + return !self._client.should_buffer; +}; + +module.exports = Multi; diff --git a/lib/utils.js b/lib/utils.js index 8cdfb1f2b5..89d85907fe 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -1,22 +1,24 @@ 'use strict'; // hgetall converts its replies to an Object. If the reply is empty, null is returned. +// These function are only called with internal data and have therefor always the same instanceof X function replyToObject(reply) { - if (reply.length === 0 || !Array.isArray(reply)) { // TODO: Check why the isArray check is needed and what value reply has in that case + // The reply might be a string or a buffer if this is called in a transaction (multi) + if (reply.length === 0 || !(reply instanceof Array)) { return null; } var obj = {}; - for (var j = 0; j < reply.length; j += 2) { - obj[reply[j].toString('binary')] = reply[j + 1]; + for (var i = 0; i < reply.length; i += 2) { + obj[reply[i].toString('binary')] = reply[i + 1]; } return obj; } function replyToStrings(reply) { - if (Buffer.isBuffer(reply)) { + if (reply instanceof Buffer) { return reply.toString(); } - if (Array.isArray(reply)) { + if (reply instanceof Array) { var res = new Array(reply.length); for (var i = 0; i < reply.length; i++) { // Recusivly call the function as slowlog returns deep nested replies @@ -39,35 +41,68 @@ function print (err, reply) { var redisErrCode = /^([A-Z]+)\s+(.+)$/; // Deep clone arbitrary objects with arrays. Can't handle cyclic structures (results in a range error) +// Any attribute with a non primitive value besides object and array will be passed by reference (e.g. Buffers, Maps, Functions) function clone (obj) { - if (obj) { - var copy; - if (obj.constructor === Array) { - copy = new Array(obj.length); - for (var i = 0; i < obj.length; i++) { - copy[i] = clone(obj[i]); - } - return copy; + var copy; + if (Array.isArray(obj)) { + copy = new Array(obj.length); + for (var i = 0; i < obj.length; i++) { + copy[i] = clone(obj[i]); } - if (obj.constructor === Object) { - copy = {}; - for (var elem in obj) { - if (!obj.hasOwnProperty(elem)) { - // Do not add non own properties to the cloned object - continue; - } - copy[elem] = clone(obj[elem]); - } - return copy; + return copy; + } + if (Object.prototype.toString.call(obj) === '[object Object]') { + copy = {}; + var elems = Object.keys(obj); + var elem; + while (elem = elems.pop()) { + copy[elem] = clone(obj[elem]); } + return copy; } return obj; } +function convenienceClone (obj) { + return clone(obj) || {}; +} + +function callbackOrEmit (self, callback, err, res) { + if (callback) { + callback(err, res); + } else if (err) { + self.emit('error', err); + } +} + +function replyInOrder (self, callback, err, res) { + var command_obj = self.command_queue.peekBack() || self.offline_queue.peekBack(); + if (!command_obj) { + process.nextTick(function () { + callbackOrEmit(self, callback, err, res); + }); + } else { + var tmp = command_obj.callback; + command_obj.callback = tmp ? + function (e, r) { + tmp(e, r); + callbackOrEmit(self, callback, err, res); + } : + function (e, r) { + if (e) { + self.emit('error', e); + } + callbackOrEmit(self, callback, err, res); + }; + } +} + module.exports = { reply_to_strings: replyToStrings, reply_to_object: replyToObject, print: print, err_code: redisErrCode, - clone: clone + clone: convenienceClone, + callback_or_emit: callbackOrEmit, + reply_in_order: replyInOrder };