You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-01 16:46:54 +03:00
223 lines
7.0 KiB
JavaScript
223 lines
7.0 KiB
JavaScript
'use strict';
|
|
|
|
var Queue = require('double-ended-queue');
|
|
var utils = require('./utils');
|
|
|
|
/**
 * Collects commands in `this.queue` so that they can be sent to the client later on.
 *
 * @param {Object} client - Stored as `this._client`. The constructor itself does not use it.
 * @param {Array[]} [args] - Optional commands to queue right away. Each entry is either
 *     `[name, arg1, arg2, ...]` or `[[name, arg1, ...], arg2, ...]`. `name` has to be the
 *     name of a method of this instance; that method is called with the collected arguments.
 * @throws {TypeError} If an entry names something that is not a method of this instance.
 */
function Multi (client, args) {
    this._client = client;
    this.queue = new Queue();
    var command, tmp_args, name, handler;
    if (args) { // Either undefined or an array. Fail hard if it's not an array
        for (var i = 0; i < args.length; i++) {
            command = args[i][0];
            tmp_args = args[i].slice(1);
            if (Array.isArray(command)) {
                // Nested form: the inner array carries the name followed by the leading arguments
                name = command[0];
                tmp_args = command.slice(1).concat(tmp_args);
            } else {
                name = command;
            }
            handler = this[name];
            if (typeof handler !== 'function') {
                // Still a TypeError as before, but with a message that names the offending entry
                throw new TypeError('Multi: "' + String(name) + '" is not a known command');
            }
            handler.apply(this, tmp_args);
        }
    }
}
|
|
|
|
/**
 * Queue a `hmset` command. The arguments are normalized to a single flat array.
 *
 * Accepted call forms (the callback is optional in all of them):
 *   - hmset([key, field, value, ...], callback)  - the passed array is queued as is
 *   - hmset(key, [field, value, ...], callback)
 *   - hmset(key, { field: value, ... }, callback)
 *   - hmset(key, field, value, ..., callback)    - a trailing `undefined` is dropped as well
 *
 * @returns {Multi} This instance, to allow chaining.
 */
Multi.prototype.hmset = Multi.prototype.HMSET = function hmset () {
    var arr,
        len = 0,
        callback,
        fields,
        i = 0;
    if (Array.isArray(arguments[0])) {
        arr = arguments[0];
        callback = arguments[1];
    } else if (Array.isArray(arguments[1])) {
        len = arguments[1].length;
        arr = new Array(len + 1);
        arr[0] = arguments[0];
        for (; i < len; i += 1) {
            arr[i + 1] = arguments[1][i];
        }
        callback = arguments[2];
    } else if (typeof arguments[1] === 'object' && (typeof arguments[2] === 'function' || typeof arguments[2] === 'undefined')) {
        arr = [arguments[0]];
        // Only own enumerable properties become hash fields. Inherited properties must not leak
        // into the command. `typeof null` is 'object' too, in that case no field is added.
        fields = arguments[1] === null ? [] : Object.keys(arguments[1]);
        len = fields.length;
        for (; i < len; i += 1) {
            arr.push(fields[i], arguments[1][fields[i]]);
        }
        callback = arguments[2];
    } else {
        len = arguments.length;
        // The later should not be the average use case
        if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
            len--;
            callback = arguments[len];
        }
        arr = new Array(len);
        for (; i < len; i += 1) {
            arr[i] = arguments[i];
        }
    }
    this.queue.push(['hmset', arr, callback]);
    return this;
};
|
|
|
|
/**
 * Send a single command of a transaction through `self._client.send_command`.
 *
 * If the client reports an error for the command, the error is tagged with the position of
 * the command, handed to the command's own callback (if any) and collected in `self.errors`.
 * Successful replies are ignored here.
 *
 * @param {Multi} self - The instance that owns the `_client` and `errors` properties.
 * @param {string} command - The command name.
 * @param {Array} args - The command arguments.
 * @param {number} [index] - Position of the command in the queue. Not passed for `multi`.
 * @param {Function} [cb] - The callback that was queued together with the command.
 */
