You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-06 02:15:48 +03:00
Improve pipeline logic and fix #897
This commit is contained in:
123
index.js
123
index.js
@@ -39,18 +39,24 @@ function RedisClient(stream, options) {
|
|||||||
options = JSON.parse(JSON.stringify(options || {}));
|
options = JSON.parse(JSON.stringify(options || {}));
|
||||||
var self = this;
|
var self = this;
|
||||||
|
|
||||||
|
this.pipeline = 0;
|
||||||
|
var cork;
|
||||||
if (!stream.cork) {
|
if (!stream.cork) {
|
||||||
this.pipeline = 0;
|
cork = function (len) {
|
||||||
this.cork = noop;
|
self.pipeline = len;
|
||||||
this.once('ready', function () {
|
self.pipeline_queue = new Queue(len);
|
||||||
self.cork = function (len) {
|
};
|
||||||
self.pipeline = len;
|
this.uncork = noop;
|
||||||
self.pipeline_queue = new Queue(len);
|
} else {
|
||||||
};
|
cork = function (len) {
|
||||||
});
|
self.pipeline = len;
|
||||||
stream.uncork = noop;
|
self.pipeline_queue = new Queue(len);
|
||||||
this.write = this.writeStream;
|
self.stream.cork();
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
this.once('ready', function () {
|
||||||
|
self.cork = cork;
|
||||||
|
});
|
||||||
|
|
||||||
this.stream = stream;
|
this.stream = stream;
|
||||||
this.connection_id = ++connection_id;
|
this.connection_id = ++connection_id;
|
||||||
@@ -131,8 +137,9 @@ RedisClient.prototype.install_stream_listeners = function() {
|
|||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
RedisClient.prototype.cork = function (len) {
|
RedisClient.prototype.cork = noop;
|
||||||
this.stream.cork();
|
RedisClient.prototype.uncork = function () {
|
||||||
|
this.stream.uncork();
|
||||||
};
|
};
|
||||||
|
|
||||||
RedisClient.prototype.initialize_retry_vars = function () {
|
RedisClient.prototype.initialize_retry_vars = function () {
|
||||||
@@ -377,7 +384,6 @@ RedisClient.prototype.on_info_cmd = function (err, res) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
var self = this;
|
|
||||||
var obj = {};
|
var obj = {};
|
||||||
var lines = res.toString().split('\r\n');
|
var lines = res.toString().split('\r\n');
|
||||||
var i = 0;
|
var i = 0;
|
||||||
@@ -422,9 +428,9 @@ RedisClient.prototype.on_info_cmd = function (err, res) {
|
|||||||
retry_time = 1000;
|
retry_time = 1000;
|
||||||
}
|
}
|
||||||
debug('Redis server still loading, trying again in ' + retry_time);
|
debug('Redis server still loading, trying again in ' + retry_time);
|
||||||
setTimeout(function () {
|
setTimeout(function (self) {
|
||||||
self.ready_check();
|
self.ready_check();
|
||||||
}, retry_time);
|
}, retry_time, this);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -441,12 +447,13 @@ RedisClient.prototype.ready_check = function () {
|
|||||||
};
|
};
|
||||||
|
|
||||||
RedisClient.prototype.send_offline_queue = function () {
|
RedisClient.prototype.send_offline_queue = function () {
|
||||||
var command_obj, buffered_writes = 0;
|
var command_obj;
|
||||||
|
|
||||||
while (command_obj = this.offline_queue.shift()) {
|
while (command_obj = this.offline_queue.shift()) {
|
||||||
debug('Sending offline command: ' + command_obj.command);
|
debug('Sending offline command: ' + command_obj.command);
|
||||||
buffered_writes += !this.send_command(command_obj.command, command_obj.args, command_obj.callback);
|
this.send_command(command_obj.command, command_obj.args, command_obj.callback);
|
||||||
}
|
}
|
||||||
|
this.drain();
|
||||||
// Even though items were shifted off, Queue backing store still uses memory until next add, so just get a new Queue
|
// Even though items were shifted off, Queue backing store still uses memory until next add, so just get a new Queue
|
||||||
this.offline_queue = new Queue();
|
this.offline_queue = new Queue();
|
||||||
};
|
};
|
||||||
@@ -543,7 +550,7 @@ RedisClient.prototype.return_error = function (err) {
|
|||||||
err.code = match[1];
|
err.code = match[1];
|
||||||
}
|
}
|
||||||
|
|
||||||
this.emit_drain_idle(queue_len);
|
this.emit_idle(queue_len);
|
||||||
|
|
||||||
if (command_obj.callback) {
|
if (command_obj.callback) {
|
||||||
command_obj.callback(err);
|
command_obj.callback(err);
|
||||||
@@ -557,16 +564,12 @@ RedisClient.prototype.drain = function () {
|
|||||||
this.should_buffer = false;
|
this.should_buffer = false;
|
||||||
};
|
};
|
||||||
|
|
||||||
RedisClient.prototype.emit_drain_idle = function (queue_len) {
|
RedisClient.prototype.emit_idle = function (queue_len) {
|
||||||
if (this.pub_sub_mode === false && queue_len === 0) {
|
if (this.pub_sub_mode === false && queue_len === 0) {
|
||||||
// Free the queue capacity memory by using a new queue
|
// Free the queue capacity memory by using a new queue
|
||||||
this.command_queue = new Queue();
|
this.command_queue = new Queue();
|
||||||
this.emit('idle');
|
this.emit('idle');
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.should_buffer && queue_len <= this.command_queue_low_water) {
|
|
||||||
this.drain();
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
RedisClient.prototype.return_reply = function (reply) {
|
RedisClient.prototype.return_reply = function (reply) {
|
||||||
@@ -587,7 +590,7 @@ RedisClient.prototype.return_reply = function (reply) {
|
|||||||
|
|
||||||
queue_len = this.command_queue.length;
|
queue_len = this.command_queue.length;
|
||||||
|
|
||||||
this.emit_drain_idle(queue_len);
|
this.emit_idle(queue_len);
|
||||||
|
|
||||||
if (command_obj && !command_obj.sub_command) {
|
if (command_obj && !command_obj.sub_command) {
|
||||||
if (typeof command_obj.callback === 'function') {
|
if (typeof command_obj.callback === 'function') {
|
||||||
@@ -640,7 +643,7 @@ RedisClient.prototype.return_reply = function (reply) {
|
|||||||
}
|
}
|
||||||
/* istanbul ignore else: this is a safety check that we should not be able to trigger */
|
/* istanbul ignore else: this is a safety check that we should not be able to trigger */
|
||||||
else if (this.monitoring) {
|
else if (this.monitoring) {
|
||||||
if (Buffer.isBuffer(reply)) {
|
if (typeof reply !== 'string') {
|
||||||
reply = reply.toString();
|
reply = reply.toString();
|
||||||
}
|
}
|
||||||
// If in monitoring mode only two commands are valid ones: AUTH and MONITOR wich reply with OK
|
// If in monitoring mode only two commands are valid ones: AUTH and MONITOR wich reply with OK
|
||||||
@@ -662,8 +665,8 @@ RedisClient.prototype.send_command = function (command, args, callback) {
|
|||||||
var arg, command_obj, i, err,
|
var arg, command_obj, i, err,
|
||||||
stream = this.stream,
|
stream = this.stream,
|
||||||
command_str = '',
|
command_str = '',
|
||||||
buffered_writes = 0,
|
|
||||||
buffer_args = false,
|
buffer_args = false,
|
||||||
|
big_data = false,
|
||||||
buffer = this.options.return_buffers;
|
buffer = this.options.return_buffers;
|
||||||
|
|
||||||
if (args === undefined) {
|
if (args === undefined) {
|
||||||
@@ -695,7 +698,12 @@ RedisClient.prototype.send_command = function (command, args, callback) {
|
|||||||
for (i = 0; i < args.length; i += 1) {
|
for (i = 0; i < args.length; i += 1) {
|
||||||
if (Buffer.isBuffer(args[i])) {
|
if (Buffer.isBuffer(args[i])) {
|
||||||
buffer_args = true;
|
buffer_args = true;
|
||||||
break;
|
} else if (typeof args[i] !== 'string') {
|
||||||
|
arg = String(arg);
|
||||||
|
// 30000 seemed to be a good value to switch to buffers after testing this with and checking the pros and cons
|
||||||
|
} else if (args[i].length > 30000) {
|
||||||
|
big_data = true;
|
||||||
|
args[i] = new Buffer(args[i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (this.options.detect_buffers) {
|
if (this.options.detect_buffers) {
|
||||||
@@ -741,74 +749,53 @@ RedisClient.prototype.send_command = function (command, args, callback) {
|
|||||||
|
|
||||||
// Always use 'Multi bulk commands', but if passed any Buffer args, then do multiple writes, one for each arg.
|
// Always use 'Multi bulk commands', but if passed any Buffer args, then do multiple writes, one for each arg.
|
||||||
// This means that using Buffers in commands is going to be slower, so use Strings if you don't already have a Buffer.
|
// This means that using Buffers in commands is going to be slower, so use Strings if you don't already have a Buffer.
|
||||||
|
|
||||||
command_str = '*' + (args.length + 1) + '\r\n$' + command.length + '\r\n' + command + '\r\n';
|
command_str = '*' + (args.length + 1) + '\r\n$' + command.length + '\r\n' + command + '\r\n';
|
||||||
|
|
||||||
if (!buffer_args) { // Build up a string and send entire command in one write
|
if (!buffer_args && !big_data) { // Build up a string and send entire command in one write
|
||||||
for (i = 0; i < args.length; i += 1) {
|
for (i = 0; i < args.length; i += 1) {
|
||||||
arg = args[i];
|
arg = String(args[i]);
|
||||||
if (typeof arg !== 'string') {
|
|
||||||
arg = String(arg);
|
|
||||||
}
|
|
||||||
command_str += '$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n';
|
command_str += '$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n';
|
||||||
}
|
}
|
||||||
debug('Send ' + this.address + ' id ' + this.connection_id + ': ' + command_str);
|
debug('Send ' + this.address + ' id ' + this.connection_id + ': ' + command_str);
|
||||||
buffered_writes += !this.write(command_str);
|
this.write(command_str);
|
||||||
} else {
|
} else {
|
||||||
debug('Send command (' + command_str + ') has Buffer arguments');
|
debug('Send command (' + command_str + ') has Buffer arguments');
|
||||||
buffered_writes += !this.write(command_str);
|
this.write(command_str);
|
||||||
|
|
||||||
for (i = 0; i < args.length; i += 1) {
|
for (i = 0; i < args.length; i += 1) {
|
||||||
arg = args[i];
|
arg = args[i];
|
||||||
if (Buffer.isBuffer(arg)) {
|
if (!Buffer.isBuffer(arg)) {
|
||||||
if (arg.length === 0) {
|
arg = String(arg);
|
||||||
debug('send_command: using empty string for 0 length buffer');
|
this.write('$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n');
|
||||||
buffered_writes += !this.write('$0\r\n\r\n');
|
|
||||||
} else {
|
|
||||||
buffered_writes += !this.write('$' + arg.length + '\r\n');
|
|
||||||
buffered_writes += !this.write(arg);
|
|
||||||
buffered_writes += !this.write('\r\n');
|
|
||||||
debug('send_command: buffer send ' + arg.length + ' bytes');
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
if (typeof arg !== 'string') {
|
this.write('$' + arg.length + '\r\n');
|
||||||
arg = String(arg);
|
this.write(arg);
|
||||||
}
|
this.write('\r\n');
|
||||||
debug('send_command: string send ' + Buffer.byteLength(arg) + ' bytes: ' + arg);
|
|
||||||
buffered_writes += !this.write('$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n');
|
|
||||||
}
|
}
|
||||||
|
debug('send_command: buffer send ' + arg.length + ' bytes');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (buffered_writes !== 0 || this.command_queue.length >= this.command_queue_high_water) {
|
|
||||||
debug('send_command buffered_writes: ' + buffered_writes, ' should_buffer: ' + this.should_buffer);
|
|
||||||
this.should_buffer = true;
|
|
||||||
}
|
|
||||||
return !this.should_buffer;
|
return !this.should_buffer;
|
||||||
};
|
};
|
||||||
|
|
||||||
RedisClient.prototype.write = function (data) {
|
RedisClient.prototype.write = function (data) {
|
||||||
return this.stream.write(data);
|
|
||||||
};
|
|
||||||
|
|
||||||
RedisClient.prototype.writeStream = function (data) {
|
|
||||||
var nr = 0;
|
|
||||||
|
|
||||||
if (this.pipeline === 0) {
|
if (this.pipeline === 0) {
|
||||||
return this.stream.write(data);
|
this.should_buffer = !this.stream.write(data);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.pipeline--;
|
this.pipeline--;
|
||||||
if (this.pipeline === 0) {
|
if (this.pipeline === 0) {
|
||||||
var command;
|
var command, str = '';
|
||||||
while (command = this.pipeline_queue.shift()) {
|
while (command = this.pipeline_queue.shift()) {
|
||||||
nr += !this.stream.write(command);
|
str += command;
|
||||||
}
|
}
|
||||||
nr += !this.stream.write(data);
|
this.should_buffer = !this.stream.write(str + data);
|
||||||
return !nr;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.pipeline_queue.push(data);
|
this.pipeline_queue.push(data);
|
||||||
return true;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
RedisClient.prototype.pub_sub_command = function (command_obj) {
|
RedisClient.prototype.pub_sub_command = function (command_obj) {
|
||||||
@@ -1102,7 +1089,7 @@ Multi.prototype.exec_transaction = function (callback) {
|
|||||||
this.send_command(command, args, index, cb);
|
this.send_command(command, args, index, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
this._client.stream.uncork();
|
this._client.uncork();
|
||||||
return this._client.send_command('exec', [], function(err, replies) {
|
return this._client.send_command('exec', [], function(err, replies) {
|
||||||
self.execute_callback(err, replies);
|
self.execute_callback(err, replies);
|
||||||
});
|
});
|
||||||
@@ -1210,7 +1197,7 @@ Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = funct
|
|||||||
this._client.send_command(command, args, cb);
|
this._client.send_command(command, args, cb);
|
||||||
index++;
|
index++;
|
||||||
}
|
}
|
||||||
this._client.stream.uncork();
|
this._client.uncork();
|
||||||
return this._client.should_buffer;
|
return this._client.should_buffer;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@@ -162,7 +162,7 @@ describe("The 'batch' method", function () {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it('handles batchple operations being applied to a set', function (done) {
|
it('handles multiple operations being applied to a set', function (done) {
|
||||||
client.sadd("some set", "mem 1");
|
client.sadd("some set", "mem 1");
|
||||||
client.sadd(["some set", "mem 2"]);
|
client.sadd(["some set", "mem 2"]);
|
||||||
client.sadd("some set", "mem 3");
|
client.sadd("some set", "mem 3");
|
||||||
@@ -189,7 +189,7 @@ describe("The 'batch' method", function () {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it('allows batchple operations to be performed using constructor with all kinds of syntax', function (done) {
|
it('allows multiple operations to be performed using constructor with all kinds of syntax', function (done) {
|
||||||
var now = Date.now();
|
var now = Date.now();
|
||||||
var arr = ["batchhmset", "batchbar", "batchbaz"];
|
var arr = ["batchhmset", "batchbar", "batchbaz"];
|
||||||
var arr2 = ['some manner of key', 'otherTypes'];
|
var arr2 = ['some manner of key', 'otherTypes'];
|
||||||
@@ -253,7 +253,7 @@ describe("The 'batch' method", function () {
|
|||||||
assert.strictEqual(buffering, true);
|
assert.strictEqual(buffering, true);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('allows batchple operations to be performed using a chaining API', function (done) {
|
it('allows multiple operations to be performed using a chaining API', function (done) {
|
||||||
client.batch()
|
client.batch()
|
||||||
.mset('some', '10', 'keys', '20')
|
.mset('some', '10', 'keys', '20')
|
||||||
.incr('some')
|
.incr('some')
|
||||||
@@ -270,7 +270,7 @@ describe("The 'batch' method", function () {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it('allows batchple commands to work the same as normal to be performed using a chaining API', function (done) {
|
it('allows multiple commands to work the same as normal to be performed using a chaining API', function (done) {
|
||||||
client.batch()
|
client.batch()
|
||||||
.mset(['some', '10', 'keys', '20'])
|
.mset(['some', '10', 'keys', '20'])
|
||||||
.incr(['some', helper.isNumber(11)])
|
.incr(['some', helper.isNumber(11)])
|
||||||
@@ -287,7 +287,7 @@ describe("The 'batch' method", function () {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it('allows batchple commands to work the same as normal to be performed using a chaining API promisified', function () {
|
it('allows multiple commands to work the same as normal to be performed using a chaining API promisified', function () {
|
||||||
return client.batch()
|
return client.batch()
|
||||||
.mset(['some', '10', 'keys', '20'])
|
.mset(['some', '10', 'keys', '20'])
|
||||||
.incr(['some', helper.isNumber(11)])
|
.incr(['some', helper.isNumber(11)])
|
||||||
@@ -303,7 +303,7 @@ describe("The 'batch' method", function () {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it('allows an array to be provided indicating batchple operations to perform', function (done) {
|
it('allows an array to be provided indicating multiple operations to perform', function (done) {
|
||||||
// test nested batch-bulk replies with nulls.
|
// test nested batch-bulk replies with nulls.
|
||||||
client.batch([
|
client.batch([
|
||||||
["mget", ["batchfoo", "some", "random value", "keys"]],
|
["mget", ["batchfoo", "some", "random value", "keys"]],
|
||||||
|
@@ -43,6 +43,22 @@ describe("The node_redis client", function () {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('big data', function () {
|
||||||
|
|
||||||
|
// Check if the fast mode for big strings is working correct
|
||||||
|
it('safe strings that are bigger than 30000 characters', function(done) {
|
||||||
|
var str = 'foo ಠ_ಠ bar ';
|
||||||
|
while (str.length < 111111) {
|
||||||
|
str += str;
|
||||||
|
}
|
||||||
|
client.set('foo', str);
|
||||||
|
client.get('foo', function (err, res) {
|
||||||
|
assert.strictEqual(res, str);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
describe("send_command", function () {
|
describe("send_command", function () {
|
||||||
|
|
||||||
it("omitting args should be fine in some cases", function (done) {
|
it("omitting args should be fine in some cases", function (done) {
|
||||||
@@ -470,20 +486,6 @@ describe("The node_redis client", function () {
|
|||||||
|
|
||||||
describe('enable_offline_queue', function () {
|
describe('enable_offline_queue', function () {
|
||||||
describe('true', function () {
|
describe('true', function () {
|
||||||
it("should emit drain after info command and nothing to buffer", function (done) {
|
|
||||||
client = redis.createClient({
|
|
||||||
parser: parser
|
|
||||||
});
|
|
||||||
client.set('foo', 'bar');
|
|
||||||
client.get('foo', function () {
|
|
||||||
assert(!client.should_buffer);
|
|
||||||
setTimeout(done, 25);
|
|
||||||
});
|
|
||||||
client.on('drain', function() {
|
|
||||||
assert(client.offline_queue.length === 2);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
it("should emit drain if offline queue is flushed and nothing to buffer", function (done) {
|
it("should emit drain if offline queue is flushed and nothing to buffer", function (done) {
|
||||||
client = redis.createClient({
|
client = redis.createClient({
|
||||||
parser: parser,
|
parser: parser,
|
||||||
|
Reference in New Issue
Block a user