1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

Merge pull request #118 from dvv/master

fix for drain event
This commit is contained in:
Matt Ranney
2011-07-06 07:28:37 -07:00
12 changed files with 283 additions and 1 deletions

View File

@@ -286,7 +286,7 @@ RedisClient.prototype.send_offline_queue = function () {
this.offline_queue = new Queue();
// Even though items were shifted off, Queue backing store still uses memory until next add, so just get a new Queue
if (buffered_writes === 0) {
if (!buffered_writes) {
this.should_buffer = false;
this.emit("drain");
}

16
tests/stress/codec.js Normal file
View File

@@ -0,0 +1,16 @@
var json = {
encode: JSON.stringify,
decode: JSON.parse
};
var MsgPack = require('node-msgpack');
msgpack = {
encode: MsgPack.pack,
decode: function(str) { return MsgPack.unpack(new Buffer(str)); }
};
bison = require('bison');
module.exports = json;
//module.exports = msgpack;
//module.exports = bison;

View File

@@ -0,0 +1,38 @@
'use strict';
var freemem = require('os').freemem;
var profiler = require('v8-profiler');
var codec = require('../codec');
var sent = 0;
var pub = require('redis').createClient(null, null, {
//command_queue_high_water: 5,
//command_queue_low_water: 1
})
.on('ready', function() {
this.emit('drain');
})
.on('drain', function() {
process.nextTick(exec);
});
var payload = '1'; for (var i = 0; i < 12; ++i) payload += payload;
console.log('Message payload length', payload.length);
function exec() {
pub.publish('timeline', codec.encode({ foo: payload }));
++sent;
if (!pub.should_buffer) {
process.nextTick(exec);
}
}
profiler.takeSnapshot('s_0');
exec();
setInterval(function() {
profiler.takeSnapshot('s_' + sent);
console.error('sent', sent, 'free', freemem(), 'cmdqlen', pub.command_queue.length, 'offqlen', pub.offline_queue.length);
}, 2000);

10
tests/stress/pubsub/run Executable file
View File

@@ -0,0 +1,10 @@
#!/bin/sh
node server.js &
node server.js &
node server.js &
node server.js &
node server.js &
node server.js &
node server.js &
node server.js &
node --debug pub.js

View File

@@ -0,0 +1,23 @@
'use strict';
var freemem = require('os').freemem;
var codec = require('../codec');
var id = Math.random();
var recv = 0;
var sub = require('redis').createClient()
.on('ready', function() {
this.subscribe('timeline');
})
.on('message', function(channel, message) {
var self = this;
if (message) {
message = codec.decode(message);
++recv;
}
});
setInterval(function() {
console.error('id', id, 'received', recv, 'free', freemem());
}, 2000);

View File

@@ -0,0 +1,49 @@
'use strict';
var freemem = require('os').freemem;
//var profiler = require('v8-profiler');
var codec = require('../codec');
var sent = 0;
var pub = require('redis').createClient(null, null, {
//command_queue_high_water: 5,
//command_queue_low_water: 1
})
.on('ready', function() {
this.del('timeline');
this.emit('drain');
})
.on('drain', function() {
process.nextTick(exec);
});
var payload = '1'; for (var i = 0; i < 12; ++i) payload += payload;
console.log('Message payload length', payload.length);
function exec() {
pub.rpush('timeline', codec.encode({ foo: payload }));
++sent;
if (!pub.should_buffer) {
process.nextTick(exec);
}
}
//profiler.takeSnapshot('s_0');
exec();
setInterval(function() {
//var ss = profiler.takeSnapshot('s_' + sent);
//console.error(ss.stringify());
pub.llen('timeline', function(err, result) {
console.error('sent', sent, 'free', freemem(),
'cmdqlen', pub.command_queue.length, 'offqlen', pub.offline_queue.length,
'llen', result
);
});
}, 2000);
/*setTimeout(function() {
process.exit();
}, 30000);*/

6
tests/stress/rpushblpop/run Executable file
View File

@@ -0,0 +1,6 @@
#!/bin/sh
node server.js &
#node server.js &
#node server.js &
#node server.js &
node --debug pub.js

View File

@@ -0,0 +1,30 @@
'use strict';
var freemem = require('os').freemem;
var codec = require('../codec');
var id = Math.random();
var recv = 0;
var cmd = require('redis').createClient();
var sub = require('redis').createClient()
.on('ready', function() {
this.emit('timeline');
})
.on('timeline', function() {
var self = this;
this.blpop('timeline', 0, function(err, result) {
var message = result[1];
if (message) {
message = codec.decode(message);
++recv;
}
self.emit('timeline');
});
});
setInterval(function() {
cmd.llen('timeline', function(err, result) {
console.error('id', id, 'received', recv, 'free', freemem(), 'llen', result);
});
}, 2000);

13
tests/stress/speed/00 Normal file
View File

@@ -0,0 +1,13 @@
# size JSON msgpack bison
26602 2151.0170848180414
25542 ? 2842.589272665782
24835 ? ? 7280.4538397469805
6104 6985.234528557929
5045 ? 7217.461392841478
4341 ? ? 14261.406335354604
4180 15864.633685636572
4143 ? 12954.806235781925
4141 ? ? 44650.70733912719
75 114227.07313350472
40 ? 30162.440062810834
39 ? ? 119815.66013519121

13
tests/stress/speed/plot Executable file
View File

@@ -0,0 +1,13 @@
#!/bin/sh
gnuplot >size-rate.jpg << _EOF_
set terminal png nocrop enhanced font verdana 12 size 640,480
set logscale x
set logscale y
set grid
set xlabel 'Serialized object size, octets'
set ylabel 'decode(encode(obj)) rate, 1/sec'
plot '00' using 1:2 title 'json' smooth bezier, '00' using 1:3 title 'msgpack' smooth bezier, '00' using 1:4 title 'bison' smooth bezier
_EOF_

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.5 KiB

View File

@@ -0,0 +1,84 @@
var msgpack = require('node-msgpack');
var bison = require('bison');
var codec = {
JSON: {
encode: JSON.stringify,
decode: JSON.parse
},
msgpack: {
encode: msgpack.pack,
decode: msgpack.unpack
},
bison: bison
};
var obj, l;
var s = '0';
for (var i = 0; i < 12; ++i) s += s;
obj = {
foo: s,
arrrrrr: [{a:1,b:false,c:null,d:1.0}, 1111, 2222, 33333333],
rand: [],
a: s,
ccc: s,
b: s + s + s
};
for (i = 0; i < 100; ++i) obj.rand.push(Math.random());
forObj(obj);
obj = {
foo: s,
arrrrrr: [{a:1,b:false,c:null,d:1.0}, 1111, 2222, 33333333],
rand: []
};
for (i = 0; i < 100; ++i) obj.rand.push(Math.random());
forObj(obj);
obj = {
foo: s,
arrrrrr: [{a:1,b:false,c:null,d:1.0}, 1111, 2222, 33333333],
rand: []
};
forObj(obj);
obj = {
arrrrrr: [{a:1,b:false,c:null,d:1.0}, 1111, 2222, 33333333],
rand: []
};
forObj(obj);
function run(obj, codec) {
var t1 = Date.now();
var n = 10000;
for (var i = 0; i < n; ++i) {
codec.decode(l = codec.encode(obj));
}
var t2 = Date.now();
//console.log('DONE', n*1000/(t2-t1), 'codecs/sec, length=', l.length);
return [n*1000/(t2-t1), l.length];
}
function series(obj, cname, n) {
var rate = 0;
var len = 0;
for (var i = 0; i < n; ++i) {
var r = run(obj, codec[cname]);
rate += r[0];
len += r[1];
}
rate /= n;
len /= n;
console.log(cname + ' ' + rate + ' ' + len);
return [rate, len];
}
function forObj(obj) {
var r = {
JSON: series(obj, 'JSON', 20),
msgpack: series(obj, 'msgpack', 20),
bison: series(obj, 'bison', 20)
};
return r;
}