1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-04 15:02:09 +03:00

Move multi; commands; createClient code into separate files

This commit is contained in:
Ruben Bridgewater
2016-02-22 19:38:07 +01:00
parent ce80569bfe
commit 614e35ab57
7 changed files with 586 additions and 26 deletions

View File

@@ -2,10 +2,10 @@
// This Command constructor is ever so slightly faster than using an object literal, but more importantly, using // This Command constructor is ever so slightly faster than using an object literal, but more importantly, using
// a named constructor helps it show up meaningfully in the V8 CPU profiler and in heap snapshots. // a named constructor helps it show up meaningfully in the V8 CPU profiler and in heap snapshots.
function Command(command, args, buffer_args, callback) { function Command(command, args, callback) {
this.command = command; this.command = command;
this.args = args; this.args = args;
this.buffer_args = buffer_args; this.buffer_args = false;
this.callback = callback; this.callback = callback;
} }

86
lib/commands.js Normal file
View File

@@ -0,0 +1,86 @@
'use strict';
var commands = require('redis-commands');
var Multi = require('./multi');
var RedisClient = require('../').RedisClient;
// TODO: Rewrite this including the invidual commands into a Commands class
// that provided a functionality to add new commands to the client
commands.list.forEach(function (command) {
// Do not override existing functions
if (!RedisClient.prototype[command]) {
RedisClient.prototype[command.toUpperCase()] = RedisClient.prototype[command] = function () {
var arr;
var len = arguments.length;
var callback;
var i = 0;
if (Array.isArray(arguments[0])) {
arr = arguments[0];
if (len === 2) {
callback = arguments[1];
}
} else if (len > 1 && Array.isArray(arguments[1])) {
if (len === 3) {
callback = arguments[2];
}
len = arguments[1].length;
arr = new Array(len + 1);
arr[0] = arguments[0];
for (; i < len; i += 1) {
arr[i + 1] = arguments[1][i];
}
} else {
// The later should not be the average use case
if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
len--;
callback = arguments[len];
}
arr = new Array(len);
for (; i < len; i += 1) {
arr[i] = arguments[i];
}
}
return this.send_command(command, arr, callback);
};
}
// Do not override existing functions
if (!Multi.prototype[command]) {
Multi.prototype[command.toUpperCase()] = Multi.prototype[command] = function () {
var arr;
var len = arguments.length;
var callback;
var i = 0;
if (Array.isArray(arguments[0])) {
arr = arguments[0];
if (len === 2) {
callback = arguments[1];
}
} else if (len > 1 && Array.isArray(arguments[1])) {
if (len === 3) {
callback = arguments[2];
}
len = arguments[1].length;
arr = new Array(len + 1);
arr[0] = arguments[0];
for (; i < len; i += 1) {
arr[i + 1] = arguments[1][i];
}
} else {
// The later should not be the average use case
if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
len--;
callback = arguments[len];
}
arr = new Array(len);
for (; i < len; i += 1) {
arr[i] = arguments[i];
}
}
this.queue.push([command, arr, callback]);
return this;
};
}
});

67
lib/createClient.js Normal file
View File

@@ -0,0 +1,67 @@
'use strict';
var utils = require('./utils');
var URL = require('url');
module.exports = function createClient (port_arg, host_arg, options) {
if (typeof port_arg === 'number' || typeof port_arg === 'string' && /^\d+$/.test(port_arg)) {
var host;
if (typeof host_arg === 'string') {
host = host_arg;
} else {
options = options || host_arg;
}
options = utils.clone(options);
options.host = host || options.host;
options.port = port_arg;
} else if (typeof port_arg === 'string' || port_arg && port_arg.url) {
options = utils.clone(port_arg.url ? port_arg : host_arg || options);
var parsed = URL.parse(port_arg.url || port_arg, true, true);
// [redis:]//[[user][:password]@][host][:port][/db-number][?db=db-number[&password=bar[&option=value]]]
if (parsed.hostname || parsed.slashes) { // The host might be an empty string
if (parsed.auth) {
options.password = parsed.auth.split(':')[1];
}
if (!/^([a-z]+:)?\/\//i.test(parsed.href)) {
throw new Error('Connection string must use the "redis:" protocol or begin with slashes //');
}
if (parsed.pathname && parsed.pathname !== '/') {
options.db = parsed.pathname.substr(1);
}
options.host = parsed.hostname;
options.port = parsed.port;
if (parsed.search !== '') {
var elem;
for (elem in parsed.query) { // jshint ignore: line
// If options are passed twice, only the parsed options will be used
if (elem in options) {
if (options[elem] === parsed.query[elem]) {
console.warn('node_redis: WARNING: You passed the ' + elem + ' option twice!');
} else {
throw new Error('The ' + elem + ' option is added twice and does not match');
}
}
options[elem] = parsed.query[elem];
}
}
} else {
options.path = port_arg;
}
} else if (typeof port_arg === 'object' || port_arg === undefined) {
options = utils.clone(port_arg || options);
options.host = options.host || host_arg;
}
if (!options) {
throw new Error('Unknown type of connection in createClient()');
}
return options;
};

