From c4c8bd04f98c21c60618534981cb5bf137a2ce71 Mon Sep 17 00:00:00 2001 From: Seth Schoen Date: Wed, 8 Aug 2012 13:41:15 -0700 Subject: [PATCH] argh, ugly hack for problem of stale requests poisoning queues --- server-ca/daemon.py | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/server-ca/daemon.py b/server-ca/daemon.py index 5720fb5bd..16a1a6f95 100644 --- a/server-ca/daemon.py +++ b/server-ca/daemon.py @@ -290,17 +290,27 @@ for message in ps.listen(): populated_queue = message["data"] if populated_queue in dispatch: queue, function = dispatch[populated_queue] - session = r.rpop(queue) - if session: - if debug: print "going to %s for %s" % (populated_queue, session) - if ancient(session, populated_queue): - if populated_queue == "issue": - if debug: print "not expiring issue-state", session + for repetition in (1,2): + # This is a potentially inappropriate hack to prevent backlogs + # from accumulating in queues if daemons die or if spurious + # requests arrived while no daemons were listening. The idea + # is that every daemon process double-checks a queue that it + # was told to look at, processing it twice per pubsub message + # instead of once. This causes a pressure on the daemon to + # empty the queue over time even if it isn't explicitly asked + # to. For example, if asked 7 times to process a particular + # queue, a daemon instance would try 14 times. + session = r.rpop(queue) + if session: + if debug: print "going to %s for %s" % (populated_queue, session) + if ancient(session, populated_queue): + if populated_queue == "issue": + if debug: print "not expiring issue-state", session + else: + if debug: print "expiring ancient session", session + r.hset(session, "live", False) else: - if debug: print "expiring ancient session", session - r.hset(session, "live", False) - else: - function(session) + function(session) elif populated_queue == "clean-exit": pass # fall through to check whether this particular daemon # instance has its clean_shutdown flag set