mirror of
https://github.com/certbot/certbot.git
synced 2026-01-26 07:41:33 +03:00
switch to pubsub mechanism instead of polling
This commit is contained in:
@@ -117,6 +117,7 @@ class session(object):
|
||||
for name in names: sessions.rpush(self.id + ":names", name)
|
||||
sessions.hset(self.id, "state", "makechallenge")
|
||||
sessions.lpush("pending-makechallenge", self.id)
|
||||
sessions.publish("requests", "makechallenge")
|
||||
return True
|
||||
|
||||
def challenges(self):
|
||||
|
||||
@@ -53,6 +53,7 @@ from sni_challenge.verify import verify_challenge
|
||||
from Crypto import Random
|
||||
|
||||
r = redis.Redis()
|
||||
ps = r.pubsub()
|
||||
issue_lock = redis_lock.redis_lock(r, "issue_lock")
|
||||
# This lock guards the ability to issue certificates with "openssl ca",
|
||||
# which has no locking of its own. We don't need locking for the updates
|
||||
@@ -139,8 +140,12 @@ def makechallenge(session):
|
||||
if True: # challenges have been created
|
||||
r.hset(session, "state", "testchallenge")
|
||||
r.lpush("pending-testchallenge", session)
|
||||
# TODO: this causes the daemon to immediately attempt to test the
|
||||
# challenge for completion, with no delay.
|
||||
r.publish("requests", "testchallenge")
|
||||
else:
|
||||
r.lpush("pending-makechallenge", session)
|
||||
r.publish("requests", "makechallenge")
|
||||
|
||||
def testchallenge(session):
|
||||
if r.hget(session, "live") != "True":
|
||||
@@ -207,10 +212,17 @@ def testchallenge(session):
|
||||
if debug: print "\tall satisfied, going to issue", session
|
||||
r.hset(session, "state", "issue")
|
||||
r.lpush("pending-issue", session)
|
||||
r.publish("requests", "issue")
|
||||
else:
|
||||
# Some challenges are not verified.
|
||||
# Put this session back on the stack to try to verify again.
|
||||
r.lpush("pending-testchallenge", session)
|
||||
# TODO: if we wanted the client to tell us when it believes
|
||||
# it has completed the challenge, we should take this out and
|
||||
# have the server publish the message in response to the message
|
||||
# from the client. Also, the current version will cause the
|
||||
# server to retest over and over again as fast as it's able.
|
||||
r.publish("requests", "testchallenge")
|
||||
|
||||
def issue(session):
|
||||
if r.hget(session, "live") != "True":
|
||||
@@ -247,36 +259,50 @@ def issue(session):
|
||||
if debug: print "issued for", session
|
||||
r.hset(session, "state", "done")
|
||||
r.lpush("pending-done", session)
|
||||
# TODO: Note that we do not publish a pubsub message when
|
||||
# the session enters done state, so the daemon will not
|
||||
# actually act on it. Is that OK?
|
||||
else: # should not be reached in deployed version
|
||||
if debug: print "issuing for", session, "failed"
|
||||
r.lpush("pending-issue", session)
|
||||
r.publish("requests", "issue")
|
||||
|
||||
while True:
|
||||
# Dispatch table for how to react to pubsub messages. The key is
|
||||
# the pubsub message and the value is a tuple of (queue name, function).
|
||||
# The main loop will look in the specified queue for a pending session,
|
||||
# and, if it finds one, it will call the specified function on it.
|
||||
# Since the queue names are systematically related to the message names,
|
||||
# we could probably remove the queue name field entirely.
|
||||
dispatch = { "makechallenge": ("pending-makechallenge", makechallenge),
|
||||
"testchallenge": ("pending-testchallenge", testchallenge),
|
||||
"issue": ("pending-issue", issue),
|
||||
"done": ("pending-done", lambda x: None) }
|
||||
|
||||
# Main loop: act on queues notified via Redis pubsub mechanism.
|
||||
# If the queue is empty by the time we pop from it (indicated by
|
||||
# session is None), some other daemon instance has already handled
|
||||
# the request, which is fine; we then return immediately to waiting
|
||||
# for the next request.
|
||||
|
||||
ps.subscribe(["requests"])
|
||||
for message in ps.listen():
|
||||
populated_queue = message["data"]
|
||||
if populated_queue in dispatch:
|
||||
queue, function = dispatch[populated_queue]
|
||||
session = r.rpop(queue)
|
||||
if session:
|
||||
if debug: print "going to %s for %s" % (populated_queue, session)
|
||||
if ancient(session, populated_queue):
|
||||
if populated_queue == "issue":
|
||||
if debug: print "not expiring issue-state", session
|
||||
else:
|
||||
if debug: print "expiring ancient session", session
|
||||
r.hset(session, "live", False)
|
||||
else:
|
||||
function(session)
|
||||
else:
|
||||
if debug: print "UNKNOWN queue %s" % populated_queue
|
||||
|
||||
if clean_shutdown:
|
||||
print "daemon exiting cleanly"
|
||||
break
|
||||
session = r.rpop("pending-makechallenge")
|
||||
if session:
|
||||
if debug: print "going to makechallenge for", session
|
||||
if ancient(session, "makechallenge"):
|
||||
if debug: print "expiring old session", session
|
||||
r.hset(session, "live", False)
|
||||
makechallenge(session)
|
||||
session = None
|
||||
else: session = r.rpop("pending-testchallenge")
|
||||
if session:
|
||||
if debug: print "going to testchallenge for", session
|
||||
if ancient(session, "testchallenge"):
|
||||
if debug: print "expiring old session", session
|
||||
r.hset(session, "live", False)
|
||||
testchallenge(session)
|
||||
session = None
|
||||
else: session = r.rpop("pending-issue")
|
||||
if session:
|
||||
if debug: print "going to issue for", session
|
||||
# Currently the daemon will never itself make an unexpired session
|
||||
# in "issue" state expire.
|
||||
issue(session)
|
||||
session = None
|
||||
else: time.sleep(1)
|
||||
# This daemon doesn't currently act on pending-done sessions.
|
||||
|
||||
Reference in New Issue
Block a user