11
lib/debug.js Normal file
View File

@@ -0,0 +1,11 @@
'use strict';
var index = require('../');
function debug (msg) {
if (index.debug_mode) {
console.error(msg);
}
}
module.exports = debug;

137
lib/individualCommands.js Normal file
View File

@@ -0,0 +1,137 @@
'use strict';
var utils = require('./utils');
var debug = require('./debug');
var Multi = require('./multi');
var no_password_is_set = /no password is set/;
var RedisClient = require('../').RedisClient;
/********************************
Replace built-in redis functions
********************************/
RedisClient.prototype.multi = RedisClient.prototype.MULTI = function multi (args) {
var multi = new Multi(this, args);
multi.exec = multi.EXEC = multi.exec_transaction;
return multi;
};
// ATTENTION: This is not a native function but is still handled as a individual command as it behaves just the same as multi
RedisClient.prototype.batch = RedisClient.prototype.BATCH = function batch (args) {
return new Multi(this, args);
};
// Store db in this.select_db to restore it on reconnect
RedisClient.prototype.select = RedisClient.prototype.SELECT = function select (db, callback) {
var self = this;
return this.send_command('select', [db], function (err, res) {
if (err === null) {
self.selected_db = db;
}
utils.callback_or_emit(self, callback, err, res);
});
};
// Store info in this.server_info after each call
RedisClient.prototype.info = RedisClient.prototype.INFO = function info (callback) {
var self = this;
var ready = this.ready;
this.ready = ready || this.offline_queue.length === 0; // keep the execution order intakt
var tmp = this.send_command('info', [], function (err, res) {
if (res) {
var obj = {};
var lines = res.toString().split('\r\n');
var line, parts, sub_parts;
for (var i = 0; i < lines.length; i++) {
parts = lines[i].split(':');
if (parts[1]) {
if (parts[0].indexOf('db') === 0) {
sub_parts = parts[1].split(',');
obj[parts[0]] = {};
while (line = sub_parts.pop()) {
line = line.split('=');
obj[parts[0]][line[0]] = +line[1];
}
} else {
obj[parts[0]] = parts[1];
}
}
}
obj.versions = [];
/* istanbul ignore else: some redis servers do not send the version */
if (obj.redis_version) {
obj.redis_version.split('.').forEach(function (num) {
obj.versions.push(+num);
});
}
// Expose info key/vals to users
self.server_info = obj;
} else {
self.server_info = {};
}
utils.callback_or_emit(self, callback, err, res);
});
this.ready = ready;
return tmp;
};
RedisClient.prototype.auth = RedisClient.prototype.AUTH = function auth (pass, callback) {
var self = this;
var ready = this.ready;
debug('Sending auth to ' + self.address + ' id ' + self.connection_id);
// Stash auth for connect and reconnect.
this.auth_pass = pass;
this.ready = this.offline_queue.length === 0; // keep the execution order intakt
var tmp = this.send_command('auth', [pass], function (err, res) {
if (err && no_password_is_set.test(err.message)) {
self.warn('Warning: Redis server does not require a password, but a password was supplied.');
err = null;
res = 'OK';
}
utils.callback_or_emit(self, callback, err, res);
});
this.ready = ready;
return tmp;
};
RedisClient.prototype.hmset = RedisClient.prototype.HMSET = function hmset () {
var arr,
len = arguments.length,
callback,
i = 0;
if (Array.isArray(arguments[0])) {
arr = arguments[0];
callback = arguments[1];
} else if (Array.isArray(arguments[1])) {
if (len === 3) {
callback = arguments[2];
}
len = arguments[1].length;
arr = new Array(len + 1);
arr[0] = arguments[0];
for (; i < len; i += 1) {
arr[i + 1] = arguments[1][i];
}
} else if (typeof arguments[1] === 'object' && (arguments.length === 2 || arguments.length === 3 && typeof arguments[2] === 'function' || typeof arguments[2] === 'undefined')) {
arr = [arguments[0]];
for (var field in arguments[1]) { // jshint ignore: line
arr.push(field, arguments[1][field]);
}
callback = arguments[2];
} else {
len = arguments.length;
// The later should not be the average use case
if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
len--;
callback = arguments[len];
}
arr = new Array(len);
for (; i < len; i += 1) {
arr[i] = arguments[i];
}
}
return this.send_command('hmset', arr, callback);
};

224
lib/multi.js Normal file
View File