function pipeline_transaction_command (self, command, args, index, cb) {
    self._client.send_command(command, args, function (err) {
        if (!err) {
            return;
        }
        // Set the position first so that the callback of the command can already see it
        err.position = index;
        if (typeof cb === 'function') {
            cb(err);
        }
        self.errors.push(err);
    });
}
|
|
|
|
/**
 * Execute the queue. With fewer than two queued commands `exec_batch` is used directly,
 * otherwise the call is forwarded to `exec`.
 *
 * @param {Function} [callback] - Passed through to the chosen method.
 * @returns {*} Whatever the chosen method returns.
 */
Multi.prototype.exec_atomic = function exec_atomic (callback) {
    if (this.queue.length < 2) {
        return this.exec_batch(callback);
    }
    return this.exec(callback);
};
|
|
|
|
/**
 * Handle the outcome of the `exec` command of a transaction.
 *
 * On error the collected command errors are attached as `err.errors` (an empty array for
 * connection errors) and the error is passed to `self.callback`, or emitted on the client if
 * there is no callback and it is not a connection error.
 *
 * Otherwise the queue is drained and each reply is matched to its command by position:
 * error replies get a `code` (if the message matches `utils.err_code`) and the upper cased
 * `command`, all other replies are run through `self._client.handle_reply`. The callback of
 * each command is called with its reply and finally `self.callback` receives all replies.
 *
 * @param {Multi} self - The instance whose transaction was executed.
 * @param {Error|null} err - Error reported for the `exec` command.
 * @param {Array|null} replies - One reply per queued command. If falsy, the queue is left
 *     untouched and `self.callback` is called with it right away.
 */
function multi_callback (self, err, replies) {
    var i = 0;
    var args, match, connection_error;

    if (err) {
        // The errors would be circular
        connection_error = ['CONNECTION_BROKEN', 'UNCERTAIN_STATE'].indexOf(err.code) !== -1;
        err.errors = connection_error ? [] : self.errors;
        if (self.callback) {
            self.callback(err);
            // Exclude connection errors so that those errors won't be emitted twice
        } else if (!connection_error) {
            self._client.emit('error', err);
        }
        return;
    }

    if (replies) {
        while (args = self.queue.shift()) {
            if (replies[i] instanceof Error) {
                match = replies[i].message.match(utils.err_code);
                // LUA script could return user errors that don't behave like all other errors!
                if (match) {
                    replies[i].code = match[1];
                }
                replies[i].command = args[0].toUpperCase();
                if (typeof args[2] === 'function') {
                    args[2](replies[i]);
                }
            } else {
                // If we asked for strings, even in detect_buffers mode, then return strings:
                replies[i] = self._client.handle_reply(replies[i], args[0], self.wants_buffers[i]);
                if (typeof args[2] === 'function') {
                    args[2](null, replies[i]);
                }
            }
            i++;
        }
    }

    if (self.callback) {
        self.callback(null, replies);
    }
}
|
|
|
|
/**
 * Send all queued commands to the client, framed by a `multi` and an `exec` command.
 *
 * The queue itself is only read here (`queue.get`). The reply of `exec` is handed to
 * `multi_callback` together with this instance.
 *
 * @param {Function} [callback] - Stored as `this.callback` for `multi_callback`.
 * @returns {boolean} The negated `should_buffer` flag of the client.
 */
