From 9a4e51ee4011ed2fa1ac5d28e1fc6e341987a2ed Mon Sep 17 00:00:00 2001 From: Vladimir Dronnikov Date: Fri, 1 Jul 2011 03:46:50 -0400 Subject: [PATCH] fix for buffered_writes -- now drain event works --- index.js | 2 +- tests/stress/pubsub/pub.js | 38 +++++++++++++++++++++++++++++--------- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/index.js b/index.js index 33ea41042b..f4eacd0a63 100644 --- a/index.js +++ b/index.js @@ -286,7 +286,7 @@ RedisClient.prototype.send_offline_queue = function () { this.offline_queue = new Queue(); // Even though items were shifted off, Queue backing store still uses memory until next add, so just get a new Queue - if (buffered_writes === 0) { + if (!buffered_writes) { this.should_buffer = false; this.emit("drain"); } diff --git a/tests/stress/pubsub/pub.js b/tests/stress/pubsub/pub.js index 1a6301632a..536641cdf1 100644 --- a/tests/stress/pubsub/pub.js +++ b/tests/stress/pubsub/pub.js @@ -1,13 +1,33 @@ 'use strict'; +var freemem = require('os').freemem; var codec = require('../codec'); -var pub = require('redis').createClient() - .on('ready', function() { - while (true) { - pub.publish('timeline', codec.encode({ - cmd: Math.random(), - data: Math.random() - })); - } - }); +var sent = 0; + +var pub = require('redis').createClient(null, null, { + command_queue_high_water: 5, + command_queue_low_water: 1 +}) +.on('ready', function() { + this.emit('drain'); +}) +.on('drain', function() { + process.nextTick(exec); +}); + +var payload = '1'; for (var i = 0; i < 10; ++i) payload += payload; + +function exec() { + pub.publish('timeline', codec.encode({ foo: payload })); + ++sent; + if (!pub.should_buffer) { + process.nextTick(exec); + } +} + +exec(); + +setInterval(function() { + console.log('sent', sent, 'free', freemem(), 'cmdqlen', pub.command_queue.length, 'offqlen', pub.offline_queue.length); +}, 1000);