From cb5922edd8a8eb88d07db5b72b0e8070d525f25e Mon Sep 17 00:00:00 2001 From: Seth Schoen Date: Fri, 20 Jul 2012 16:48:10 -0700 Subject: [PATCH] switch to pubsub mechanism instead of polling --- server-ca/chocolate.py | 1 + server-ca/daemon.py | 78 ++++++++++++++++++++++++++++-------------- 2 files changed, 53 insertions(+), 26 deletions(-) diff --git a/server-ca/chocolate.py b/server-ca/chocolate.py index 18eb4ab16..7aa736279 100755 --- a/server-ca/chocolate.py +++ b/server-ca/chocolate.py @@ -117,6 +117,7 @@ class session(object): for name in names: sessions.rpush(self.id + ":names", name) sessions.hset(self.id, "state", "makechallenge") sessions.lpush("pending-makechallenge", self.id) + sessions.publish("requests", "makechallenge") return True def challenges(self): diff --git a/server-ca/daemon.py b/server-ca/daemon.py index c64d01097..5cf83dd21 100644 --- a/server-ca/daemon.py +++ b/server-ca/daemon.py @@ -53,6 +53,7 @@ from sni_challenge.verify import verify_challenge from Crypto import Random r = redis.Redis() +ps = r.pubsub() issue_lock = redis_lock.redis_lock(r, "issue_lock") # This lock guards the ability to issue certificates with "openssl ca", # which has no locking of its own. We don't need locking for the updates @@ -139,8 +140,12 @@ def makechallenge(session): if True: # challenges have been created r.hset(session, "state", "testchallenge") r.lpush("pending-testchallenge", session) + # TODO: this causes the daemon to immediately attempt to test the + # challenge for completion, with no delay. + r.publish("requests", "testchallenge") else: r.lpush("pending-makechallenge", session) + r.publish("requests", "makechallenge") def testchallenge(session): if r.hget(session, "live") != "True": @@ -207,10 +212,17 @@ def testchallenge(session): if debug: print "\tall satisfied, going to issue", session r.hset(session, "state", "issue") r.lpush("pending-issue", session) + r.publish("requests", "issue") else: # Some challenges are not verified. # Put this session back on the stack to try to verify again. r.lpush("pending-testchallenge", session) + # TODO: if we wanted the client to tell us when it believes + # it has completed the challenge, we should take this out and + # have the server publish the message in response to the message + # from the client. Also, the current version will cause the + # server to retest over and over again as fast as it's able. + r.publish("requests", "testchallenge") def issue(session): if r.hget(session, "live") != "True": @@ -247,36 +259,50 @@ def issue(session): if debug: print "issued for", session r.hset(session, "state", "done") r.lpush("pending-done", session) + # TODO: Note that we do not publish a pubsub message when + # the session enters done state, so the daemon will not + # actually act on it. Is that OK? else: # should not be reached in deployed version if debug: print "issuing for", session, "failed" r.lpush("pending-issue", session) + r.publish("requests", "issue") -while True: +# Dispatch table for how to react to pubsub messages. The key is +# the pubsub message and the value is a tuple of (queue name, function). +# The main loop will look in the specified queue for a pending session, +# and, if it finds one, it will call the specified function on it. +# Since the queue names are systematically related to the message names, +# we could probably remove the queue name field entirely. +dispatch = { "makechallenge": ("pending-makechallenge", makechallenge), + "testchallenge": ("pending-testchallenge", testchallenge), + "issue": ("pending-issue", issue), + "done": ("pending-done", lambda x: None) } + +# Main loop: act on queues notified via Redis pubsub mechanism. +# If the queue is empty by the time we pop from it (indicated by +# session is None), some other daemon instance has already handled +# the request, which is fine; we then return immediately to waiting +# for the next request. + +ps.subscribe(["requests"]) +for message in ps.listen(): + populated_queue = message["data"] + if populated_queue in dispatch: + queue, function = dispatch[populated_queue] + session = r.rpop(queue) + if session: + if debug: print "going to %s for %s" % (populated_queue, session) + if ancient(session, populated_queue): + if populated_queue == "issue": + if debug: print "not expiring issue-state", session + else: + if debug: print "expiring ancient session", session + r.hset(session, "live", False) + else: + function(session) + else: + if debug: print "UNKNOWN queue %s" % populated_queue + if clean_shutdown: print "daemon exiting cleanly" break - session = r.rpop("pending-makechallenge") - if session: - if debug: print "going to makechallenge for", session - if ancient(session, "makechallenge"): - if debug: print "expiring old session", session - r.hset(session, "live", False) - makechallenge(session) - session = None - else: session = r.rpop("pending-testchallenge") - if session: - if debug: print "going to testchallenge for", session - if ancient(session, "testchallenge"): - if debug: print "expiring old session", session - r.hset(session, "live", False) - testchallenge(session) - session = None - else: session = r.rpop("pending-issue") - if session: - if debug: print "going to issue for", session - # Currently the daemon will never itself make an unexpired session - # in "issue" state expire. - issue(session) - session = None - else: time.sleep(1) - # This daemon doesn't currently act on pending-done sessions.