1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

Refactor command parsing

This commit is contained in:
Ruben Bridgewater
2016-05-27 17:32:42 +02:00
parent 899f9b7fe4
commit 8b6f2dd35e
10 changed files with 106 additions and 123 deletions

View File

@@ -6,8 +6,6 @@ var util = require('util');
var utils = require('./lib/utils'); var utils = require('./lib/utils');
var Queue = require('double-ended-queue'); var Queue = require('double-ended-queue');
var errorClasses = require('./lib/customErrors'); var errorClasses = require('./lib/customErrors');
var Command = require('./lib/command').Command;
var OfflineCommand = require('./lib/command').OfflineCommand;
var EventEmitter = require('events'); var EventEmitter = require('events');
var Parser = require('redis-parser'); var Parser = require('redis-parser');
var commands = require('redis-commands'); var commands = require('redis-commands');
@@ -156,6 +154,7 @@ function RedisClient (options, stream) {
this.times_connected = 0; this.times_connected = 0;
this.buffers = options.return_buffers || options.detect_buffers; this.buffers = options.return_buffers || options.detect_buffers;
this.options = options; this.options = options;
this.old_state = {};
this.reply = 'ON'; // Returning replies is the default this.reply = 'ON'; // Returning replies is the default
// Init parser // Init parser
this.reply_parser = create_parser(this); this.reply_parser = create_parser(this);
@@ -443,14 +442,10 @@ RedisClient.prototype.on_ready = function () {
// Restore modal commands from previous connection. The order of the commands is important // Restore modal commands from previous connection. The order of the commands is important
if (this.selected_db !== undefined) { if (this.selected_db !== undefined) {
this.internal_send_command('select', [this.selected_db]); this.select(this.selected_db);
} }
if (this.old_state !== null) { if (this.old_state.monitoring) { // Monitor has to be fired before pub sub commands
this.monitoring = this.old_state.monitoring; this.monitor();
this.pub_sub_mode = this.old_state.pub_sub_mode;
}
if (this.monitoring) { // Monitor has to be fired before pub sub commands
this.internal_send_command('monitor', []); // The state is still set
} }
var callback_count = Object.keys(this.subscription_set).length; var callback_count = Object.keys(this.subscription_set).length;
if (!this.options.disable_resubscribing && callback_count) { if (!this.options.disable_resubscribing && callback_count) {
@@ -466,8 +461,8 @@ RedisClient.prototype.on_ready = function () {
debug('Sending pub/sub on_ready commands'); debug('Sending pub/sub on_ready commands');
for (var key in this.subscription_set) { for (var key in this.subscription_set) {
var command = key.slice(0, key.indexOf('_')); var command = key.slice(0, key.indexOf('_'));
var args = self.subscription_set[key]; var args = this.subscription_set[key];
self.internal_send_command(command, [args], callback); this[command]([args], callback);
} }
this.send_offline_queue(); this.send_offline_queue();
return; return;
@@ -530,7 +525,7 @@ RedisClient.prototype.ready_check = function () {
RedisClient.prototype.send_offline_queue = function () { RedisClient.prototype.send_offline_queue = function () {
for (var command_obj = this.offline_queue.shift(); command_obj; command_obj = this.offline_queue.shift()) { for (var command_obj = this.offline_queue.shift(); command_obj; command_obj = this.offline_queue.shift()) {
debug('Sending offline command: ' + command_obj.command); debug('Sending offline command: ' + command_obj.command);
this.internal_send_command(command_obj.command, command_obj.args, command_obj.callback, command_obj.call_on_write); this.internal_send_command(command_obj);
} }
this.drain(); this.drain();
}; };
@@ -575,8 +570,7 @@ RedisClient.prototype.connection_gone = function (why, error) {
this.pipeline = false; this.pipeline = false;
var state = { var state = {
monitoring: this.monitoring, monitoring: this.monitoring
pub_sub_mode: this.pub_sub_mode
}; };
this.old_state = state; this.old_state = state;
this.monitoring = false; this.monitoring = false;
@@ -834,7 +828,6 @@ RedisClient.prototype.return_reply = function (reply) {
function handle_offline_command (self, command_obj) { function handle_offline_command (self, command_obj) {
var command = command_obj.command; var command = command_obj.command;
var callback = command_obj.callback;
var err, msg; var err, msg;
if (self.closing || !self.enable_offline_queue) { if (self.closing || !self.enable_offline_queue) {
command = command.toUpperCase(); command = command.toUpperCase();
@@ -852,10 +845,10 @@ function handle_offline_command (self, command_obj) {
code: 'NR_CLOSED', code: 'NR_CLOSED',
command: command command: command
}); });
if (command_obj.args && command_obj.args.length) { if (command_obj.args.length) {
err.args = command_obj.args; err.args = command_obj.args;
} }
utils.reply_in_order(self, callback, err); utils.reply_in_order(self, command_obj.callback, err);
} else { } else {
debug('Queueing ' + command + ' for next server connection.'); debug('Queueing ' + command + ' for next server connection.');
self.offline_queue.push(command_obj); self.offline_queue.push(command_obj);
@@ -865,22 +858,23 @@ function handle_offline_command (self, command_obj) {
// Do not call internal_send_command directly, if you are not absolutly certain it handles everything properly // Do not call internal_send_command directly, if you are not absolutly certain it handles everything properly
// e.g. monitor / info does not work with internal_send_command only // e.g. monitor / info does not work with internal_send_command only
RedisClient.prototype.internal_send_command = function (command, args, callback, call_on_write) { RedisClient.prototype.internal_send_command = function (command_obj) {
var arg, prefix_keys, command_obj; var arg, prefix_keys;
var i = 0; var i = 0;
var command_str = ''; var command_str = '';
var args = command_obj.args;
var command = command_obj.command;
var len = args.length; var len = args.length;
var big_data = false; var big_data = false;
var buffer_args = false;
var args_copy = new Array(len); var args_copy = new Array(len);
if (process.domain && callback) { if (process.domain && command_obj.callback) {
callback = process.domain.bind(callback); command_obj.callback = process.domain.bind(command_obj.callback);
} }
if (this.ready === false || this.stream.writable === false) { if (this.ready === false || this.stream.writable === false) {
// Handle offline commands right away // Handle offline commands right away
handle_offline_command(this, new OfflineCommand(command, args, callback, call_on_write)); handle_offline_command(this, command_obj);
return false; // Indicate buffering return false; // Indicate buffering
} }
@@ -905,7 +899,7 @@ RedisClient.prototype.internal_send_command = function (command, args, callback,
args_copy[i] = 'null'; // Backwards compatible :/ args_copy[i] = 'null'; // Backwards compatible :/
} else if (Buffer.isBuffer(args[i])) { } else if (Buffer.isBuffer(args[i])) {
args_copy[i] = args[i]; args_copy[i] = args[i];
buffer_args = true; command_obj.buffer_args = true;
big_data = true; big_data = true;
} else { } else {
this.warn( this.warn(
@@ -927,8 +921,6 @@ RedisClient.prototype.internal_send_command = function (command, args, callback,
args_copy[i] = '' + args[i]; args_copy[i] = '' + args[i];
} }
} }
// Pass the original args to make sure in error cases the original arguments are returned
command_obj = new Command(command, args, buffer_args, callback);
if (this.options.prefix) { if (this.options.prefix) {
prefix_keys = commands.getKeyIndexes(command, args_copy); prefix_keys = commands.getKeyIndexes(command, args_copy);
@@ -967,8 +959,8 @@ RedisClient.prototype.internal_send_command = function (command, args, callback,
debug('send_command: buffer send ' + arg.length + ' bytes'); debug('send_command: buffer send ' + arg.length + ' bytes');
} }
} }
if (call_on_write) { if (command_obj.call_on_write) {
call_on_write(); command_obj.call_on_write();
} }
// Handle `CLIENT REPLY ON|OFF|SKIP` // Handle `CLIENT REPLY ON|OFF|SKIP`
// This has to be checked after call_on_write // This has to be checked after call_on_write
@@ -978,8 +970,8 @@ RedisClient.prototype.internal_send_command = function (command, args, callback,
} else { } else {
// Do not expect a reply // Do not expect a reply
// Does this work in combination with the pub sub mode? // Does this work in combination with the pub sub mode?
if (callback) { if (command_obj.callback) {
utils.reply_in_order(this, callback, null, undefined, this.command_queue); utils.reply_in_order(this, command_obj.callback, null, undefined, this.command_queue);
} }
if (this.reply === 'SKIP') { if (this.reply === 'SKIP') {
this.reply = 'SKIP_ONE_MORE'; this.reply = 'SKIP_ONE_MORE';

View File

@@ -1,22 +1,12 @@
'use strict'; 'use strict';
// This Command constructor is ever so slightly faster than using an object literal, but more importantly, using
// a named constructor helps it show up meaningfully in the V8 CPU profiler and in heap snapshots.
function Command (command, args, buffer_args, callback) {
this.command = command;
this.args = args;
this.buffer_args = buffer_args;
this.callback = callback;
}
function OfflineCommand (command, args, callback, call_on_write) { function Command (command, args, callback, call_on_write) {
this.command = command; this.command = command;
this.args = args; this.args = args;
this.buffer_args = false;
this.callback = callback; this.callback = callback;
this.call_on_write = call_on_write; this.call_on_write = call_on_write;
} }
module.exports = { module.exports = Command;
Command: Command,
OfflineCommand: OfflineCommand
};

View File

@@ -3,6 +3,7 @@
var commands = require('redis-commands'); var commands = require('redis-commands');
var Multi = require('./multi'); var Multi = require('./multi');
var RedisClient = require('../').RedisClient; var RedisClient = require('../').RedisClient;
var Command = require('./command');
// TODO: Rewrite this including the invidual commands into a Commands class // TODO: Rewrite this including the invidual commands into a Commands class
// that provided a functionality to add new commands to the client // that provided a functionality to add new commands to the client
@@ -42,7 +43,7 @@ commands.list.forEach(function (command) {
arr[i] = arguments[i]; arr[i] = arguments[i];
} }
} }
return this.internal_send_command(command, arr, callback); return this.internal_send_command(new Command(command, arr, callback));
}; };
Object.defineProperty(RedisClient.prototype[command], 'name', { Object.defineProperty(RedisClient.prototype[command], 'name', {
value: command value: command
@@ -82,7 +83,7 @@ commands.list.forEach(function (command) {
arr[i] = arguments[i]; arr[i] = arguments[i];
} }
} }
this.queue.push([command, arr, callback]); this.queue.push(new Command(command, arr, callback));
return this; return this;
}; };
Object.defineProperty(Multi.prototype[command], 'name', { Object.defineProperty(Multi.prototype[command], 'name', {

View File

@@ -3,6 +3,7 @@
var utils = require('./utils'); var utils = require('./utils');
var debug = require('./debug'); var debug = require('./debug');
var RedisClient = require('../').RedisClient; var RedisClient = require('../').RedisClient;
var Command = require('./command');
var noop = function () {}; var noop = function () {};
/********************************************** /**********************************************
@@ -36,7 +37,7 @@ RedisClient.prototype.send_command = RedisClient.prototype.sendCommand = functio
// but this might change from time to time and at the moment there's no good way to distinguishe them // but this might change from time to time and at the moment there's no good way to distinguishe them
// from each other, so let's just do it do it this way for the time being // from each other, so let's just do it do it this way for the time being
if (command === 'multi' || typeof this[command] !== 'function') { if (command === 'multi' || typeof this[command] !== 'function') {
return this.internal_send_command(command, args, callback); return this.internal_send_command(new Command(command, args, callback));
} }
if (typeof callback === 'function') { if (typeof callback === 'function') {
args = args.concat([callback]); // Prevent manipulating the input array args = args.concat([callback]); // Prevent manipulating the input array

View File

@@ -3,6 +3,7 @@
var utils = require('./utils'); var utils = require('./utils');
var debug = require('./debug'); var debug = require('./debug');
var Multi = require('./multi'); var Multi = require('./multi');
var Command = require('./command');
var no_password_is_set = /no password is set/; var no_password_is_set = /no password is set/;
var loading = /LOADING/; var loading = /LOADING/;
var RedisClient = require('../').RedisClient; var RedisClient = require('../').RedisClient;
@@ -42,33 +43,34 @@ function select_callback (self, db, callback) {
} }
RedisClient.prototype.select = RedisClient.prototype.SELECT = function select (db, callback) { RedisClient.prototype.select = RedisClient.prototype.SELECT = function select (db, callback) {
return this.internal_send_command('select', [db], select_callback(this, db, callback)); return this.internal_send_command(new Command('select', [db], select_callback(this, db, callback)));
}; };
Multi.prototype.select = Multi.prototype.SELECT = function select (db, callback) { Multi.prototype.select = Multi.prototype.SELECT = function select (db, callback) {
this.queue.push(['select', [db], select_callback(this._client, db, callback)]); this.queue.push(new Command('select', [db], select_callback(this._client, db, callback)));
return this; return this;
}; };
function monitor_callback (self, callback) {
return function (err, res) {
if (err === null) {
self.monitoring = true;
}
utils.callback_or_emit(self, callback, err, res);
};
}
RedisClient.prototype.monitor = RedisClient.prototype.MONITOR = function monitor (callback) { RedisClient.prototype.monitor = RedisClient.prototype.MONITOR = function monitor (callback) {
// Use a individual command, as this is a special case that does not has to be checked for any other command // Use a individual command, as this is a special case that does not has to be checked for any other command
return this.internal_send_command('monitor', [], monitor_callback(this, callback)); var self = this;
var call_on_write = function () {
// Activating monitor mode has to happen before Redis returned the callback,
// as the client could receive monitoring commands before the callback returned through a race condition
self.monitoring = true;
};
return this.internal_send_command(new Command('monitor', [], callback, call_on_write));
}; };
// Only works with batch, not in a transaction // Only works with batch, not in a transaction
Multi.prototype.monitor = Multi.prototype.MONITOR = function monitor (callback) { Multi.prototype.monitor = Multi.prototype.MONITOR = function monitor (callback) {
// Use a individual command, as this is a special case that does not has to be checked for any other command // Use a individual command, as this is a special case that does not has to be checked for any other command
if (this.exec !== this.exec_transaction) { if (this.exec !== this.exec_transaction) {
this.queue.push(['monitor', [], monitor_callback(this._client, callback)]); var self = this;
var call_on_write = function () {
self._client.monitoring = true;
};
this.queue.push(new Command('monitor', [], callback, call_on_write));
return this; return this;
} }
// Set multi monitoring to indicate the exec that it should abort // Set multi monitoring to indicate the exec that it should abort
@@ -100,7 +102,7 @@ RedisClient.prototype.QUIT = RedisClient.prototype.quit = function quit (callbac
// TODO: Consider this for v.3 // TODO: Consider this for v.3
// Allow the quit command to be fired as soon as possible to prevent it landing in the offline queue. // Allow the quit command to be fired as soon as possible to prevent it landing in the offline queue.
// this.ready = this.offline_queue.length === 0; // this.ready = this.offline_queue.length === 0;
var backpressure_indicator = this.internal_send_command('quit', [], quit_callback(this, callback)); var backpressure_indicator = this.internal_send_command(new Command('quit', [], quit_callback(this, callback)));
// Calling quit should always end the connection, no matter if there's a connection or not // Calling quit should always end the connection, no matter if there's a connection or not
this.closing = true; this.closing = true;
this.ready = false; this.ready = false;
@@ -115,7 +117,7 @@ Multi.prototype.QUIT = Multi.prototype.quit = function quit (callback) {
self.closing = true; self.closing = true;
self.ready = false; self.ready = false;
}; };
this.queue.push(['quit', [], quit_callback(self, callback), call_on_write]); this.queue.push(new Command('quit', [], quit_callback(self, callback), call_on_write));
return this; return this;
}; };
@@ -164,7 +166,7 @@ RedisClient.prototype.info = RedisClient.prototype.INFO = function info (section
} else if (section !== undefined) { } else if (section !== undefined) {
args = Array.isArray(section) ? section : [section]; args = Array.isArray(section) ? section : [section];
} }
return this.internal_send_command('info', args, info_callback(this, callback)); return this.internal_send_command(new Command('info', args, info_callback(this, callback)));
}; };
Multi.prototype.info = Multi.prototype.INFO = function info (section, callback) { Multi.prototype.info = Multi.prototype.INFO = function info (section, callback) {
@@ -174,7 +176,7 @@ Multi.prototype.info = Multi.prototype.INFO = function info (section, callback)
} else if (section !== undefined) { } else if (section !== undefined) {
args = Array.isArray(section) ? section : [section]; args = Array.isArray(section) ? section : [section];
} }
this.queue.push(['info', args, info_callback(this._client, callback)]); this.queue.push(new Command('info', args, info_callback(this._client, callback)));
return this; return this;
}; };
@@ -205,7 +207,7 @@ RedisClient.prototype.auth = RedisClient.prototype.AUTH = function auth (pass, c
this.auth_pass = pass; this.auth_pass = pass;
var ready = this.ready; var ready = this.ready;
this.ready = ready || this.offline_queue.length === 0; this.ready = ready || this.offline_queue.length === 0;
var tmp = this.internal_send_command('auth', [pass], auth_callback(this, pass, callback)); var tmp = this.internal_send_command(new Command('auth', [pass], auth_callback(this, pass, callback)));
this.ready = ready; this.ready = ready;
return tmp; return tmp;
}; };
@@ -216,7 +218,7 @@ Multi.prototype.auth = Multi.prototype.AUTH = function auth (pass, callback) {
// Stash auth for connect and reconnect. // Stash auth for connect and reconnect.
this.auth_pass = pass; this.auth_pass = pass;
this.queue.push(['auth', [pass], auth_callback(this._client, callback)]); this.queue.push(new Command('auth', [pass], auth_callback(this._client, callback)));
return this; return this;
}; };
@@ -262,7 +264,7 @@ RedisClient.prototype.client = RedisClient.prototype.CLIENT = function client ()
}; };
} }
} }
return this.internal_send_command('client', arr, callback, call_on_write); return this.internal_send_command(new Command('client', arr, callback, call_on_write));
}; };
Multi.prototype.client = Multi.prototype.CLIENT = function client () { Multi.prototype.client = Multi.prototype.CLIENT = function client () {
@@ -307,7 +309,7 @@ Multi.prototype.client = Multi.prototype.CLIENT = function client () {
}; };
} }
} }
this.queue.push(['client', arr, callback, call_on_write]); this.queue.push(new Command('client', arr, callback, call_on_write));
return this; return this;
}; };
@@ -347,7 +349,7 @@ RedisClient.prototype.hmset = RedisClient.prototype.HMSET = function hmset () {
arr[i] = arguments[i]; arr[i] = arguments[i];
} }
} }
return this.internal_send_command('hmset', arr, callback); return this.internal_send_command(new Command('hmset', arr, callback));
}; };
Multi.prototype.hmset = Multi.prototype.HMSET = function hmset () { Multi.prototype.hmset = Multi.prototype.HMSET = function hmset () {
@@ -386,7 +388,7 @@ Multi.prototype.hmset = Multi.prototype.HMSET = function hmset () {
arr[i] = arguments[i]; arr[i] = arguments[i];
} }
} }
this.queue.push(['hmset', arr, callback]); this.queue.push(new Command('hmset', arr, callback));
return this; return this;
}; };
@@ -414,7 +416,7 @@ RedisClient.prototype.subscribe = RedisClient.prototype.SUBSCRIBE = function sub
var call_on_write = function () { var call_on_write = function () {
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1; self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
}; };
return this.internal_send_command('subscribe', arr, callback, call_on_write); return this.internal_send_command(new Command('subscribe', arr, callback, call_on_write));
}; };
Multi.prototype.subscribe = Multi.prototype.SUBSCRIBE = function subscribe () { Multi.prototype.subscribe = Multi.prototype.SUBSCRIBE = function subscribe () {
@@ -441,7 +443,7 @@ Multi.prototype.subscribe = Multi.prototype.SUBSCRIBE = function subscribe () {
var call_on_write = function () { var call_on_write = function () {
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1; self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
}; };
this.queue.push(['subscribe', arr, callback, call_on_write]); this.queue.push(new Command('subscribe', arr, callback, call_on_write));
return this; return this;
}; };
@@ -470,7 +472,7 @@ RedisClient.prototype.unsubscribe = RedisClient.prototype.UNSUBSCRIBE = function
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback // Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1; self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
}; };
return this.internal_send_command('unsubscribe', arr, callback, call_on_write); return this.internal_send_command(new Command('unsubscribe', arr, callback, call_on_write));
}; };
Multi.prototype.unsubscribe = Multi.prototype.UNSUBSCRIBE = function unsubscribe () { Multi.prototype.unsubscribe = Multi.prototype.UNSUBSCRIBE = function unsubscribe () {
@@ -498,7 +500,7 @@ Multi.prototype.unsubscribe = Multi.prototype.UNSUBSCRIBE = function unsubscribe
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback // Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1; self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
}; };
this.queue.push(['unsubscribe', arr, callback, call_on_write]); this.queue.push(new Command('unsubscribe', arr, callback, call_on_write));
return this; return this;
}; };
@@ -526,7 +528,7 @@ RedisClient.prototype.psubscribe = RedisClient.prototype.PSUBSCRIBE = function p
var call_on_write = function () { var call_on_write = function () {
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1; self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
}; };
return this.internal_send_command('psubscribe', arr, callback, call_on_write); return this.internal_send_command(new Command('psubscribe', arr, callback, call_on_write));
}; };
Multi.prototype.psubscribe = Multi.prototype.PSUBSCRIBE = function psubscribe () { Multi.prototype.psubscribe = Multi.prototype.PSUBSCRIBE = function psubscribe () {
@@ -553,7 +555,7 @@ Multi.prototype.psubscribe = Multi.prototype.PSUBSCRIBE = function psubscribe ()
var call_on_write = function () { var call_on_write = function () {
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1; self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
}; };
this.queue.push(['psubscribe', arr, callback, call_on_write]); this.queue.push(new Command('psubscribe', arr, callback, call_on_write));
return this; return this;
}; };
@@ -582,7 +584,7 @@ RedisClient.prototype.punsubscribe = RedisClient.prototype.PUNSUBSCRIBE = functi
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback // Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1; self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
}; };
return this.internal_send_command('punsubscribe', arr, callback, call_on_write); return this.internal_send_command(new Command('punsubscribe', arr, callback, call_on_write));
}; };
Multi.prototype.punsubscribe = Multi.prototype.PUNSUBSCRIBE = function punsubscribe () { Multi.prototype.punsubscribe = Multi.prototype.PUNSUBSCRIBE = function punsubscribe () {
@@ -610,6 +612,6 @@ Multi.prototype.punsubscribe = Multi.prototype.PUNSUBSCRIBE = function punsubscr
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback // Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1; self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
}; };
this.queue.push(['punsubscribe', arr, callback, call_on_write]); this.queue.push(new Command('punsubscribe', arr, callback, call_on_write));
return this; return this;
}; };

View File

@@ -2,6 +2,7 @@
var Queue = require('double-ended-queue'); var Queue = require('double-ended-queue');
var utils = require('./utils'); var utils = require('./utils');
var Command = require('./command');
function Multi (client, args) { function Multi (client, args) {
this._client = client; this._client = client;
@@ -20,18 +21,23 @@ function Multi (client, args) {
} }
} }
function pipeline_transaction_command (self, command, args, index, cb, call_on_write) { function pipeline_transaction_command (self, command_obj, index) {
// Queueing is done first, then the commands are executed // Queueing is done first, then the commands are executed
self._client.send_command(command, args, function (err, reply) { var tmp = command_obj.callback;
command_obj.callback = function (err, reply) {
// Ignore the multi command. This is applied by node_redis and the user does not benefit by it // Ignore the multi command. This is applied by node_redis and the user does not benefit by it
if (err && index !== -1) { if (err && index !== -1) {
if (cb) { if (tmp) {
cb(err); tmp(err);
} }
err.position = index; err.position = index;
self.errors.push(err); self.errors.push(err);
} }
}); // Keep track of who wants buffer responses:
// By the time the callback is called the command_obj got the buffer_args attribute attached
self.wants_buffers[index] = command_obj.buffer_args;
};
self._client.internal_send_command(command_obj);
} }
Multi.prototype.exec_atomic = Multi.prototype.EXEC_ATOMIC = Multi.prototype.execAtomic = function exec_atomic (callback) { Multi.prototype.exec_atomic = Multi.prototype.EXEC_ATOMIC = Multi.prototype.execAtomic = function exec_atomic (callback) {
@@ -42,7 +48,7 @@ Multi.prototype.exec_atomic = Multi.prototype.EXEC_ATOMIC = Multi.prototype.exec
}; };
function multi_callback (self, err, replies) { function multi_callback (self, err, replies) {
var i = 0, args; var i = 0, command_obj;
if (err) { if (err) {
err.errors = self.errors; err.errors = self.errors;
@@ -56,22 +62,22 @@ function multi_callback (self, err, replies) {
} }
if (replies) { if (replies) {
while (args = self.queue.shift()) { while (command_obj = self.queue.shift()) {
if (replies[i] instanceof Error) { if (replies[i] instanceof Error) {
var match = replies[i].message.match(utils.err_code); var match = replies[i].message.match(utils.err_code);
// LUA script could return user errors that don't behave like all other errors! // LUA script could return user errors that don't behave like all other errors!
if (match) { if (match) {
replies[i].code = match[1]; replies[i].code = match[1];
} }
replies[i].command = args[0].toUpperCase(); replies[i].command = command_obj.command.toUpperCase();
if (typeof args[2] === 'function') { if (typeof command_obj.callback === 'function') {
args[2](replies[i]); command_obj.callback(replies[i]);
} }
} else { } else {
// If we asked for strings, even in detect_buffers mode, then return strings: // If we asked for strings, even in detect_buffers mode, then return strings:
replies[i] = self._client.handle_reply(replies[i], args[0], self.wants_buffers[i]); replies[i] = self._client.handle_reply(replies[i], command_obj.command, self.wants_buffers[i]);
if (typeof args[2] === 'function') { if (typeof command_obj.callback === 'function') {
args[2](null, replies[i]); command_obj.callback(null, replies[i]);
} }
} }
i++; i++;
@@ -98,30 +104,16 @@ Multi.prototype.exec_transaction = function exec_transaction (callback) {
self.callback = callback; self.callback = callback;
self._client.cork(); self._client.cork();
self.wants_buffers = new Array(len); self.wants_buffers = new Array(len);
pipeline_transaction_command(self, 'multi', [], -1); pipeline_transaction_command(self, new Command('multi', []), -1);
// Drain queue, callback will catch 'QUEUED' or error // Drain queue, callback will catch 'QUEUED' or error
for (var index = 0; index < len; index++) { for (var index = 0; index < len; index++) {
// The commands may not be shifted off, since they are needed in the result handler // The commands may not be shifted off, since they are needed in the result handler
var command_obj = self.queue.get(index); pipeline_transaction_command(self, self.queue.get(index), index);
var command = command_obj[0];
var cb = command_obj[2];
var call_on_write = command_obj.length === 4 ? command_obj[3] : undefined;
// Keep track of who wants buffer responses:
if (self._client.options.detect_buffers) {
self.wants_buffers[index] = false;
for (var i = 0; i < command_obj[1].length; i += 1) {
if (command_obj[1][i] instanceof Buffer) {
self.wants_buffers[index] = true;
break;
}
}
}
pipeline_transaction_command(self, command, command_obj[1], index, cb, call_on_write);
} }
self._client.internal_send_command('exec', [], function (err, replies) { self._client.internal_send_command(new Command('exec', [], function (err, replies) {
multi_callback(self, err, replies); multi_callback(self, err, replies);
}); }));
self._client.uncork(); self._client.uncork();
return !self._client.should_buffer; return !self._client.should_buffer;
}; };
@@ -144,16 +136,17 @@ Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = funct
var len = self.queue.length; var len = self.queue.length;
var index = 0; var index = 0;
var command_obj; var command_obj;
if (len === 0) {
utils.reply_in_order(self._client, callback, null, []);
return !self._client.should_buffer;
}
self._client.cork(); self._client.cork();
if (!callback) { if (!callback) {
while (command_obj = self.queue.shift()) { while (command_obj = self.queue.shift()) {
self._client.internal_send_command(command_obj[0], command_obj[1], command_obj[2], (command_obj.length === 4 ? command_obj[3] : undefined)); self._client.internal_send_command(command_obj);
} }
self._client.uncork(); self._client.uncork();
return !self._client.should_buffer; return !self._client.should_buffer;
} else if (len === 0) {
utils.reply_in_order(self._client, callback, null, []);
return !self._client.should_buffer;
} }
var callback_without_own_cb = function (err, res) { var callback_without_own_cb = function (err, res) {
if (err) { if (err) {
@@ -175,18 +168,15 @@ Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = funct
}; };
self.results = []; self.results = [];
while (command_obj = self.queue.shift()) { while (command_obj = self.queue.shift()) {
var command = command_obj[0]; if (typeof command_obj.callback === 'function') {
var call_on_write = command_obj.length === 4 ? command_obj[3] : undefined; command_obj.callback = batch_callback(self, command_obj.callback, index);
var cb;
if (typeof command_obj[2] === 'function') {
cb = batch_callback(self, command_obj[2], index);
} else { } else {
cb = callback_without_own_cb; command_obj.callback = callback_without_own_cb;
} }
if (typeof callback === 'function' && index === len - 1) { if (typeof callback === 'function' && index === len - 1) {
cb = last_callback(cb); command_obj.callback = last_callback(command_obj.callback);
} }
this._client.internal_send_command(command, command_obj[1], cb, call_on_write); this._client.internal_send_command(command_obj);
index++; index++;
} }
self._client.uncork(); self._client.uncork();

View File

@@ -56,9 +56,9 @@ describe("The 'info' method", function () {
it('check redis v.2.4 support', function (done) { it('check redis v.2.4 support', function (done) {
var end = helper.callFuncAfter(done, 2); var end = helper.callFuncAfter(done, 2);
client.internal_send_command = function (command, args, callback) { client.internal_send_command = function (command_obj) {
assert.strictEqual(args.length, 0); assert.strictEqual(command_obj.args.length, 0);
assert.strictEqual(command, 'info'); assert.strictEqual(command_obj.command, 'info');
end(); end();
}; };
client.info(); client.info();

View File

@@ -692,6 +692,11 @@ describe("The 'multi' method", function () {
// subscribe => enters subscribe mode and this does not work in combination with exec (the same for psubscribe, unsubscribe...) // subscribe => enters subscribe mode and this does not work in combination with exec (the same for psubscribe, unsubscribe...)
// //
// Make sure send_command is not called
client.send_command = function () {
throw new Error('failed');
};
assert.strictEqual(client.selected_db, undefined); assert.strictEqual(client.selected_db, undefined);
var multi = client.multi(); var multi = client.multi();
multi.select(5, function (err, res) { multi.select(5, function (err, res) {

View File

@@ -325,6 +325,7 @@ describe('The node_redis client', function () {
bclient.blpop('blocking list 2', 5, function (err, value) { bclient.blpop('blocking list 2', 5, function (err, value) {
assert.strictEqual(value[0], 'blocking list 2'); assert.strictEqual(value[0], 'blocking list 2');
assert.strictEqual(value[1], 'initial value'); assert.strictEqual(value[1], 'initial value');
bclient.end(true);
done(err); done(err);
}); });
bclient.once('ready', function () { bclient.once('ready', function () {

View File

@@ -215,6 +215,7 @@ describe('publish/subscribe', function () {
sub.stream.end(); sub.stream.end();
}); });
sub.select(3);
sub.subscribe(channels); sub.subscribe(channels);
sub.on('ready', function (err, results) { sub.on('ready', function (err, results) {