Multi.prototype.exec_transaction = function exec_transaction (callback) {
    var self = this;
    var len = self.queue.length;
    var index, i, args;
    self.errors = [];
    self.callback = callback;
    // The queued commands plus `multi` and `exec`
    self._client.cork(len + 2);
    self.wants_buffers = new Array(len);
    pipeline_transaction_command(self, 'multi', []);
    // Drain queue, callback will catch 'QUEUED' or error
    for (index = 0; index < len; index++) {
        args = self.queue.get(index);
        // Keep track of who wants buffer responses:
        if (self._client.options.detect_buffers) {
            self.wants_buffers[index] = false;
            for (i = 0; i < args[1].length; i += 1) {
                if (args[1][i] instanceof Buffer) {
                    self.wants_buffers[index] = true;
                    break;
                }
            }
        }
        pipeline_transaction_command(self, args[0], args[1], index, args[2]);
    }

    self._client.internal_send_command('exec', [], function (err, replies) {
        multi_callback(self, err, replies);
    });
    self._client.uncork();
    // NOTE(review): presumably resets the write strategy of the client after the
    // corked writes - confirm against the client implementation
    self._client.writeDefault = self._client.writeStrings;
    return !self._client.should_buffer;
};
|
|
|
|
/**
 * Create the reply handler for a batch command that was queued with its own callback.
 *
 * The returned function stores the error or the result in `self.results` at index `i`,
 * adds `position` to errors and then forwards both arguments to `cb`.
 *
 * @param {Multi} self - The instance that owns the `results` array.
 * @param {Function} cb - The callback that was queued together with the command.
 * @param {number} i - Position of the command in the queue.
 * @returns {Function} Handler with the signature `(err, res)`.
 */
function batch_callback (self, cb, i) {
    return function batch_callback (err, res) {
        if (err) {
            self.results[i] = err;
            // Add the position to the error
            self.results[i].position = i;
        } else {
            self.results[i] = res;
        }
        cb(err, res);
    };
}
|
|
|
|
/**
 * Send all queued commands to the client without framing them with `multi` / `exec`.
 *
 * Each result (or error) is stored in `this.results` at the position of its command in the
 * queue. Errors additionally get that position as `position` property. Commands that were
 * queued with their own callback are handled by `batch_callback`.
 *
 * @param {Function} [callback] - Called as `callback(null, results)` right after the reply
 *     handler of the last queued command has run. With an empty queue it is handed to
 *     `utils.reply_in_order` together with an empty result array.
 * @returns {boolean} `true` for an empty queue, otherwise the negated `should_buffer` flag
 *     of the client.
 */
Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = function exec_batch (callback) {
    var self = this;
    var len = self.queue.length;
    var has_callback = typeof callback === 'function';
    var index = 0;
    var args, cb;
    // Reply handler for commands without own callback. The result is stored by queue position
    // (just like batch_callback does) instead of being appended, so results and positions stay
    // correct even if a reply is handled out of order.
    var store_result = function (position) {
        return function (err, res) {
            if (err) {
                self.results[position] = err;
                // Add the position to the error
                self.results[position].position = position;
            } else {
                self.results[position] = res;
            }
            // Do not emit an error here. Otherwise each error would result in one emit.
            // The errors will be returned in the result anyway
        };
    };
    // Wraps the handler of the last command so that the batch callback fires afterwards
    var last_callback = function (inner) {
        return function (err, res) {
            inner(err, res);
            callback(null, self.results);
        };
    };
    if (len === 0) {
        if (has_callback) {
            utils.reply_in_order(self._client, callback, null, []);
        }
        return true;
    }
    self.results = [];
    self._client.cork(len);
    while (args = self.queue.shift()) {
        if (typeof args[2] === 'function') {
            cb = batch_callback(self, args[2], index);
        } else {
            cb = store_result(index);
        }
        if (has_callback && index === len - 1) {
            cb = last_callback(cb);
        }
        self._client.internal_send_command(args[0], args[1], cb);
        index++;
    }
    self.queue = new Queue();
    self._client.uncork();
    // NOTE(review): presumably resets the write strategy of the client after the
    // corked writes - confirm against the client implementation
    self._client.writeDefault = self._client.writeStrings;
    return !self._client.should_buffer;
};
|
|
|
|
// The Multi constructor is the only export of this module
module.exports = Multi;
|