@@ -0,0 +1,224 @@
'use strict';
var Queue = require('double-ended-queue');
var utils = require('./utils');
function Multi(client, args) {
this._client = client;
this.queue = new Queue();
var command, tmp_args;
if (args) { // Either undefined or an array. Fail hard if it's not an array
for (var i = 0; i < args.length; i++) {
command = args[i][0];
tmp_args = args[i].slice(1);
if (Array.isArray(command)) {
this[command[0]].apply(this, command.slice(1).concat(tmp_args));
} else {
this[command].apply(this, tmp_args);
}
}
}
}
Multi.prototype.hmset = Multi.prototype.HMSET = function hmset () {
var arr,
len = 0,
callback,
i = 0;
if (Array.isArray(arguments[0])) {
arr = arguments[0];
callback = arguments[1];
} else if (Array.isArray(arguments[1])) {
len = arguments[1].length;
arr = new Array(len + 1);
arr[0] = arguments[0];
for (; i < len; i += 1) {
arr[i + 1] = arguments[1][i];
}
callback = arguments[2];
} else if (typeof arguments[1] === 'object' && (typeof arguments[2] === 'function' || typeof arguments[2] === 'undefined')) {
arr = [arguments[0]];
for (var field in arguments[1]) { // jshint ignore: line
arr.push(field, arguments[1][field]);
}
callback = arguments[2];
} else {
len = arguments.length;
// The later should not be the average use case
if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
len--;
callback = arguments[len];
}
arr = new Array(len);
for (; i < len; i += 1) {
arr[i] = arguments[i];
}
}
this.queue.push(['hmset', arr, callback]);
return this;
};
function pipeline_transaction_command (self, command, args, index, cb) {
self._client.send_command(command, args, function (err, reply) {
if (err) {
if (cb) {
cb(err);
}
err.position = index;
self.errors.push(err);
}
});
}
Multi.prototype.exec_atomic = function exec_atomic (callback) {
if (this.queue.length < 2) {
return this.exec_batch(callback);
}
return this.exec(callback);
};
function multi_callback (self, err, replies) {
var i = 0, args;
if (err) {
// The errors would be circular
var connection_error = ['CONNECTION_BROKEN', 'UNCERTAIN_STATE'].indexOf(err.code) !== -1;
err.errors = connection_error ? [] : self.errors;
if (self.callback) {
self.callback(err);
// Exclude connection errors so that those errors won't be emitted twice
} else if (!connection_error) {
self._client.emit('error', err);
}
return;
}
if (replies) {
while (args = self.queue.shift()) {
if (replies[i] instanceof Error) {
var match = replies[i].message.match(utils.err_code);
// LUA script could return user errors that don't behave like all other errors!
if (match) {
replies[i].code = match[1];
}
replies[i].command = args[0].toUpperCase();
if (typeof args[2] === 'function') {
args[2](replies[i]);
}
} else {
// If we asked for strings, even in detect_buffers mode, then return strings:
replies[i] = self._client.handle_reply(replies[i], args[0], self.wants_buffers[i]);
if (typeof args[2] === 'function') {
args[2](null, replies[i]);
}
}
i++;
}
}
if (self.callback) {
self.callback(null, replies);
}
}
Multi.prototype.exec_transaction = function exec_transaction (callback) {
var self = this;
var len = self.queue.length;
self.errors = [];
self.callback = callback;
self._client.cork(len + 2);
self.wants_buffers = new Array(len);
pipeline_transaction_command(self, 'multi', []);
// Drain queue, callback will catch 'QUEUED' or error
for (var index = 0; index < len; index++) {
var args = self.queue.get(index);
var command = args[0];
var cb = args[2];
// Keep track of who wants buffer responses:
if (self._client.options.detect_buffers) {
self.wants_buffers[index] = false;
for (var i = 0; i < args[1].length; i += 1) {
if (args[1][i] instanceof Buffer) {
self.wants_buffers[index] = true;
break;
}
}
}
pipeline_transaction_command(self, command, args[1], index, cb);
}
self._client.send_command('exec', [], function(err, replies) {
multi_callback(self, err, replies);
});
self._client.uncork();
self._client.writeDefault = self._client.writeStrings;
return !self._client.should_buffer;
};
function batch_callback (self, cb, i) {
return function batch_callback (err, res) {
if (err) {
self.results[i] = err;
// Add the position to the error
self.results[i].position = i;
} else {
self.results[i] = res;
}
cb(err, res);
};
}
Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = function exec_batch (callback) {
var self = this;
var len = self.queue.length;
var index = 0;
var args;
var args_len = 1;
var callback_without_own_cb = function (err, res) {
if (err) {
self.results.push(err);
// Add the position to the error
var i = self.results.length - 1;
self.results[i].position = i;
} else {
self.results.push(res);
}
// Do not emit an error here. Otherwise each error would result in one emit.
// The errors will be returned in the result anyway
};
var last_callback = function (cb) {
return function (err, res) {
cb(err, res);
callback(null, self.results);
};
};
if (len === 0) {
if (callback) {
utils.reply_in_order(self._client, callback, null, []);
}
return true;
}
self.results = [];
self._client.cork(len);
while (args = self.queue.shift()) {
var command = args[0];
var cb;
args_len = args[1].length - 1;
if (typeof args[2] === 'function') {
cb = batch_callback(self, args[2], index);
} else {
cb = callback_without_own_cb;
}
if (callback && index === len - 1) {
cb = last_callback(cb);
}
self._client.send_command(command, args[1], cb);
index++;
}
self.queue = new Queue();
self._client.uncork();
self._client.writeDefault = self._client.writeStrings;
return !self._client.should_buffer;
};
module.exports = Multi;

