mirror of
https://github.com/certbot/certbot.git
synced 2026-01-26 07:41:33 +03:00
push notification implementation with one-shot lock
This commit is contained in:
@@ -81,6 +81,7 @@ class session(object):
|
||||
if timestamp is None: timestamp = int(time.time())
|
||||
if not self.exists():
|
||||
sessions.hset(self.id, "created", timestamp)
|
||||
sessions.hset(self.id, "lastpoll", 0)
|
||||
sessions.hset(self.id, "live", True)
|
||||
sessions.lpush("active-requests", self.id)
|
||||
else:
|
||||
@@ -101,6 +102,15 @@ class session(object):
|
||||
def age(self):
|
||||
return int(time.time()) - int(sessions.hget(self.id, "created"))
|
||||
|
||||
def poll_age(self):
|
||||
return float(time.time()) - float(sessions.hget(self.id, "lastpoll"))
|
||||
|
||||
def request_test(self):
|
||||
"""Ask a daemon to test challenges."""
|
||||
# TODO: check whether this session is already in pending-testchallenge?
|
||||
sessions.lpush("pending-testchallenge", self.id)
|
||||
sessions.publish("requests", "testchallenge")
|
||||
|
||||
def request_made(self):
|
||||
"""Has there already been a signing request made in this session?"""
|
||||
return sessions.hget(self.id, "state") is not None
|
||||
@@ -310,6 +320,17 @@ class session(object):
|
||||
# If we're in testchallenge, tell the client about the challenges and their
|
||||
# current status.
|
||||
if state == "testchallenge":
|
||||
if m.completedchallenge:
|
||||
try:
|
||||
with redis_lock(sessions, "lock-" + self.id, one_shot=True):
|
||||
if self.poll_age() < poll_interval:
|
||||
# Too recent!
|
||||
pass
|
||||
else:
|
||||
sessions.hset(self.id, "lastpoll", time.time())
|
||||
self.request_test()
|
||||
except KeyError:
|
||||
pass
|
||||
self.send_challenges(m, r)
|
||||
return
|
||||
# If we're in done, tell the client about the successfully issued cert.
|
||||
|
||||
@@ -150,10 +150,6 @@ def makechallenge(session):
|
||||
if debug: print "created new challenge", short(challenge)
|
||||
if True: # challenges have been created
|
||||
r.hset(session, "state", "testchallenge")
|
||||
r.lpush("pending-testchallenge", session)
|
||||
# TODO: this causes the daemon to immediately attempt to test the
|
||||
# challenge for completion, with no delay.
|
||||
r.publish("requests", "testchallenge")
|
||||
else:
|
||||
r.lpush("pending-makechallenge", session)
|
||||
r.publish("requests", "makechallenge")
|
||||
@@ -170,13 +166,8 @@ def testchallenge(session):
|
||||
if debug: print "removing expired session", short(session)
|
||||
r.lrem("pending-requests", session)
|
||||
return
|
||||
# Note that we can push this back into the original queue.
|
||||
# TODO: need to add a way to make sure we don't test the same
|
||||
# session too often.
|
||||
# Conceivably, this could wait until the client announces
|
||||
# that it has completed the challenges. Information about
|
||||
# the client's reporting could be stored in the database.
|
||||
# Then the CA doesn't need to poll prematurely.
|
||||
if r.hget(session, "state") != "testchallenge":
|
||||
return
|
||||
all_satisfied = True
|
||||
for i, name in enumerate(r.lrange("%s:names" % session, 0, -1)):
|
||||
challenge = "%s:%d" % (session, i)
|
||||
@@ -227,14 +218,7 @@ def testchallenge(session):
|
||||
r.publish("requests", "issue")
|
||||
else:
|
||||
# Some challenges are not verified.
|
||||
# Put this session back on the stack to try to verify again.
|
||||
r.lpush("pending-testchallenge", session)
|
||||
# TODO: if we wanted the client to tell us when it believes
|
||||
# it has completed the challenge, we should take this out and
|
||||
# have the server publish the message in response to the message
|
||||
# from the client. Also, the current version will cause the
|
||||
# server to retest over and over again as fast as it's able.
|
||||
r.publish("requests", "testchallenge")
|
||||
pass
|
||||
|
||||
def issue(session):
|
||||
if r.hget(session, "live") != "True":
|
||||
@@ -262,6 +246,8 @@ def issue(session):
|
||||
if debug: print "removing expired (issue-state!?) session", short(session)
|
||||
r.lrem("pending-requests", session)
|
||||
return
|
||||
if r.hget(session, "state") != "issue":
|
||||
return
|
||||
csr = r.hget(session, "csr")
|
||||
names = r.lrange("%s:names" % session, 0, -1)
|
||||
with issue_lock:
|
||||
@@ -322,9 +308,10 @@ for message in ps.listen():
|
||||
if debug: print "expiring ancient session", short(session)
|
||||
r.hset(session, "live", False)
|
||||
else:
|
||||
# if debug: print "going to %s for %s" % (queue, short(session))
|
||||
if queue == "makechallenge": makechallenge(session)
|
||||
elif queue == "testchallenge": testchallenge(session)
|
||||
elif queue == "testchallenge":
|
||||
with redis_lock(r, "lock-" + session):
|
||||
testchallenge(session)
|
||||
elif queue == "issue": issue(session)
|
||||
if inactive:
|
||||
break
|
||||
|
||||
@@ -17,6 +17,10 @@
|
||||
# implemented, only one process succeds in clearing and acquiring a
|
||||
# particular expired lock, even "when multiple clients detected an
|
||||
# expired lock and are trying to release it".
|
||||
#
|
||||
# The optional one_shot parameter causes the attempt to acquire the
|
||||
# lock to instead raise a KeyError exception if someone else is already
|
||||
# holding a valid lock.
|
||||
|
||||
import time, random
|
||||
|
||||
@@ -27,9 +31,10 @@ def valid(t):
|
||||
return float(t) > time.time()
|
||||
|
||||
class redis_lock(object):
|
||||
def __init__(self, redis, lock_name):
|
||||
def __init__(self, redis, lock_name, one_shot=False):
|
||||
self.redis = redis
|
||||
self.lock_name = lock_name
|
||||
self.one_shot = one_shot
|
||||
|
||||
def __enter__(self):
|
||||
while True:
|
||||
@@ -40,6 +45,8 @@ class redis_lock(object):
|
||||
# "C4 sends GET lock.foo to check if the lock expired."
|
||||
existing_lock = self.redis.get(self.lock_name)
|
||||
if (not existing_lock) or valid(existing_lock):
|
||||
if self.one_shot:
|
||||
raise KeyError
|
||||
# "If it is not, it will sleep for some time and retry from
|
||||
# the start."
|
||||
time.sleep(1 + random.random())
|
||||
|
||||
Reference in New Issue
Block a user