View File

@@ -1,22 +1,24 @@
'use strict'; 'use strict';
// hgetall converts its replies to an Object. If the reply is empty, null is returned. // hgetall converts its replies to an Object. If the reply is empty, null is returned.
// These function are only called with internal data and have therefor always the same instanceof X
function replyToObject(reply) { function replyToObject(reply) {
if (reply.length === 0 || !Array.isArray(reply)) { // TODO: Check why the isArray check is needed and what value reply has in that case // The reply might be a string or a buffer if this is called in a transaction (multi)
if (reply.length === 0 || !(reply instanceof Array)) {
return null; return null;
} }
var obj = {}; var obj = {};
for (var j = 0; j < reply.length; j += 2) { for (var i = 0; i < reply.length; i += 2) {
obj[reply[j].toString('binary')] = reply[j + 1]; obj[reply[i].toString('binary')] = reply[i + 1];
} }
return obj; return obj;
} }
function replyToStrings(reply) { function replyToStrings(reply) {
if (Buffer.isBuffer(reply)) { if (reply instanceof Buffer) {
return reply.toString(); return reply.toString();
} }
if (Array.isArray(reply)) { if (reply instanceof Array) {
var res = new Array(reply.length); var res = new Array(reply.length);
for (var i = 0; i < reply.length; i++) { for (var i = 0; i < reply.length; i++) {
// Recusivly call the function as slowlog returns deep nested replies // Recusivly call the function as slowlog returns deep nested replies
@@ -39,35 +41,68 @@ function print (err, reply) {
var redisErrCode = /^([A-Z]+)\s+(.+)$/; var redisErrCode = /^([A-Z]+)\s+(.+)$/;
// Deep clone arbitrary objects with arrays. Can't handle cyclic structures (results in a range error) // Deep clone arbitrary objects with arrays. Can't handle cyclic structures (results in a range error)
// Any attribute with a non primitive value besides object and array will be passed by reference (e.g. Buffers, Maps, Functions)
function clone (obj) { function clone (obj) {
if (obj) {
var copy; var copy;
if (obj.constructor === Array) { if (Array.isArray(obj)) {
copy = new Array(obj.length); copy = new Array(obj.length);
for (var i = 0; i < obj.length; i++) { for (var i = 0; i < obj.length; i++) {
copy[i] = clone(obj[i]); copy[i] = clone(obj[i]);
} }
return copy; return copy;
} }
if (obj.constructor === Object) { if (Object.prototype.toString.call(obj) === '[object Object]') {
copy = {}; copy = {};
for (var elem in obj) { var elems = Object.keys(obj);
if (!obj.hasOwnProperty(elem)) { var elem;
// Do not add non own properties to the cloned object while (elem = elems.pop()) {
continue;
}
copy[elem] = clone(obj[elem]); copy[elem] = clone(obj[elem]);
} }
return copy; return copy;
} }
}
return obj; return obj;
} }
function convenienceClone (obj) {
return clone(obj) || {};
}
function callbackOrEmit (self, callback, err, res) {
if (callback) {
callback(err, res);
} else if (err) {
self.emit('error', err);
}
}
function replyInOrder (self, callback, err, res) {
var command_obj = self.command_queue.peekBack() || self.offline_queue.peekBack();
if (!command_obj) {
process.nextTick(function () {
callbackOrEmit(self, callback, err, res);
});
} else {
var tmp = command_obj.callback;
command_obj.callback = tmp ?
function (e, r) {
tmp(e, r);
callbackOrEmit(self, callback, err, res);
} :
function (e, r) {
if (e) {
self.emit('error', e);
}
callbackOrEmit(self, callback, err, res);
};
}
}
module.exports = { module.exports = {
reply_to_strings: replyToStrings, reply_to_strings: replyToStrings,
reply_to_object: replyToObject, reply_to_object: replyToObject,
print: print, print: print,
err_code: redisErrCode, err_code: redisErrCode,
clone: clone clone: convenienceClone,
callback_or_emit: callbackOrEmit,
reply_in_order: replyInOrder
}; };