1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-21 19:01:07 +03:00

Merge branch 'master' into use_psutil

This commit is contained in:
James Kasten
2015-02-25 18:07:07 -08:00
49 changed files with 2914 additions and 1106 deletions

View File

@@ -1,17 +1,19 @@
language: python
# please keep this in sync with docs/using.rst (Ubuntu section, apt-get)
before_install: >
travis_retry sudo apt-get install python python-setuptools
python-virtualenv python-dev gcc swig dialog libaugeas0 libssl-dev
libffi-dev ca-certificates
install:
- travis_retry python setup.py dev # installs tox
- travis_retry pip install coveralls
install: "travis_retry pip install tox coveralls"
script: "travis_retry tox"
script: travis_retry tox
after_success: coveralls
after_success: '[ "$TOXENV" == "cover" ] && coveralls'
# using separate envs with different TOXENVs creates 4x1 Travis build
# matrix, which allows us to clearly distinguish which component under
# test has failed
env:
- TOXENV=py26
- TOXENV=py27

72
CONTRIBUTING.rst Normal file
View File

@@ -0,0 +1,72 @@
.. _hacking:
Hacking
=======
In order to start hacking, you will first have to create a development
environment:
::
./venv/bin/python setup.py dev
The code base, including your pull requests, **must** have 100% test statement
coverage **and** be compliant with the :ref:`coding-style`.
The following tools are there to help you:
- ``./venv/bin/tox`` starts a full set of tests. Please make sure you
run it before submitting a new pull request.
- ``./venv/bin/tox -e cover`` checks the test coverage only.
- ``./venv/bin/tox -e lint`` checks the style of the whole project,
while ``./venv/bin/pylint --rcfile=.pylintrc file`` will check a single `file` only.
.. _coding-style:
Coding style
============
Please:
1. **Be consistent with the rest of the code**.
2. Read `PEP 8 - Style Guide for Python Code`_.
3. Follow the `Google Python Style Guide`_, with the exception that we
use `Sphinx-style`_ documentation:
::
def foo(arg):
"""Short description.
:param int arg: Some number.
:returns: Argument
:rtype: int
"""
return arg
4. Remember to use ``./venv/bin/pylint``.
.. _Google Python Style Guide: https://google-styleguide.googlecode.com/svn/trunk/pyguide.html
.. _Sphinx-style: http://sphinx-doc.org/
.. _PEP 8 - Style Guide for Python Code: https://www.python.org/dev/peps/pep-0008
Updating the Documentation
==========================
In order to generate the Sphinx documentation, run the following commands.
::
cd docs
make clean html SPHINXBUILD=../venv/bin/sphinx-build
This should generate documentation in the ``docs/_build/html`` directory.

View File

@@ -1,6 +1,8 @@
About the Let's Encrypt Client
==============================
|build-status| |coverage| |docs|
In short: getting and installing SSL/TLS certificates made easy (`watch demo video`_).
The Let's Encrypt Client is a tool to automatically receive and install
@@ -25,10 +27,17 @@ All you need to do is:
**Encrypt ALL the things!**
.. image:: https://travis-ci.org/letsencrypt/lets-encrypt-preview.svg?branch=master
:target: https://travis-ci.org/letsencrypt/lets-encrypt-preview
.. image:: https://coveralls.io/repos/letsencrypt/lets-encrypt-preview/badge.svg?branch=master
:target: https://coveralls.io/r/letsencrypt/lets-encrypt-preview
.. |build-status| image:: https://travis-ci.org/letsencrypt/lets-encrypt-preview.svg?branch=master
:target: https://travis-ci.org/letsencrypt/lets-encrypt-preview
:alt: Travis CI status
.. |coverage| image:: https://coveralls.io/repos/letsencrypt/lets-encrypt-preview/badge.svg?branch=master
:target: https://coveralls.io/r/letsencrypt/lets-encrypt-preview
:alt: Coverage status
.. |docs| image:: https://readthedocs.org/projects/letsencrypt/badge/
:target: https://readthedocs.org/projects/letsencrypt/
:alt: Documentation status
.. _watch demo video: https://www.youtube.com/watch?v=Gas_sSB-5SU

View File

@@ -3,3 +3,27 @@
.. automodule:: letsencrypt.client.display
:members:
:mod:`letsencrypt.client.display.util`
======================================
.. automodule:: letsencrypt.client.display.util
:members:
:mod:`letsencrypt.client.display.ops`
=====================================
.. automodule:: letsencrypt.client.display.ops
:members:
:mod:`letsencrypt.client.display.enhancements`
==============================================
.. automodule:: letsencrypt.client.display.enhancements
:members:
:mod:`letsencrypt.client.display.revocation`
============================================
.. automodule:: letsencrypt.client.display.revocation
:members:

View File

@@ -17,6 +17,14 @@ import os
import re
import sys
import mock
# http://docs.readthedocs.org/en/latest/faq.html#i-get-import-errors-on-libraries-that-depend-on-c-modules
# c.f. #262
sys.modules.update(
(mod_name, mock.MagicMock()) for mod_name in ['augeas', 'M2Crypto'])
here = os.path.abspath(os.path.dirname(__file__))
# read version number (and other metadata) from package init
@@ -117,7 +125,15 @@ pygments_style = 'sphinx'
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
html_theme = 'default'
# http://docs.readthedocs.org/en/latest/theme.html#how-do-i-use-this-locally-and-on-read-the-docs
# on_rtd is whether we are on readthedocs.org
on_rtd = os.environ.get('READTHEDOCS', None) == 'True'
if not on_rtd: # only import and set the theme if we're building docs locally
import sphinx_rtd_theme
html_theme = 'sphinx_rtd_theme'
html_theme_path = [sphinx_rtd_theme.get_html_theme_path()]
# otherwise, readthedocs.org uses their theme by default, so no need to specify it
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the

View File

@@ -2,75 +2,4 @@
The Let's Encrypt Client Project
================================
.. _hacking:
Hacking
=======
In order to start hacking, you will first have to create a development
environment:
::
./venv/bin/python setup.py dev
The code base, including your pull requests, **must** have 100% test statement
coverage **and** be compliant with the :ref:`coding-style`.
The following tools are there to help you:
- ``./venv/bin/tox`` starts a full set of tests. Please make sure you
run it before submitting a new pull request.
- ``./venv/bin/tox -e cover`` checks the test coverage only.
- ``./venv/bin/tox -e lint`` checks the style of the whole project,
while ``./venv/bin/pylint --rcfile=.pylintrc file`` will check a single `file` only.
.. _coding-style:
Coding style
============
Please:
1. **Be consistent with the rest of the code**.
2. Read `PEP 8 - Style Guide for Python Code`_.
3. Follow the `Google Python Style Guide`_, with the exception that we
use `Sphinx-style`_ documentation:
::
def foo(arg):
"""Short description.
:param int arg: Some number.
:returns: Argument
:rtype: int
"""
return arg
4. Remember to use ``./venv/bin/pylint``.
.. _Google Python Style Guide: https://google-styleguide.googlecode.com/svn/trunk/pyguide.html
.. _Sphinx-style: http://sphinx-doc.org/
.. _PEP 8 - Style Guide for Python Code: https://www.python.org/dev/peps/pep-0008
Updating the Documentation
==========================
In order to generate the Sphinx documentation, run the following commands.
::
cd docs
make clean html SPHINXBUILD=../venv/bin/sphinx-build
This should generate documentation in the ``docs/_build/html`` directory.
.. include:: ../CONTRIBUTING.rst

View File

@@ -27,6 +27,7 @@ Ubuntu
gcc swig dialog libaugeas0 libssl-dev libffi-dev \
ca-certificates
.. Please keep the above command in sync with .travis.yml (before_install)
Mac OSX
-------
@@ -53,7 +54,7 @@ The letsencrypt commandline tool has a builtin help:
::
letsencrypt --help
./venv/bin/letsencrypt --help
.. _augeas: http://augeas.net/

View File

@@ -65,6 +65,9 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
:ivar config: Configuration.
:type config: :class:`~letsencrypt.client.interfaces.IConfig`
:ivar parser: Handles low level parsing
:type parser: :class:`letsencrypt.client.apache.parser`
:ivar tup version: version of Apache
:ivar list vhosts: All vhosts found in the configuration
(:class:`list` of :class:`letsencrypt.client.apache.obj.VirtualHost`)
@@ -74,6 +77,8 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
"""
zope.interface.implements(interfaces.IAuthenticator, interfaces.IInstaller)
description = "Apache Web Server"
def __init__(self, config, version=None):
"""Initialize an Apache Configurator.
@@ -87,6 +92,19 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
if os.geteuid() == 0:
self.verify_setup()
# Add name_server association dict
self.assoc = dict()
# Add number of outstanding challenges
self._chall_out = 0
# These will be set in the prepare function
self.parser = None
self.version = version
self.vhosts = None
self._enhance_func = {"redirect": self._enable_redirect}
def prepare(self):
"""Prepare the authenticator/installer."""
self.parser = parser.ApacheParser(
self.aug, self.config.apache_server_root,
self.config.apache_mod_ssl_conf)
@@ -94,14 +112,11 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
self.check_parsing_errors("httpd.aug")
# Set Version
self.version = self.get_version() if version is None else version
if self.version is None:
self.version = self.get_version()
# Get all of the available vhosts
self.vhosts = self.get_virtual_hosts()
# Add name_server association dict
self.assoc = dict()
# Add number of outstanding challenges
self.chall_out = 0
# Enable mod_ssl if it isn't already enabled
# This is Let's Encrypt... we enable mod_ssl on initialization :)
@@ -110,7 +125,6 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
# on initialization
self._prepare_server_https()
self.enhance_func = {"redirect": self._enable_redirect}
temp_install(self.config.apache_mod_ssl_conf)
def deploy_cert(self, domain, cert, key, cert_chain=None):
@@ -521,7 +535,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
"""
try:
return self.enhance_func[enhancement](
return self._enhance_func[enhancement](
self.choose_vhost(domain), options)
except ValueError:
raise errors.LetsEncryptConfiguratorError(
@@ -898,7 +912,20 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
return True
def get_version(self): # pylint: disable=no-self-use
def verify_setup(self):
"""Verify the setup to ensure safe operating environment.
Make sure that files/directories are setup with appropriate permissions
Aim for defensive coding... make sure all input files
have permissions of root
"""
uid = os.geteuid()
le_util.make_or_verify_dir(self.config.config_dir, 0o755, uid)
le_util.make_or_verify_dir(self.config.work_dir, 0o755, uid)
le_util.make_or_verify_dir(self.config.backup_dir, 0o755, uid)
def get_version(self):
"""Return version of Apache Server.
Version is returned as tuple. (ie. 2.4.7 = (2, 4, 7))
@@ -929,18 +956,15 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
return tuple([int(i) for i in matches[0].split('.')])
def verify_setup(self):
"""Verify the setup to ensure safe operating environment.
Make sure that files/directories are setup with appropriate permissions
Aim for defensive coding... make sure all input files
have permissions of root
"""
uid = os.geteuid()
le_util.make_or_verify_dir(self.config.config_dir, 0o755, uid)
le_util.make_or_verify_dir(self.config.work_dir, 0o755, uid)
le_util.make_or_verify_dir(self.config.backup_dir, 0o755, uid)
def more_info(self):
"""Human-readable string to help understand the module"""
return (
"Configures Apache to authenticate and install HTTPS.{0}"
"Server root: {root}{0}"
"Version: {version}".format(
os.linesep, root=self.parser.loc["root"],
version=".".join(str(i) for i in self.version))
)
###########################################################################
# Challenges Section
@@ -965,7 +989,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
:rtype: list
"""
self.chall_out += len(chall_list)
self._chall_out += len(chall_list)
responses = [None] * len(chall_list)
apache_dvsni = dvsni.ApacheDvsni(self)
@@ -991,10 +1015,10 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
def cleanup(self, chall_list):
"""Revert all challenges."""
self.chall_out -= len(chall_list)
self._chall_out -= len(chall_list)
# If all of the challenges have been finished, clean up everything
if self.chall_out <= 0:
if self._chall_out <= 0:
self.revert_challenge_config()
self.restart()

View File

@@ -233,7 +233,6 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
if client_list:
self.client_auth.cleanup(client_list)
def _cleanup_state(self, delete_list):
"""Cleanup state after an authorization is received.

View File

@@ -67,7 +67,7 @@ def _dvsni_gen_ext(dvsni_r, dvsni_s):
:rtype: str
"""
z_base = hashlib.new('sha256')
z_base = hashlib.new("sha256")
z_base.update(dvsni_r)
z_base.update(dvsni_s)

View File

@@ -1,13 +1,10 @@
"""ACME protocol client class and helper functions."""
import csv
import logging
import os
import shutil
import sys
import Crypto.PublicKey.RSA
import M2Crypto
import zope.component
from letsencrypt.acme import messages
from letsencrypt.acme import util as acme_util
@@ -16,13 +13,14 @@ from letsencrypt.client import auth_handler
from letsencrypt.client import client_authenticator
from letsencrypt.client import crypto_util
from letsencrypt.client import errors
from letsencrypt.client import interfaces
from letsencrypt.client import le_util
from letsencrypt.client import network
from letsencrypt.client import reverter
from letsencrypt.client import revoker
from letsencrypt.client.apache import configurator
from letsencrypt.client.display import ops as display_ops
from letsencrypt.client.display import enhancements
class Client(object):
@@ -50,15 +48,15 @@ class Client(object):
"""Initialize a client.
:param dv_auth: IAuthenticator that can solve the
:const:`letsencrypt.client.constants.DV_CHALLENGES`
:const:`letsencrypt.client.constants.DV_CHALLENGES`.
The :meth:`~letsencrypt.client.interfaces.IAuthenticator.prepare`
must have already been run.
:type dv_auth: :class:`letsencrypt.client.interfaces.IAuthenticator`
"""
self.network = network.Network(config.server)
self.authkey = authkey
self.installer = installer
self.config = config
if dv_auth is not None:
@@ -104,7 +102,8 @@ class Client(object):
cert_file, chain_file = self.save_certificate(
certificate_msg, self.config.cert_path, self.config.chain_path)
self.store_cert_key(cert_file, False)
revoker.Revoker.store_cert_key(
cert_file, self.authkey.file, self.config)
return cert_file, chain_file
@@ -201,8 +200,7 @@ class Client(object):
# sites may have been enabled / final cleanup
self.installer.restart()
zope.component.getUtility(
interfaces.IDisplay).success_installation(domains)
display_ops.success_installation(domains)
def enhance_config(self, domains, redirect=None):
"""Enhance the configuration.
@@ -216,7 +214,7 @@ class Client(object):
:param redirect: If traffic should be forwarded from HTTP to HTTPS.
:type redirect: bool or None
:raises :class:`letsencrypt.client.errors.LetsEncryptClientError`: if
:raises letsencrypt.client.errors.LetsEncryptClientError: if
no installer is specified in the client.
"""
@@ -226,58 +224,11 @@ class Client(object):
raise errors.LetsEncryptClientError("No installer available")
if redirect is None:
redirect = zope.component.getUtility(
interfaces.IDisplay).redirect_by_default()
redirect = enhancements.ask("redirect")
if redirect:
self.redirect_to_ssl(domains)
def store_cert_key(self, cert_file, encrypt=False):
"""Store certificate key. (Used to allow quick revocation)
:param str cert_file: Path to a certificate file.
:param bool encrypt: Should the certificate key be encrypted?
:returns: True if key file was stored successfully, False otherwise.
:rtype: bool
"""
list_file = os.path.join(self.config.cert_key_backup, "LIST")
le_util.make_or_verify_dir(self.config.cert_key_backup, 0o700)
idx = 0
if encrypt:
logging.error(
"Unfortunately securely storing the certificates/"
"keys is not yet available. Stay tuned for the "
"next update!")
return False
if os.path.isfile(list_file):
with open(list_file, 'r+b') as csvfile:
csvreader = csv.reader(csvfile)
for row in csvreader:
idx = int(row[0]) + 1
csvwriter = csv.writer(csvfile)
csvwriter.writerow([str(idx), cert_file, self.authkey.file])
else:
with open(list_file, 'wb') as csvfile:
csvwriter = csv.writer(csvfile)
csvwriter.writerow(["0", cert_file, self.authkey.file])
shutil.copy2(self.authkey.file,
os.path.join(
self.config.cert_key_backup,
os.path.basename(self.authkey.file) + "_" + str(idx)))
shutil.copy2(cert_file,
os.path.join(
self.config.cert_key_backup,
os.path.basename(cert_file) + "_" + str(idx)))
return True
def redirect_to_ssl(self, domains):
"""Redirect all traffic from HTTP to HTTPS
@@ -289,7 +240,7 @@ class Client(object):
try:
self.installer.enhance(dom, "redirect")
except errors.LetsEncryptConfiguratorError:
logging.warn('Unable to perform redirect for %s', dom)
logging.warn("Unable to perform redirect for %s", dom)
self.installer.save("Add Redirects")
self.installer.restart()
@@ -310,7 +261,8 @@ def validate_key_csr(privkey, csr=None):
:param csr: CSR
:type csr: :class:`letsencrypt.client.le_util.CSR`
:raises LetsEncryptClientError: if validation fails
:raises letsencrypt.client.errors.LetsEncryptClientError: when
validation fails
"""
# TODO: Handle all of these problems appropriately
@@ -373,6 +325,11 @@ def init_key(key_size, key_dir):
def init_csr(privkey, names, cert_dir):
"""Initialize a CSR with the given private key.
:param privkey: Key to include in the CSR
:type privkey: :class:`letsencrypt.client.le_util.Key`
:param list names: `str` names to include in the CSR
:param str cert_dir: Certificate save directory.
"""
@@ -389,18 +346,48 @@ def init_csr(privkey, names, cert_dir):
return le_util.CSR(csr_filename, csr_der, "der")
# This should be controlled by commandline parameters
def determine_authenticator(config):
def determine_authenticator(all_auths):
"""Returns a valid IAuthenticator.
:param config: Configuration.
:type config: :class:`letsencrypt.client.interfaces.IConfig`
:param list all_auths: Where each is a
:class:`letsencrypt.client.interfaces.IAuthenticator` object
:returns: Valid Authenticator object or None
:raises letsencrypt.client.errors.LetsEncryptClientError: If no
authenticator is available.
"""
try:
return configurator.ApacheConfigurator(config)
except errors.LetsEncryptNoInstallationError:
logging.info("Unable to determine a way to authenticate the server")
# Available Authenticator objects
avail_auths = []
# Error messages for misconfigured authenticators
errs = {}
for pot_auth in all_auths:
try:
pot_auth.prepare()
except errors.LetsEncryptMisconfigurationError as err:
errs[pot_auth] = err
except errors.LetsEncryptNoInstallationError:
continue
avail_auths.append(pot_auth)
if len(avail_auths) > 1:
auth = display_ops.choose_authenticator(avail_auths, errs)
elif len(avail_auths) == 1:
auth = avail_auths[0]
else:
raise errors.LetsEncryptClientError("No Authenticators available.")
if auth is not None and auth in errs:
logging.error("Please fix the configuration for the Authenticator. "
"The following error message was received: "
"%s", errs[auth])
return
return auth
def determine_installer(config):
@@ -409,28 +396,25 @@ def determine_installer(config):
:param config: Configuration.
:type config: :class:`letsencrypt.client.interfaces.IConfig`
:returns: IInstaller or `None`
:rtype: :class:`~letsencrypt.client.interfaces.IInstaller` or `None`
"""
installer = configurator.ApacheConfigurator(config)
try:
return configurator.ApacheConfigurator(config)
installer.prepare()
return installer
except errors.LetsEncryptNoInstallationError:
logging.info("Unable to find a way to install the certificate.")
return
except errors.LetsEncryptMisconfigurationError:
# This will have to be changed in the future...
return installer
def rollback(checkpoints, config):
"""Revert configuration the specified number of checkpoints.
.. note:: If another installer uses something other than the reverter class
to do their configuration changes, the correct reverter will have to be
determined.
.. note:: This function restarts the server even if there weren't any
rollbacks. The user may be confused or made an error and simply needs
to restart the server.
.. todo:: This function will have to change depending on the functionality
of future installers. Perhaps the interface should define errors that
are thrown for the various functions.
:param int checkpoints: Number of checkpoints to revert.
:param config: Configuration.
@@ -438,11 +422,7 @@ def rollback(checkpoints, config):
"""
# Misconfigurations are only a slight problems... allow the user to rollback
try:
installer = determine_installer(config)
except errors.LetsEncryptMisconfigurationError:
_misconfigured_rollback(checkpoints, config)
return
installer = determine_installer(config)
# No Errors occurred during init... proceed normally
# If installer is None... couldn't find an installer... there shouldn't be
@@ -452,44 +432,7 @@ def rollback(checkpoints, config):
installer.restart()
def _misconfigured_rollback(checkpoints, config):
"""Handles the case where the Installer is misconfigured.
:param int checkpoints: Number of checkpoints to revert.
:param config: Configuration.
:type config: :class:`letsencrypt.client.interfaces.IConfig`
"""
yes = zope.component.getUtility(interfaces.IDisplay).generic_yesno(
"Oh, no! The web server is currently misconfigured.{0}{0}"
"Would you still like to rollback the "
"configuration?".format(os.linesep))
if not yes:
logging.info("The error message is above.")
logging.info("Configuration was not rolled back.")
return
logging.info("Rolling back using the Reverter module")
# recovery routine has probably already been run by installer
# in the__init__ attempt, run it again for safety... it shouldn't hurt
# Also... not sure how future installers will handle recovery.
rev = reverter.Reverter(config)
rev.recovery_routine()
rev.rollback_checkpoints(checkpoints)
# We should try to restart the server
try:
installer = determine_installer(config)
installer.restart()
logging.info("Hooray! Rollback solved the misconfiguration!")
logging.info("Your web server is back up and running.")
except errors.LetsEncryptMisconfigurationError:
logging.warning(
"Rollback was unable to solve the misconfiguration issues")
def revoke(config):
def revoke(config, no_confirm, cert, authkey):
"""Revoke certificates.
:param config: Configuration.
@@ -498,25 +441,18 @@ def revoke(config):
"""
# Misconfigurations don't really matter. Determine installer better choose
# correctly though.
try:
installer = determine_installer(config)
except errors.LetsEncryptMisconfigurationError:
zope.component.getUtility(interfaces.IDisplay).generic_notification(
"The web server is currently misconfigured. Some "
"abilities like seeing which certificates are currently "
"installed may not be available.")
installer = None
# This will need some better prepared or properly configured parameter...
# I will figure it out later...
installer = determine_installer(config)
# This is a temporary fix to avoid errors. The Revoker is not fully
# developed.
if installer is None:
zope.component.getUtility(interfaces.IDisplay).generic_notification(
"The Let's Encrypt Revoker module does not currently support "
"revocation without a valid installer. This feature should come "
"soon.")
return
revoc = revoker.Revoker(installer, config)
revoc.list_certs_keys()
revoc = revoker.Revoker(installer, config, no_confirm)
# Cert is most selective, so it is chosen first.
if cert is not None:
revoc.revoke_from_cert(cert[0])
elif authkey is not None:
revoc.revoke_from_key(le_util.Key(authkey[0], authkey[1]))
else:
revoc.revoke_from_menu()
def view_config_changes(config):

View File

@@ -45,8 +45,10 @@ class NamespaceConfig(object):
@property
def cert_key_backup(self): # pylint: disable=missing-docstring
return os.path.join(
self.namespace.work_dir, constants.CERT_KEY_BACKUP_DIR)
self.namespace.work_dir, constants.CERT_KEY_BACKUP_DIR,
self.namespace.server.partition(":")[0])
# TODO: This should probably include the server name
@property
def rec_token_dir(self): # pylint: disable=missing-docstring
return os.path.join(self.namespace.work_dir, constants.REC_TOKEN_DIR)

View File

@@ -1,4 +1,9 @@
"""Let's Encrypt client crypto utility functions"""
"""Let's Encrypt client crypto utility functions
.. todo:: Make the transition to use PSS rather than PKCS1_v1_5 when the server
is capable of handling the signatures.
"""
import time
import Crypto.Hash.SHA256
@@ -39,7 +44,7 @@ def make_csr(key_str, domains):
extstack.push(ext)
csr.add_extensions(extstack)
csr.sign(pubkey, 'sha256')
csr.sign(pubkey, "sha256")
assert csr.verify(pubkey)
pubkey2 = csr.get_pubkey()
assert csr.verify(pubkey2)
@@ -93,7 +98,7 @@ def make_key(bits):
:rtype: str
"""
return Crypto.PublicKey.RSA.generate(bits).exportKey(format='PEM')
return Crypto.PublicKey.RSA.generate(bits).exportKey(format="PEM")
def valid_privkey(privkey):
@@ -147,45 +152,12 @@ def make_ss_cert(key_str, domains, not_before=None,
if len(domains) > 1:
cert.add_ext(M2Crypto.X509.new_extension(
'basicConstraints', 'CA:FALSE'))
# cert.add_ext(M2Crypto.X509.new_extension(
# 'extendedKeyUsage', 'TLS Web Server Authentication'))
"basicConstraints", "CA:FALSE"))
cert.add_ext(M2Crypto.X509.new_extension(
'subjectAltName', ", ".join(["DNS:%s" % d for d in domains])))
"subjectAltName", ", ".join(["DNS:%s" % d for d in domains])))
cert.sign(pubkey, 'sha256')
cert.sign(pubkey, "sha256")
assert cert.verify(pubkey)
assert cert.verify()
# print check_purpose(,0
return cert.as_pem()
def get_cert_info(filename):
"""Get certificate info.
.. todo:: Pub key is assumed to be RSA... find a good solution to allow EC.
:param str filename: Name of file containing certificate in PEM format.
:rtype: dict
"""
# M2Crypto Library only supports RSA right now
cert = M2Crypto.X509.load_cert(filename)
try:
san = cert.get_ext("subjectAltName").get_value()
except LookupError:
san = ""
return {
"not_before": cert.get_not_before().get_datetime(),
"not_after": cert.get_not_after().get_datetime(),
"subject": cert.get_subject().as_text(),
"cn": cert.get_subject().CN,
"issuer": cert.get_issuer().as_text(),
"fingerprint": cert.get_fingerprint(md='sha1'),
"san": san,
"serial": cert.get_serial_number(),
"pub_key": "RSA " + str(cert.get_pubkey().size() * 8),
}

View File

@@ -1,446 +0,0 @@
"""Lets Encrypt display."""
import os
import textwrap
import dialog
import zope.interface
from letsencrypt.client import interfaces
WIDTH = 72
HEIGHT = 20
class CommonDisplayMixin(object): # pylint: disable=too-few-public-methods
"""Mixin with methods common to classes implementing IDisplay."""
def redirect_by_default(self):
"""Determines whether the user would like to redirect to HTTPS.
:returns: True if redirect is desired, False otherwise
:rtype: bool
"""
choices = [
("Easy", "Allow both HTTP and HTTPS access to these sites"),
("Secure", "Make all requests redirect to secure HTTPS access")]
result = self.generic_menu(
"Please choose whether HTTPS access is required or optional.",
choices, "Please enter the appropriate number")
if result[0] != OK:
return False
# different answer for each type of display
return str(result[1]) == "Secure" or result[1] == 1
class NcursesDisplay(CommonDisplayMixin):
"""Ncurses-based display."""
zope.interface.implements(interfaces.IDisplay)
def __init__(self, width=WIDTH, height=HEIGHT):
super(NcursesDisplay, self).__init__()
self.dialog = dialog.Dialog()
self.width = width
self.height = height
def generic_notification(self, message, height=10):
"""Display a notification to the user and wait for user acceptance.
:param str message: Message to display
:param int height: Height of the dialog box
"""
self.dialog.msgbox(message, height, width=self.width)
def generic_menu(self, message, choices, unused_input_text=""):
"""Display a menu.
:param str message: title of menu
:param choices: menu lines
:type choices: list of tuples (tag, item) or
list of items (tags will be enumerated)
:returns: tuple of the form (code, tag) where
code is a display exit code
tag is the tag string corresponding to the item chosen
:rtype: tuple
"""
# Can accept either tuples or just the actual choices
if choices and isinstance(choices[0], tuple):
code, selection = self.dialog.menu(
message, choices=choices, width=self.width, height=self.height)
return code, str(selection)
else:
choices = list(enumerate(choices, 1))
code, tag = self.dialog.menu(
message, choices=choices, width=self.width, height=self.height)
return code, int(tag) - 1
def generic_input(self, message):
"""Display an input box to the user.
:param str message: Message to display that asks for input.
:returns: tuple of the form (code, string) where
code is a display exit code
string is the input entered by the user
"""
return self.dialog.inputbox(message)
def generic_yesno(self, message, yes_label="Yes", no_label="No"):
"""Display a Yes/No dialog box
:param str message: message to display to user
:param str yes_label: label on the 'yes' button
:param str no_label: label on the 'no' button
:returns: if yes_label was selected
:rtype: bool
"""
return self.dialog.DIALOG_OK == self.dialog.yesno(
message, self.height, self.width,
yes_label=yes_label, no_label=no_label)
def filter_names(self, names):
"""Determine which names the user would like to select from a list.
:param list names: domain names
:returns: tuple of the form (code, names) where
code is a display exit code
names is a list of names selected
:rtype: tuple
"""
choices = [(n, "", 0) for n in names]
code, names = self.dialog.checklist(
"Which names would you like to activate HTTPS for?",
choices=choices)
return code, [str(s) for s in names]
def success_installation(self, domains):
"""Display a box confirming the installation of HTTPS.
:param list domains: domain names which were enabled
"""
self.dialog.msgbox(
"\nCongratulations! You have successfully enabled "
+ gen_https_names(domains) + "!", width=self.width)
def display_certs(self, certs):
"""Display certificates for revocation.
:param list certs: `list` of `dict` used throughout revoker.py
:returns: tuple of the form (code, selection) where
code is a display exit code
selection is the user's int selection
:rtype: tuple
"""
list_choices = [
(str(i+1), "%s | %s | %s" %
(str(c["cn"].ljust(self.width - 39)),
c["not_before"].strftime("%m-%d-%y"),
"Installed" if c["installed"] else ""))
for i, c in enumerate(certs)]
code, tag = self.dialog.menu(
"Which certificates would you like to revoke?",
choices=list_choices, help_button=True,
help_label="More Info", ok_label="Revoke",
width=self.width, height=self.height)
if not tag:
tag = -1
return code, (int(tag) - 1)
def confirm_revocation(self, cert):
"""Confirm revocation screen.
:param dict cert: cert dict used throughout revoker.py
:returns: True if user would like to revoke, False otherwise
:rtype: bool
"""
text = ("Are you sure you would like to revoke the following "
"certificate:\n")
text += cert_info_frame(cert)
text += "This action cannot be reversed!"
return self.dialog.DIALOG_OK == self.dialog.yesno(
text, width=self.width, height=self.height)
def more_info_cert(self, cert):
"""Displays more information about the certificate.
:param dict cert: cert dict used throughout revoker.py
"""
text = "Certificate Information:\n"
text += cert_info_frame(cert)
self.dialog.msgbox(text, width=self.width, height=self.height)
class FileDisplay(CommonDisplayMixin):
"""File-based display."""
zope.interface.implements(interfaces.IDisplay)
def __init__(self, outfile):
super(FileDisplay, self).__init__()
self.outfile = outfile
def generic_notification(self, message, unused_height):
"""Displays a notification and waits for user acceptance.
:param str message: Message to display
"""
side_frame = '-' * 79
lines = message.splitlines()
fixed_l = []
for line in lines:
fixed_l.append(textwrap.fill(line, 80))
self.outfile.write(
"{0}{1}{0}{2}{0}{1}{0}".format(
os.linesep, side_frame, os.linesep.join(fixed_l)))
raw_input("Press Enter to Continue")
def generic_menu(self, message, choices, input_text=""):
"""Display a menu.
:param str message: title of menu
:param choices: Menu lines
:type choices: list of tuples (tag, item) or
list of items (tags will be enumerated)
:returns: tuple of the form (code, tag) where
code is a display exit code
tag is the tag string corresponding to the item chosen
:rtype: tuple
"""
# Can take either tuples or single items in choices list
if choices and isinstance(choices[0], tuple):
choices = ["%s - %s" % (c[0], c[1]) for c in choices]
self.outfile.write("\n%s\n" % message)
side_frame = '-' * 79
self.outfile.write("%s\n" % side_frame)
for i, choice in enumerate(choices, 1):
self.outfile.write(textwrap.fill(
"%d: %s" % (i, choice), 80) + '\n')
self.outfile.write("%s\n" % side_frame)
code, selection = self._get_valid_int_ans(
"%s (c to cancel): " % input_text)
return code, (selection - 1)
def generic_input(self, message):
# pylint: disable=no-self-use
"""Accept input from the user
:param str message: message to display to the user
:returns: tuple of (code, input) where
code is a display exit code
input is a str of the user's input
:rtype: tuple
"""
ans = raw_input("%s (Enter c to cancel)\n" % message)
if ans == 'c' or ans == 'C':
return CANCEL, "-1"
else:
return OK, ans
def generic_yesno(self, message, unused_yes_label="", unused_no_label=""):
"""Query the user with a yes/no question.
:param str message: question for the user
:returns: True for 'Yes', False for 'No"
:rtype: bool
"""
self.outfile.write("\n%s\n" % textwrap.fill(message, 80))
ans = raw_input("y/n: ")
return ans.startswith('y') or ans.startswith('Y')
def filter_names(self, names):
"""Determine which names the user would like to select from a list.
:param list names: domain names
:returns: tuple of the form (code, names) where
code is a display exit code
names is a list of names selected
:rtype: tuple
"""
code, tag = self.generic_menu(
"Choose the names would you like to upgrade to HTTPS?",
names, "Select the number of the name: ")
# Make sure to return a list...
return code, [names[tag]]
def success_installation(self, domains):
"""Display a box confirming the installation of HTTPS.
:param list domains: domain names which were enabled
"""
side_frame = '*' * 79
msg = textwrap.fill("Congratulations! You have successfully "
"enabled %s!" % gen_https_names(domains))
self.outfile.write("%s\n%s\n%s\n" % (side_frame, msg, side_frame))
def display_certs(self, certs):
"""Display certificates for revocation.
:param list certs: `list` of `dict` used throughout revoker.py
:returns: tuple of the form (code, selection) where
code is a display exit code
selection is the user's int selection
:rtype: tuple
"""
menu_choices = [(str(i+1), str(c["cn"]) + " - " + c["pub_key"] +
" - " + str(c["not_before"])[:-6])
for i, c in enumerate(certs)]
self.outfile.write("Which certificate would you like to revoke?\n")
for choice in menu_choices:
self.outfile.write(textwrap.fill(
"%s: %s - %s Signed (UTC): %s\n" % choice[:4]))
return self._get_valid_int_ans("Revoke Number (c to cancel): ") - 1
def _get_valid_int_ans(self, input_string):
"""Get a numerical selection.
:param str input_string: Instructions for the user to make a selection.
:returns: tuple of the form (code, selection) where
code is a display exit code
selection is the user's int selection
:rtype: tuple
"""
valid_ans = False
e_msg = "Please input a number or the letter c to cancel\n"
while not valid_ans:
ans = raw_input(input_string)
if ans.startswith('c') or ans.startswith('C'):
code = CANCEL
selection = -1
valid_ans = True
else:
try:
selection = int(ans)
# TODO add check to make sure it is less than max
if selection < 0:
self.outfile.write(e_msg)
continue
code = OK
valid_ans = True
except ValueError:
self.outfile.write(e_msg)
return code, selection
def confirm_revocation(self, cert):
"""Confirm revocation screen.
:param dict cert: cert dict used throughout revoker.py
:returns: True if user would like to revoke, False otherwise
:rtype: bool
"""
self.outfile.write("Are you sure you would like to revoke "
"the following certificate:\n")
self.outfile.write(cert_info_frame(cert))
self.outfile("This action cannot be reversed!\n")
ans = raw_input("y/n")
return ans.startswith('y') or ans.startswith('Y')
def more_info_cert(self, cert):
"""Displays more info about the cert.
:param dict cert: cert dict used throughout revoker.py
"""
self.outfile.write("\nCertificate Information:\n")
self.outfile.write(cert_info_frame(cert))
# Display exit codes
OK = "ok"
"""Display exit code indicating user acceptance"""
CANCEL = "cancel"
"""Display exit code for a user canceling the display"""
HELP = "help"
"""Display exit code when for when the user requests more help."""
def cert_info_frame(cert):
"""Nicely frames a cert dict used in revoker.py"""
text = "-" * (WIDTH - 4) + os.linesep
text += cert_info_string(cert)
text += "-" * (WIDTH - 4)
return text
def cert_info_string(cert):
"""Turn a cert dict into a string."""
text = []
text.append("Subject: %s" % cert["subject"])
text.append("SAN: %s" % cert["san"])
text.append("Issuer: %s" % cert["issuer"])
text.append("Public Key: %s" % cert["pub_key"])
text.append("Not Before: %s" % str(cert["not_before"]))
text.append("Not After: %s" % str(cert["not_after"]))
text.append("Serial Number: %s" % cert["serial"])
text.append("SHA1: %s" % cert["fingerprint"])
text.append("Installed: %s" % cert["installed"])
return os.linesep.join(text)
def gen_https_names(domains):
"""Returns a string of the https domains.
Domains are formatted nicely with https:// prepended to each.
.. todo:: This should not use +=, rewrite this with unittests
"""
result = ""
if len(domains) > 2:
for i in range(len(domains)-1):
result = result + "https://" + domains[i] + ", "
result = result + "and "
if len(domains) == 2:
return "https://" + domains[0] + " and https://" + domains[1]
if domains:
result = result + "https://" + domains[len(domains)-1]
return result

View File

@@ -0,0 +1 @@
"""Let's Encrypt client.display"""

View File

@@ -0,0 +1,60 @@
"""Let's Encrypt Enhancement Display"""
import logging
import zope.component
from letsencrypt.client import errors
from letsencrypt.client import interfaces
from letsencrypt.client.display import util as display_util
# Define a helper function to avoid verbose code
util = zope.component.getUtility # pylint: disable=invalid-name
def ask(enhancement):
"""Display the enhancement to the user.
:param str enhancement: One of the
:class:`letsencrypt.client.CONFIG.ENHANCEMENTS` enhancements
:returns: True if feature is desired, False otherwise
:rtype: bool
:raises letsencrypt.client.errors.LetsEncryptClientError: If
the enhancement provided is not supported.
"""
try:
# Call the appropriate function based on the enhancement
return DISPATCH[enhancement]()
except KeyError:
logging.error("Unsupported enhancement given to ask(): %s", enhancement)
raise errors.LetsEncryptClientError("Unsupported Enhancement")
def redirect_by_default():
"""Determines whether the user would like to redirect to HTTPS.
:returns: True if redirect is desired, False otherwise
:rtype: bool
"""
choices = [
("Easy", "Allow both HTTP and HTTPS access to these sites"),
("Secure", "Make all requests redirect to secure HTTPS access"),
]
code, selection = util(interfaces.IDisplay).menu(
"Please choose whether HTTPS access is required or optional.",
choices)
if code != display_util.OK:
return False
return selection == 1
DISPATCH = {
"redirect": redirect_by_default
}

View File

@@ -0,0 +1,140 @@
"""Contains UI methods for LE user operations."""
import os
import zope.component
from letsencrypt.client import interfaces
from letsencrypt.client.display import util as display_util
# Define a helper function to avoid verbose code
util = zope.component.getUtility # pylint: disable=invalid-name
def choose_authenticator(auths, errs):
"""Allow the user to choose their authenticator.
:param list auths: Where each of type
:class:`letsencrypt.client.interfaces.IAuthenticator` object
:param dict errs: Mapping IAuthenticator objects to error messages
:returns: Authenticator selected
:rtype: :class:`letsencrypt.client.interfaces.IAuthenticator` or `None`
"""
descs = [auth.description if auth not in errs
else "%s (Misconfigured)" % auth.description
for auth in auths]
while True:
code, index = util(interfaces.IDisplay).menu(
"How would you like to authenticate with the Let's Encrypt CA?",
descs, help_label="More Info")
if code == display_util.OK:
return auths[index]
elif code == display_util.HELP:
if auths[index] in errs:
msg = "Reported Error: %s" % errs[auths[index]]
else:
msg = auths[index].more_info()
util(interfaces.IDisplay).notification(
msg, height=display_util.HEIGHT)
else:
return
def choose_names(installer):
"""Display screen to select domains to validate.
:param installer: An installer object
:type installer: :class:`letsencrypt.client.interfaces.IInstaller`
:returns: List of selected names
:rtype: `list` of `str`
"""
if installer is None:
return _choose_names_manually()
names = list(installer.get_all_names())
if not names:
manual = util(interfaces.IDisplay).yesno(
"No names were found in your configuration files.{0}You should "
"specify ServerNames in your config files in order to allow for "
"accurate installation of your certificate.{0}"
"If you do use the default vhost, you may specify the name "
"manually. Would you like to continue?{0}".format(os.linesep))
if manual:
return _choose_names_manually()
else:
return []
code, names = _filter_names(names)
if code == display_util.OK and names:
return names
else:
return []
def _filter_names(names):
"""Determine which names the user would like to select from a list.
:param list names: domain names
:returns: tuple of the form (`code`, `names`) where
`code` - str display exit code
`names` - list of names selected
:rtype: tuple
"""
code, names = util(interfaces.IDisplay).checklist(
"Which names would you like to activate HTTPS for?",
tags=names)
return code, [str(s) for s in names]
def _choose_names_manually():
"""Manually input names for those without an installer."""
code, input_ = util(interfaces.IDisplay).input(
"Please enter in your domain name(s) (comma and/or space separated) ")
if code == display_util.OK:
return display_util.separate_list_input(input_)
return []
def success_installation(domains):
"""Display a box confirming the installation of HTTPS.
.. todo:: This should be centered on the screen
:param list domains: domain names which were enabled
"""
util(interfaces.IDisplay).notification(
"Congratulations! You have successfully enabled "
"%s!" % _gen_https_names(domains), pause=False)
def _gen_https_names(domains):
"""Returns a string of the https domains.
Domains are formatted nicely with https:// prepended to each.
:param list domains: Each domain is a 'str'
"""
if len(domains) == 1:
return "https://{0}".format(domains[0])
elif len(domains) == 2:
return "https://{dom[0]} and https://{dom[1]}".format(dom=domains)
elif len(domains) > 2:
return "{0}{1}{2}".format(
", ".join("https://%s" % dom for dom in domains[:-1]),
", and https://",
domains[-1])
return ""

View File

@@ -0,0 +1,77 @@
"""Revocation UI class."""
import os
import zope.component
from letsencrypt.client import interfaces
from letsencrypt.client.display import util as display_util
# Define a helper function to avoid verbose code
util = zope.component.getUtility # pylint: disable=invalid-name
def display_certs(certs):
"""Display the certificates in a menu for revocation.
:param list certs: each is a :class:`letsencrypt.client.revoker.Cert`
:returns: tuple of the form (code, selection) where
code is a display exit code
selection is the user's int selection
:rtype: tuple
"""
list_choices = [
"%s | %s | %s" % (
str(cert.get_cn().ljust(display_util.WIDTH - 39)),
cert.get_not_before().strftime("%m-%d-%y"),
"Installed" if cert.installed and cert.installed != ["Unknown"]
else "") for cert in certs
]
code, tag = util(interfaces.IDisplay).menu(
"Which certificates would you like to revoke?",
list_choices, help_label="More Info", ok_label="Revoke",
cancel_label="Exit")
return code, tag
def confirm_revocation(cert):
"""Confirm revocation screen.
:param cert: certificate object
:type cert: :class:
:returns: True if user would like to revoke, False otherwise
:rtype: bool
"""
return util(interfaces.IDisplay).yesno(
"Are you sure you would like to revoke the following "
"certificate:{0}{cert}This action cannot be reversed!".format(
os.linesep, cert=cert.pretty_print()))
def more_info_cert(cert):
"""Displays more info about the cert.
:param dict cert: cert dict used throughout revoker.py
"""
util(interfaces.IDisplay).notification(
"Certificate Information:{0}{1}".format(
os.linesep, cert.pretty_print()),
height=display_util.HEIGHT)
def success_revocation(cert):
"""Display a success message.
:param cert: cert that was revoked
:type cert: :class:`letsencrypt.client.revoker.Cert`
"""
util(interfaces.IDisplay).notification(
"You have successfully revoked the certificate for "
"%s" % cert.get_cn())

View File

@@ -0,0 +1,417 @@
"""Let's Encrypt display."""
import os
import textwrap
import dialog
import zope.interface
from letsencrypt.client import interfaces
WIDTH = 72
HEIGHT = 20
# Display exit codes
OK = "ok"
"""Display exit code indicating user acceptance."""
CANCEL = "cancel"
"""Display exit code for a user canceling the display."""
HELP = "help"
"""Display exit code when for when the user requests more help."""
class NcursesDisplay(object):
"""Ncurses-based display."""
zope.interface.implements(interfaces.IDisplay)
def __init__(self, width=WIDTH, height=HEIGHT):
super(NcursesDisplay, self).__init__()
self.dialog = dialog.Dialog()
self.width = width
self.height = height
def notification(self, message, height=10, pause=False):
# pylint: disable=unused-argument
"""Display a notification to the user and wait for user acceptance.
.. todo:: It probably makes sense to use one of the transient message
types for pause. It isn't straightforward how best to approach
the matter though given the context of our messages.
http://pythondialog.sourceforge.net/doc/widgets.html#displaying-transient-messages
:param str message: Message to display
:param int height: Height of the dialog box
:param bool pause: Not applicable to NcursesDisplay
"""
self.dialog.msgbox(message, height, width=self.width)
def menu(self, message, choices,
ok_label="OK", cancel_label="Cancel", help_label=""):
"""Display a menu.
:param str message: title of menu
:param choices: menu lines, len must be > 0
:type choices: list of tuples (`tag`, `item`) tags must be unique or
list of items (tags will be enumerated)
:param str ok_label: label of the OK button
:param str help_label: label of the help button
:returns: tuple of the form (`code`, `tag`) where
`code` - `str` display_util exit code
`tag` - `int` index corresponding to the item chosen
:rtype: tuple
"""
menu_options = {
"choices": choices,
"ok_label": ok_label,
"cancel_label": cancel_label,
"help_button": bool(help_label),
"help_label": help_label,
"width": self.width,
"height": self.height,
"menu_height": self.height-6,
}
# Can accept either tuples or just the actual choices
if choices and isinstance(choices[0], tuple):
# pylint: disable=star-args
code, selection = self.dialog.menu(message, **menu_options)
# Return the selection index
for i, choice in enumerate(choices):
if choice[0] == selection:
return code, i
return code, -1
else:
# "choices" is not formatted the way the dialog.menu expects...
menu_options["choices"] = [
(str(i), choice) for i, choice in enumerate(choices, 1)
]
# pylint: disable=star-args
code, tag = self.dialog.menu(message, **menu_options)
if code == CANCEL:
return code, -1
return code, int(tag) - 1
def input(self, message):
"""Display an input box to the user.
:param str message: Message to display that asks for input.
:returns: tuple of the form (code, string) where
`code` - int display exit code
`string` - input entered by the user
"""
return self.dialog.inputbox(message, width=self.width)
def yesno(self, message, yes_label="Yes", no_label="No"):
"""Display a Yes/No dialog box.
Yes and No label must begin with different letters.
:param str message: message to display to user
:param str yes_label: label on the "yes" button
:param str no_label: label on the "no" button
:returns: if yes_label was selected
:rtype: bool
"""
return self.dialog.DIALOG_OK == self.dialog.yesno(
message, self.height, self.width,
yes_label=yes_label, no_label=no_label)
def checklist(self, message, tags):
"""Displays a checklist.
:param message: Message to display before choices
:param list tags: where each is of type :class:`str`
len(tags) > 0
:returns: tuple of the form (code, list_tags) where
`code` - int display exit code
`list_tags` - list of str tags selected by the user
"""
choices = [(tag, "", False) for tag in tags]
return self.dialog.checklist(
message, width=self.width, height=self.height, choices=choices)
class FileDisplay(object):
"""File-based display."""
zope.interface.implements(interfaces.IDisplay)
def __init__(self, outfile):
super(FileDisplay, self).__init__()
self.outfile = outfile
def notification(self, message, height=10, pause=True):
# pylint: disable=unused-argument
"""Displays a notification and waits for user acceptance.
:param str message: Message to display
:param int height: No effect for FileDisplay
:param bool pause: Whether or not the program should pause for the
user's confirmation
"""
side_frame = "-" * 79
message = self._wrap_lines(message)
self.outfile.write(
"{line}{frame}{line}{msg}{line}{frame}{line}".format(
line=os.linesep, frame=side_frame, msg=message))
if pause:
raw_input("Press Enter to Continue")
def menu(self, message, choices,
ok_label="", cancel_label="", help_label=""):
# pylint: disable=unused-argument
"""Display a menu.
.. todo:: This doesn't enable the help label/button (I wasn't sold on
any interface I came up with for this). It would be a nice feature
:param str message: title of menu
:param choices: Menu lines, len must be > 0
:type choices: list of tuples (tag, item) or
list of descriptions (tags will be enumerated)
:returns: tuple of the form (code, tag) where
code - int display exit code
tag - str corresponding to the item chosen
:rtype: tuple
"""
self._print_menu(message, choices)
code, selection = self._get_valid_int_ans(len(choices))
return code, selection - 1
def input(self, message):
# pylint: disable=no-self-use
"""Accept input from the user.
:param str message: message to display to the user
:returns: tuple of (`code`, `input`) where
`code` - str display exit code
`input` - str of the user's input
:rtype: tuple
"""
ans = raw_input(
textwrap.fill("%s (Enter 'c' to cancel): " % message, 80))
if ans == "c" or ans == "C":
return CANCEL, "-1"
else:
return OK, ans
def yesno(self, message, yes_label="Yes", no_label="No"):
"""Query the user with a yes/no question.
Yes and No label must begin with different letters, and must contain at
least one letter each.
:param str message: question for the user
:param str yes_label: Label of the "Yes" parameter
:param str no_label: Label of the "No" parameter
:returns: True for "Yes", False for "No"
:rtype: bool
"""
side_frame = ("-" * 79) + os.linesep
message = self._wrap_lines(message)
self.outfile.write("{0}{frame}{msg}{0}{frame}".format(
os.linesep, frame=side_frame, msg=message))
while True:
ans = raw_input("{yes}/{no}: ".format(
yes=_parens_around_char(yes_label),
no=_parens_around_char(no_label)))
# Couldn't get pylint indentation right with elif
# elif doesn't matter in this situation
if (ans.startswith(yes_label[0].lower()) or
ans.startswith(yes_label[0].upper())):
return True
if (ans.startswith(no_label[0].lower()) or
ans.startswith(no_label[0].upper())):
return False
def checklist(self, message, tags):
"""Display a checklist.
:param str message: Message to display to user
:param list tags: `str` tags to select, len(tags) > 0
:returns: tuple of (`code`, `tags`) where
`code` - str display exit code
`tags` - list of selected tags
:rtype: tuple
"""
while True:
self._print_menu(message, tags)
code, ans = self.input("Select the appropriate numbers separated "
"by commas and/or spaces")
if code == OK:
indices = separate_list_input(ans)
selected_tags = self._scrub_checklist_input(indices, tags)
if selected_tags:
return code, selected_tags
else:
self.outfile.write(
"** Error - Invalid selection **%s" % os.linesep)
else:
return code, []
def _scrub_checklist_input(self, indices, tags):
# pylint: disable=no-self-use
"""Validate input and transform indices to appropriate tags.
:param list indices: input
:param list tags: Original tags of the checklist
:returns: valid tags the user selected
:rtype: :class:`list` of :class:`str`
"""
# They should all be of type int
try:
indices = [int(index) for index in indices]
except ValueError:
return []
# Remove duplicates
indices = list(set(indices))
# Check all input is within range
for index in indices:
if index < 1 or index > len(tags):
return []
# Transform indices to appropriate tags
return [tags[index-1] for index in indices]
def _print_menu(self, message, choices):
"""Print a menu on the screen.
:param str message: title of menu
:param choices: Menu lines
:type choices: list of tuples (tag, item) or
list of descriptions (tags will be enumerated)
"""
# Can take either tuples or single items in choices list
if choices and isinstance(choices[0], tuple):
choices = ["%s - %s" % (c[0], c[1]) for c in choices]
# Write out the message to the user
self.outfile.write(
"{new}{msg}{new}".format(new=os.linesep, msg=message))
side_frame = ("-" * 79) + os.linesep
self.outfile.write(side_frame)
# Write out the menu choices
for i, desc in enumerate(choices, 1):
self.outfile.write(
textwrap.fill("{num}: {desc}".format(num=i, desc=desc), 80))
# Keep this outside of the textwrap
self.outfile.write(os.linesep)
self.outfile.write(side_frame)
def _wrap_lines(self, msg): # pylint: disable=no-self-use
"""Format lines nicely to 80 chars.
:param str msg: Original message
:returns: Formatted message respecting newlines in message
:rtype: str
"""
lines = msg.splitlines()
fixed_l = []
for line in lines:
fixed_l.append(textwrap.fill(line, 80))
return os.linesep.join(fixed_l)
def _get_valid_int_ans(self, max_):
"""Get a numerical selection.
:param int max: The maximum entry (len of choices), must be positive
:returns: tuple of the form (`code`, `selection`) where
`code` - str display exit code ('ok' or cancel')
`selection` - int user's selection
:rtype: tuple
"""
selection = -1
if max_ > 1:
input_msg = ("Select the appropriate number "
"[1-{max_}] then [enter] (press 'c' to "
"cancel): ".format(max_=max_))
else:
input_msg = ("Press 1 [enter] to confirm the selection "
"(press 'c' to cancel): ")
while selection < 1:
ans = raw_input(input_msg)
if ans.startswith("c") or ans.startswith("C"):
return CANCEL, -1
try:
selection = int(ans)
if selection < 1 or selection > max_:
selection = -1
raise ValueError
except ValueError:
self.outfile.write(
"{0}** Invalid input **{0}".format(os.linesep))
return OK, selection
def separate_list_input(input_):
"""Separate a comma or space separated list.
:param str input_: input from the user
:returns: strings
:rtype: list
"""
no_commas = input_.replace(",", " ")
# Each string is naturally unicode, this causes problems with M2Crypto SANs
return [str(string) for string in no_commas.split()]
def _parens_around_char(label):
"""Place parens around first character of label.
:param str label: Must contain at least one character
"""
return "({first}){rest}".format(first=label[0], rest=label[1:])

View File

@@ -39,3 +39,6 @@ class LetsEncryptNoInstallationError(LetsEncryptConfiguratorError):
class LetsEncryptMisconfigurationError(LetsEncryptConfiguratorError):
"""Let's Encrypt Misconfiguration error."""
class LetsEncryptRevokerError(LetsEncryptClientError):
"""Let's Encrypt Revoker error."""

View File

@@ -13,6 +13,18 @@ class IAuthenticator(zope.interface.Interface):
"""
def prepare():
"""Prepare the authenticator.
Finish up any additional initialization.
:raises letsencrypt.client.errors.LetsEncryptMisconfigurationError:
when full initialization cannot be completed.
:raises letsencrypt.client.errors.LetsEncryptNoInstallationError:
when the necessary programs/files cannot be located.
"""
def get_chall_pref(domain):
"""Return list of challenge preferences.
@@ -30,9 +42,10 @@ class IAuthenticator(zope.interface.Interface):
:param list chall_list: List of namedtuple types defined in
:mod:`letsencrypt.client.challenge_util` (``DvsniChall``, etc.).
- chall_list will never be empty
- chall_list will only contain types found within
:func:`get_chall_pref`
:func:`get_chall_pref`
:returns: ACME Challenge responses or if it cannot be completed then:
@@ -52,20 +65,27 @@ class IAuthenticator(zope.interface.Interface):
:mod:`letsencrypt.client.challenge_util` (``DvsniChall``, etc.)
- Only challenges given previously in the perform function will be
found in chall_list.
found in chall_list.
- chall_list will never be empty
"""
def more_info():
"""Human-readable string to help the user.
Should describe the steps taken and any relevant info to help the user
decide which Authenticator to use.
"""
class IConfig(zope.interface.Interface):
"""Let's Encrypt user-supplied configuration.
.. warning:: The values stored in the configuration have not been
filtered, stripped or sanitized in any way!
filtered, stripped or sanitized.
"""
server = zope.interface.Attribute(
"CA hostname (and optionally :port). The server certificate must "
"be trusted in order to avoid further modifications to the client.")
@@ -111,6 +131,18 @@ class IInstaller(zope.interface.Interface):
"""
def prepare():
"""Prepare the installer.
Finish up any additional initialization.
:raises letsencrypt.client.errors.LetsEncryptMisconfigurationError`:
when full initialization cannot be completed.
:raises letsencrypt.errors.LetsEncryptNoInstallationError`:
when the necessary programs/files cannot be located.
"""
def get_all_names():
"""Returns all names that may be authenticated."""
@@ -190,48 +222,68 @@ class IInstaller(zope.interface.Interface):
class IDisplay(zope.interface.Interface):
"""Generic display."""
def generic_notification(message):
def notification(message, height, pause):
"""Displays a string message
:param str message: Message to display
:param int height: Height of dialog box if applicable
:param bool pause: Whether or not the application should pause for
confirmation (if available)
"""
def generic_menu(message, choices, input_text=""):
def menu(message, choices,
ok_label="OK", cancel_label="Cancel", help_label=""):
"""Displays a generic menu.
:param str message: message to display
:param choices: choices
:type choices: :class:`list` of :func:`tuple`
:type choices: :class:`list` of :func:`tuple` or :class:`str`
:param str input_text: instructions on how to make a selection
:param str ok_label: label for OK button
:param str cancel_label: label for Cancel button
:param str help_label: label for Help button
:returns: tuple of (`code`, `index`) where
`code` - str display exit code
`index` - int index of the user's selection
"""
def generic_input(message):
"""Accept input from the user."""
def input(message):
"""Accept input from the user.
def generic_yesno(message, yes_label="Yes", no_label="No"):
"""A yes/no dialog."""
:param str message: message to display to the user
def filter_names(names):
"""Allow the user to select which names they would like to activate."""
:returns: tuple of (`code`, `input`) where
`code` - str display exit code
`input` - str of the user's input
:rtype: tuple
def success_installation(domains):
"""Display a congratulations message for new https domains."""
"""
def display_certs(certs):
"""Display a list of certificates."""
def yesno(message, yes_label="Yes", no_label="No"):
"""Query the user with a yes/no question.
def confirm_revocation(cert):
"""Confirmation of revocation screen."""
Yes and No label must begin with different letters.
def more_info_cert(cert):
"""Print out all information for a given certificate dict."""
:param str message: question for the user
def redirect_by_default():
"""Ask the user whether they would like to redirect to HTTPS."""
:returns: True for "Yes", False for "No"
:rtype: bool
"""
def checklist(message, choices):
"""Allow for multiple selections from a menu.
:param str message: message to display to the user
:param tags: tags
:type tags: :class:`list` of :class:`str`
"""
class IValidator(zope.interface.Interface):

View File

@@ -11,6 +11,7 @@ Key = collections.namedtuple("Key", "file pem")
# Note: form is the type of data, "pem" or "der"
CSR = collections.namedtuple("CSR", "file data form")
def make_or_verify_dir(directory, mode=0o755, uid=0):
"""Make sure directory exists with proper permissions.
@@ -72,3 +73,12 @@ def unique_file(path, mode=0o777):
except OSError:
pass
count += 1
def safely_remove(path):
"""Remove a file that may not exist."""
try:
os.remove(path)
except OSError as err:
if err.errno != errno.ENOENT:
raise

View File

@@ -3,7 +3,7 @@ import logging
import dialog
from letsencrypt.client import display
from letsencrypt.client.display import util as display_util
class DialogHandler(logging.Handler): # pylint: disable=too-few-public-methods
@@ -19,8 +19,8 @@ class DialogHandler(logging.Handler): # pylint: disable=too-few-public-methods
PADDING_HEIGHT = 2
PADDING_WIDTH = 4
def __init__(self, level=logging.NOTSET, height=display.HEIGHT,
width=display.WIDTH - 4, d=None):
def __init__(self, level=logging.NOTSET, height=display_util.HEIGHT,
width=display_util.WIDTH - 4, d=None):
# Handler not new-style -> no super
logging.Handler.__init__(self, level)
self.height = height

View File

@@ -33,7 +33,7 @@ class RecoveryToken(object):
return self.generate_response(token_fd.read())
cancel, token = zope.component.getUtility(
interfaces.IDisplay).generic_input(
interfaces.IDisplay).input(
"%s - Input Recovery Token: " % chall.domain)
if cancel != 1:
return self.generate_response(token)
@@ -48,7 +48,7 @@ class RecoveryToken(object):
"""
try:
os.remove(os.path.join(self.token_dir, chall.domain))
le_util.safely_remove(os.path.join(self.token_dir, chall.domain))
except OSError as err:
if err.errno != errno.ENOENT:
raise
@@ -73,5 +73,5 @@ class RecoveryToken(object):
"""
le_util.make_or_verify_dir(self.token_dir, 0o700, os.geteuid())
with open(os.path.join(self.token_dir, domain), 'w') as token_fd:
with open(os.path.join(self.token_dir, domain), "w") as token_fd:
token_fd.write(str(token))

View File

@@ -6,10 +6,10 @@ import time
import zope.component
from letsencrypt.client import display
from letsencrypt.client import errors
from letsencrypt.client import interfaces
from letsencrypt.client import le_util
from letsencrypt.client.display import util as display_util
class Reverter(object):
@@ -28,8 +28,8 @@ class Reverter(object):
This function should reinstall the users original configuration files
for all saves with temporary=True
:raises :class:`errors.LetsEncryptReverterError`:
Unable to revert config
:raises letsencrypt.client.errors.LetsEncryptReverterError: when
unable to revert config
"""
if os.path.isdir(self.config.temp_checkpoint_dir):
@@ -46,9 +46,9 @@ class Reverter(object):
"""Revert 'rollback' number of configuration checkpoints.
:param int rollback: Number of checkpoints to reverse. A str num will be
cast to an integer. So '2' is also acceptable.
cast to an integer. So "2" is also acceptable.
:raises :class:`letsencrypt.client.errors.LetsEncryptReverterError`: If
:raises letsencrypt.client.errors.LetsEncryptReverterError: If
there is a problem with the input or if the function is unable to
correctly revert the configuration checkpoints.
@@ -126,8 +126,8 @@ class Reverter(object):
output.append(os.linesep)
zope.component.getUtility(interfaces.IDisplay).generic_notification(
os.linesep.join(output), display.HEIGHT)
zope.component.getUtility(interfaces.IDisplay).notification(
os.linesep.join(output), display_util.HEIGHT)
def add_to_temp_checkpoint(self, save_files, save_notes):
"""Add files to temporary checkpoint
@@ -159,7 +159,7 @@ class Reverter(object):
:param str save_notes: notes about changes made during the save
:raises IOError: If unable to open cp_dir + FILEPATHS file
:raises :class:`letsencrypt.client.errors.LetsEncryptReverterError: If
:raises letsencrypt.client.errors.LetsEncryptReverterError: If
unable to add checkpoint
"""
@@ -180,7 +180,7 @@ class Reverter(object):
try:
shutil.copy2(filename, os.path.join(
cp_dir, os.path.basename(filename) + "_" + str(idx)))
op_fd.write(filename + '\n')
op_fd.write(filename + os.linesep)
# http://stackoverflow.com/questions/4726260/effective-use-of-python-shutil-copy2
except IOError:
op_fd.close()
@@ -193,7 +193,7 @@ class Reverter(object):
idx += 1
op_fd.close()
with open(os.path.join(cp_dir, "CHANGES_SINCE"), 'a') as notes_fd:
with open(os.path.join(cp_dir, "CHANGES_SINCE"), "a") as notes_fd:
notes_fd.write(save_notes)
def _read_and_append(self, filepath): # pylint: disable=no-self-use
@@ -204,11 +204,11 @@ class Reverter(object):
"""
# Open up filepath differently depending on if it already exists
if os.path.isfile(filepath):
op_fd = open(filepath, 'r+')
op_fd = open(filepath, "r+")
lines = op_fd.read().splitlines()
else:
lines = []
op_fd = open(filepath, 'w')
op_fd = open(filepath, "w")
return op_fd, lines
@@ -230,7 +230,7 @@ class Reverter(object):
for idx, path in enumerate(filepaths):
shutil.copy2(os.path.join(
cp_dir,
os.path.basename(path) + '_' + str(idx)), path)
os.path.basename(path) + "_" + str(idx)), path)
except (IOError, OSError):
# This file is required in all checkpoints.
logging.error("Unable to recover files from %s", cp_dir)
@@ -252,7 +252,7 @@ class Reverter(object):
:param set save_files: Set of files about to be saved.
:raises :class:`letsencrypt.client.errors.LetsEncryptReverterError`:
:raises letsencrypt.client.errors.LetsEncryptReverterError:
when save is attempting to overwrite a temporary file.
"""
@@ -261,13 +261,13 @@ class Reverter(object):
# Get temp modified files
temp_path = os.path.join(self.config.temp_checkpoint_dir, "FILEPATHS")
if os.path.isfile(temp_path):
with open(temp_path, 'r') as protected_fd:
with open(temp_path, "r") as protected_fd:
protected_files.extend(protected_fd.read().splitlines())
# Get temp new files
new_path = os.path.join(self.config.temp_checkpoint_dir, "NEW_FILES")
if os.path.isfile(new_path):
with open(new_path, 'r') as protected_fd:
with open(new_path, "r") as protected_fd:
protected_files.extend(protected_fd.read().splitlines())
# Verify no save_file is in protected_files
@@ -288,7 +288,7 @@ class Reverter(object):
a temp or permanent save.
:param \*files: file paths (str) to be registered
:raises :class:`letsencrypt.client.errors.LetsEncryptReverterError`: If
:raises letsencrypt.client.errors.LetsEncryptReverterError: If
call does not contain necessary parameters or if the file creation
is unable to be registered.
@@ -354,7 +354,7 @@ class Reverter(object):
:returns: Success
:rtype: bool
:raises :class:`letsencrypt.client.errors.LetsEncryptReverterError`: If
:raises letsencrypt.client.errors.LetsEncryptReverterError: If
all files within file_list cannot be removed
"""
@@ -363,7 +363,7 @@ class Reverter(object):
if not os.path.isfile(file_list):
return False
try:
with open(file_list, 'r') as list_fd:
with open(file_list, "r") as list_fd:
filepaths = list_fd.read().splitlines()
for path in filepaths:
# Files are registered before they are added... so
@@ -393,23 +393,24 @@ class Reverter(object):
:param str title: Title describing checkpoint
:raises :class:`letsencrypt.client.errors.LetsEncryptReverterError`
:raises letsencrypt.client.errors.LetsEncryptReverterError: when the
checkpoint is not able to be finalized.
"""
# Check to make sure an "in progress" directory exists
if not os.path.isdir(self.config.in_progress_dir):
logging.warning("No IN_PROGRESS checkpoint to finalize")
return
changes_since_path = os.path.join(
self.config.in_progress_dir, 'CHANGES_SINCE')
self.config.in_progress_dir, "CHANGES_SINCE")
changes_since_tmp_path = os.path.join(
self.config.in_progress_dir, 'CHANGES_SINCE.tmp')
self.config.in_progress_dir, "CHANGES_SINCE.tmp")
try:
with open(changes_since_tmp_path, 'w') as changes_tmp:
with open(changes_since_tmp_path, "w") as changes_tmp:
changes_tmp.write("-- %s --\n" % title)
with open(changes_since_path, 'r') as changes_orig:
with open(changes_since_path, "r") as changes_orig:
changes_tmp.write(changes_orig.read())
shutil.move(changes_since_tmp_path, changes_since_path)

View File

@@ -1,151 +1,535 @@
"""Revoker module to enable LE revocations."""
"""Revoker module to enable LE revocations.
The backend of this module would fit a database quite nicely, but in order to
minimize dependencies and maintain transparency, the class currently implements
its own storage system. The number of certs that will likely be stored on any
given client might not warrant requiring a database.
"""
import collections
import csv
import logging
import os
import shutil
import tempfile
import Crypto.PublicKey.RSA
import M2Crypto
import zope.component
from letsencrypt.acme import messages
from letsencrypt.acme import util as acme_util
from letsencrypt.client import crypto_util
from letsencrypt.client import display
from letsencrypt.client import interfaces
from letsencrypt.client import errors
from letsencrypt.client import le_util
from letsencrypt.client import network
from letsencrypt.client.display import util as display_util
from letsencrypt.client.display import revocation
class Revoker(object):
"""A revocation class for LE.
.. todo:: Add a method to specify your own certificate for revocation - CLI
:ivar network: Network object
:type network: :class:`letsencrypt.client.network`
:ivar installer: Installer object
:type installer: :class:`~letsencrypt.client.interfaces.IInstaller`
:ivar config: Configuration.
:type config: :class:`letsencrypt.client.interfaces.IConfig`
:type config: :class:`~letsencrypt.client.interfaces.IConfig`
:ivar bool no_confirm: Whether or not to ask for confirmation for revocation
"""
def __init__(self, installer, config):
def __init__(self, installer, config, no_confirm=False):
self.network = network.Network(config.server)
self.installer = installer
self.config = config
self.no_confirm = no_confirm
def acme_revocation(self, cert):
"""Handle ACME "revocation" phase.
le_util.make_or_verify_dir(config.cert_key_backup, 0o700, os.geteuid())
:param dict cert: TODO
# TODO: Find a better solution for this...
self.list_path = os.path.join(config.cert_key_backup, "LIST")
# Make sure that the file is available for use for rest of class
open(self.list_path, "a").close()
:returns: ACME "revocation" message.
:rtype: :class:`letsencrypt.acme.message.Revocation`
def revoke_from_key(self, authkey):
"""Revoke all certificates under an authorized key.
:param authkey: Authorized key used in previous transactions
:type authkey: :class:`letsencrypt.client.le_util.Key`
"""
certificate = acme_util.ComparableX509(
M2Crypto.X509.load_cert(cert["backup_cert_file"]))
with open(cert["backup_key_file"], 'rU') as backup_key_file:
key = Crypto.PublicKey.RSA.importKey(backup_key_file.read())
revocation = self.network.send_and_receive_expected(
messages.RevocationRequest.create(
certificate=certificate, key=key),
messages.Revocation)
zope.component.getUtility(interfaces.IDisplay).generic_notification(
"You have successfully revoked the certificate for "
"%s" % cert["cn"])
self.remove_cert_key(cert)
self.list_certs_keys()
return revocation
def list_certs_keys(self):
"""List trusted Let's Encrypt certificates."""
list_file = os.path.join(self.config.cert_key_backup, "LIST")
certs = []
try:
clean_pem = Crypto.PublicKey.RSA.importKey(
authkey.pem).exportKey("PEM")
# https://www.dlitz.net/software/pycrypto/api/current/Crypto.PublicKey.RSA-module.html
except (IndexError, ValueError, TypeError):
raise errors.LetsEncryptRevokerError(
"Invalid key file specified to revoke_from_key")
if not os.path.isfile(list_file):
logging.info(
"You don't have any certificates saved from letsencrypt")
return
c_sha1_vh = {}
for (cert, _, path) in self.installer.get_all_certs_keys():
try:
c_sha1_vh[acme_util.ComparableX509(M2Crypto.X509.load_cert(
cert).get_fingerprint(md='sha1'))] = path
except M2Crypto.X509.X509Error:
continue
with open(list_file, 'rb') as csvfile:
with open(self.list_path, "rb") as csvfile:
csvreader = csv.reader(csvfile)
for row in csvreader:
cert = crypto_util.get_cert_info(row[1])
# idx, cert, key
# Add all keys that match to marked list
# Note: The key can be different than the pub key found in the
# certificate.
_, b_k = self._row_to_backup(row)
try:
test_pem = Crypto.PublicKey.RSA.importKey(
open(b_k).read()).exportKey("PEM")
except (IndexError, ValueError, TypeError):
# This should never happen given the assumptions of the
# module. If it does, it is probably best to delete the
# the offending key/cert. For now... just raise an exception
raise errors.LetsEncryptRevokerError(
"%s - backup file is corrupted.")
b_k = os.path.join(self.config.cert_key_backup,
os.path.basename(row[2]) + "_" + row[0])
b_c = os.path.join(self.config.cert_key_backup,
os.path.basename(row[1]) + "_" + row[0])
cert.update({
"orig_key_file": row[2],
"orig_cert_file": row[1],
"idx": int(row[0]),
"backup_key_file": b_k,
"backup_cert_file": b_c,
"installed": c_sha1_vh.get(cert["fingerprint"], ""),
})
certs.append(cert)
if clean_pem == test_pem:
certs.append(
Cert.fromrow(row, self.config.cert_key_backup))
if certs:
self.choose_certs(certs)
self._safe_revoke(certs)
else:
zope.component.getUtility(interfaces.IDisplay).generic_notification(
"There are not any trusted Let's Encrypt "
"certificates for this server.")
logging.info("No certificates using the authorized key were found.")
def choose_certs(self, certs):
"""Display choose certificates menu.
def revoke_from_cert(self, cert_path):
"""Revoke a certificate by specifying a file path.
:param list certs: List of cert dicts.
.. todo:: Add the ability to revoke the certificate even if the cert
is not stored locally. A path to the auth key will need to be
attained from the user.
:param str cert_path: path to ACME certificate in pem form
"""
displayer = zope.component.getUtility(interfaces.IDisplay)
code, tag = displayer.display_certs(certs)
# Locate the correct certificate (do not rely on filename)
cert_to_revoke = Cert(cert_path)
if code == display.OK:
cert = certs[tag]
if displayer.confirm_revocation(cert):
self.acme_revocation(cert)
with open(self.list_path, "rb") as csvfile:
csvreader = csv.reader(csvfile)
for row in csvreader:
cert = Cert.fromrow(row, self.config.cert_key_backup)
if cert.get_der() == cert_to_revoke.get_der():
self._safe_revoke([cert])
return
logging.info("Associated ACME certificate was not found.")
def revoke_from_menu(self):
"""List trusted Let's Encrypt certificates."""
csha1_vhlist = self._get_installed_locations()
certs = self._populate_saved_certs(csha1_vhlist)
while True:
if certs:
code, selection = revocation.display_certs(certs)
if code == display_util.OK:
revoked_certs = self._safe_revoke([certs[selection]])
# Since we are currently only revoking one cert at a time...
if revoked_certs:
del certs[selection]
elif code == display_util.HELP:
revocation.more_info_cert(certs[selection])
else:
return
else:
self.choose_certs(certs)
elif code == display.HELP:
cert = certs[tag]
displayer.more_info_cert(cert)
self.choose_certs(certs)
else:
exit(0)
logging.info(
"There are not any trusted Let's Encrypt "
"certificates for this server.")
return
def remove_cert_key(self, cert): # pylint: disable=no-self-use
def _populate_saved_certs(self, csha1_vhlist):
# pylint: disable=no-self-use
"""Populate a list of all the saved certs.
It is important to read from the file rather than the directory.
We assume that the LIST file is the master record and depending on
program crashes, this may differ from what is actually in the directory.
Namely, additional certs/keys may exist. There should never be any
certs/keys in the LIST that don't exist in the directory however.
:param dict csha1_vhlist: map from cert sha1 fingerprints to a list
of it's installed location paths.
"""
certs = []
with open(self.list_path, "rb") as csvfile:
csvreader = csv.reader(csvfile)
# idx, orig_cert, orig_key
for row in csvreader:
cert = Cert.fromrow(row, self.config.cert_key_backup)
# If we were able to find the cert installed... update status
cert.installed = csha1_vhlist.get(cert.get_fingerprint(), [])
certs.append(cert)
return certs
def _get_installed_locations(self):
"""Get installed locations of certificates.
:returns: map from cert sha1 fingerprint to :class:`list` of vhosts
where the certificate is installed.
"""
csha1_vhlist = {}
if self.installer is None:
return csha1_vhlist
for (cert_path, _, path) in self.installer.get_all_certs_keys():
try:
cert_sha1 = M2Crypto.X509.load_cert(
cert_path).get_fingerprint(md="sha1")
except (IOError, M2Crypto.X509.X509Error):
continue
if cert_sha1 in csha1_vhlist:
csha1_vhlist[cert_sha1].append(path)
else:
csha1_vhlist[cert_sha1] = [path]
return csha1_vhlist
def _safe_revoke(self, certs):
"""Confirm and revoke certificates.
:param certs: certs intended to be revoked
:type certs: :class:`list` of :class:`letsencrypt.client.revoker.Cert`
:returns: certs successfully revoked
:rtype: :class:`list` of :class:`letsencrypt.client.revoker.Cert`
"""
success_list = []
try:
for cert in certs:
if self.no_confirm or revocation.confirm_revocation(cert):
try:
self._acme_revoke(cert)
except errors.LetsEncryptClientError:
# TODO: Improve error handling when networking is set...
logging.error(
"Unable to revoke cert:%s%s", os.linesep, str(cert))
success_list.append(cert)
revocation.success_revocation(cert)
finally:
if success_list:
self._remove_certs_keys(success_list)
return success_list
def _acme_revoke(self, cert):
"""Revoke the certificate with the ACME server.
:param cert: certificate to revoke
:type cert: :class:`letsencrypt.client.revoker.Cert`
:returns: TODO
"""
# These will both have to change in the future away from M2Crypto
# pylint: disable=protected-access
certificate = acme_util.ComparableX509(cert._cert)
try:
with open(cert.backup_key_path, "rU") as backup_key_file:
key = Crypto.PublicKey.RSA.importKey(backup_key_file.read())
# If the key file doesn't exist... or is corrupted
except (IndexError, ValueError, TypeError):
raise errors.LetsEncryptRevokerError(
"Corrupted backup key file: %s" % cert.backup_key_path)
# TODO: Catch error associated with already revoked and proceed.
return self.network.send_and_receive_expected(
messages.RevocationRequest.create(certificate=certificate, key=key),
messages.Revocation)
def _remove_certs_keys(self, cert_list): # pylint: disable=no-self-use
"""Remove certificate and key.
:param dict cert: Cert dict used throughout revocation
:param list cert_list: Must contain certs, each is of type
:class:`letsencrypt.client.revoker.Cert`
"""
list_file = os.path.join(self.config.cert_key_backup, "LIST")
list_file2 = os.path.join(self.config.cert_key_backup, "LIST.tmp")
# This must occur first, LIST is the official key
self._remove_certs_from_list(cert_list)
with open(list_file, 'rb') as orgfile:
# Remove files
for cert in cert_list:
os.remove(cert.backup_path)
os.remove(cert.backup_key_path)
def _remove_certs_from_list(self, cert_list): # pylint: disable=no-self-use
"""Remove a certificate from the LIST file.
:param list cert_list: Must contain valid certs, each is of type
:class:`letsencrypt.client.revoker.Cert`
"""
list_path2 = tempfile.mktemp(".tmp", "LIST")
idx = 0
with open(self.list_path, "rb") as orgfile:
csvreader = csv.reader(orgfile)
with open(list_file2, 'wb') as newfile:
with open(list_path2, "wb") as newfile:
csvwriter = csv.writer(newfile)
for row in csvreader:
if not (row[0] == str(cert["idx"]) and
row[1] == cert["orig_cert_file"] and
row[2] == cert["orig_key_file"]):
if idx >= len(cert_list) or row != cert_list[idx].get_row():
csvwriter.writerow(row)
else:
idx += 1
shutil.copy2(list_file2, list_file)
os.remove(list_file2)
os.remove(cert["backup_cert_file"])
os.remove(cert["backup_key_file"])
# This should never happen...
if idx != len(cert_list):
raise errors.LetsEncryptRevokerError(
"Did not find all cert_list items to remove from LIST")
shutil.copy2(list_path2, self.list_path)
os.remove(list_path2)
def _row_to_backup(self, row):
"""Convenience function
:param list row: csv file row 'idx', 'cert_path', 'key_path'
:returns: tuple of the form ('backup_cert_path', 'backup_key_path')
:rtype: tuple
"""
return (self._get_backup(self.config.cert_key_backup, row[0], row[1]),
self._get_backup(self.config.cert_key_backup, row[0], row[2]))
@classmethod
def store_cert_key(cls, cert_path, key_path, config):
"""Store certificate key. (Used to allow quick revocation)
:param str cert_path: Path to a certificate file.
:param str key_path: Path to authorized key for certificate
:ivar config: Configuration.
:type config: :class:`~letsencrypt.client.interfaces.IConfig`
"""
list_path = os.path.join(config.cert_key_backup, "LIST")
le_util.make_or_verify_dir(config.cert_key_backup, 0o700, os.geteuid())
cls._catalog_files(
config.cert_key_backup, cert_path, key_path, list_path)
@classmethod
def _catalog_files(cls, backup_dir, cert_path, key_path, list_path):
idx = 0
if os.path.isfile(list_path):
with open(list_path, "r+b") as csvfile:
csvreader = csv.reader(csvfile)
# Find the highest index in the file
for row in csvreader:
idx = int(row[0]) + 1
csvwriter = csv.writer(csvfile)
# You must move the files before appending the row
cls._copy_files(backup_dir, idx, cert_path, key_path)
csvwriter.writerow([str(idx), cert_path, key_path])
else:
with open(list_path, "wb") as csvfile:
csvwriter = csv.writer(csvfile)
# You must move the files before appending the row
cls._copy_files(backup_dir, idx, cert_path, key_path)
csvwriter.writerow([str(idx), cert_path, key_path])
@classmethod
def _copy_files(cls, backup_dir, idx, cert_path, key_path):
"""Copies the files into the backup dir appropriately."""
shutil.copy2(cert_path, cls._get_backup(backup_dir, idx, cert_path))
shutil.copy2(key_path, cls._get_backup(backup_dir, idx, key_path))
@classmethod
def _get_backup(cls, backup_dir, idx, orig_path):
"""Returns the path to the backup."""
return os.path.join(
backup_dir, "{name}_{idx}".format(
name=os.path.basename(orig_path), idx=str(idx)))
class Cert(object):
"""Cert object used for Revocation convenience.
:ivar _cert: M2Crypto X509 cert
:type _cert: :class:`M2Crypto.X509`
:ivar int idx: convenience index used for listing
:ivar orig: (`str` path - original certificate, `str` status)
:type orig: :class:`PathStatus`
:ivar orig_key: (`str` path - original auth key, `str` status)
:type orig_key: :class:`PathStatus`
:ivar str backup_path: backup filepath of the certificate
:ivar str backup_key_path: backup filepath of the authorized key
:ivar list installed: `list` of `str` describing all locations the cert
is installed
"""
PathStatus = collections.namedtuple("PathStatus", "path status")
"""Convenience container to hold path and status info"""
DELETED_MSG = "This file has been moved or deleted"
CHANGED_MSG = "This file has changed"
def __init__(self, cert_path):
"""Cert initialization
:param str cert_filepath: Name of file containing certificate in
PEM format.
"""
try:
self._cert = M2Crypto.X509.load_cert(cert_path)
except (IOError, M2Crypto.X509.X509Error):
raise errors.LetsEncryptRevokerError(
"Error loading certificate: %s" % cert_path)
self.idx = -1
self.orig = None
self.orig_key = None
self.backup_path = ""
self.backup_key_path = ""
self.installed = ["Unknown"]
@classmethod
def fromrow(cls, row, backup_dir):
# pylint: disable=protected-access
"""Initialize Cert from a csv row."""
idx = int(row[0])
backup = Revoker._get_backup(backup_dir, idx, row[1])
backup_key = Revoker._get_backup(backup_dir, idx, row[2])
obj = cls(backup)
obj.add_meta(idx, row[1], row[2], backup, backup_key)
return obj
def get_row(self):
"""Returns a list in CSV format. If meta data is available."""
if self.orig is not None and self.orig_key is not None:
return [str(self.idx), self.orig.path, self.orig_key.path]
return None
def add_meta(self, idx, orig, orig_key, backup, backup_key):
"""Add meta data to cert
:param int idx: convenience index for revoker
:param tuple orig: (`str` original certificate filepath, `str` status)
:param tuple orig_key: (`str` original auth key path, `str` status)
:param str backup: backup certificate filepath
:param str backup_key: backup key filepath
"""
status = ""
key_status = ""
# Verify original cert path
if not os.path.isfile(orig):
status = Cert.DELETED_MSG
else:
o_cert = M2Crypto.X509.load_cert(orig)
if self.get_fingerprint() != o_cert.get_fingerprint(md="sha1"):
status = Cert.CHANGED_MSG
# Verify original key path
if not os.path.isfile(orig_key):
key_status = Cert.DELETED_MSG
else:
with open(orig_key, "r") as fd:
key_pem = fd.read()
with open(backup_key, "r") as fd:
backup_key_pem = fd.read()
if key_pem != backup_key_pem:
key_status = Cert.CHANGED_MSG
self.idx = idx
self.orig = Cert.PathStatus(orig, status)
self.orig_key = Cert.PathStatus(orig_key, key_status)
self.backup_path = backup
self.backup_key_path = backup_key
# M2Crypto is eventually going to be replaced, hence the reason for _cert
def get_cn(self):
"""Get common name."""
return self._cert.get_subject().CN
def get_fingerprint(self):
"""Get SHA1 fingerprint."""
return self._cert.get_fingerprint(md="sha1")
def get_not_before(self):
"""Get not_valid_before field."""
return self._cert.get_not_before().get_datetime()
def get_not_after(self):
"""Get not_valid_after field."""
return self._cert.get_not_after().get_datetime()
def get_der(self):
"""Get certificate in der format."""
return self._cert.as_der()
def get_pub_key(self):
"""Get public key size.
.. todo:: M2Crypto doesn't support ECC, this will have to be updated
"""
return "RSA " + str(self._cert.get_pubkey().size() * 8)
def get_san(self):
"""Get subject alternative name if available."""
try:
return self._cert.get_ext("subjectAltName").get_value()
except LookupError:
return ""
def __str__(self):
text = [
"Subject: %s" % self._cert.get_subject().as_text(),
"SAN: %s" % self.get_san(),
"Issuer: %s" % self._cert.get_issuer().as_text(),
"Public Key: %s" % self.get_pub_key(),
"Not Before: %s" % str(self.get_not_before()),
"Not After: %s" % str(self.get_not_after()),
"Serial Number: %s" % self._cert.get_serial_number(),
"SHA1: %s%s" % (self.get_fingerprint(), os.linesep),
"Installed: %s" % ", ".join(self.installed),
]
if self.orig is not None:
if self.orig.status == "":
text.append("Path: %s" % self.orig.path)
else:
text.append("Orig Path: %s (%s)" % self.orig)
if self.orig_key is not None:
if self.orig_key.status == "":
text.append("Auth Key Path: %s" % self.orig_key.path)
else:
text.append("Orig Auth Key Path: %s (%s)" % self.orig_key)
text.append("")
return os.linesep.join(text)
def pretty_print(self):
"""Nicely frames a cert str"""
frame = "-" * (display_util.WIDTH - 4) + os.linesep
return "{frame}{cert}{frame}".format(frame=frame, cert=str(self))

26
letsencrypt/client/standalone_authenticator.py Normal file → Executable file
View File

@@ -29,6 +29,8 @@ class StandaloneAuthenticator(object):
"""
zope.interface.implements(interfaces.IAuthenticator)
description = "Standalone Authenticator"
def __init__(self):
self.child_pid = None
self.parent_pid = os.getpid()
@@ -39,6 +41,13 @@ class StandaloneAuthenticator(object):
self.private_key = None
self.ssl_conn = None
def prepare(self):
"""There is nothing left to setup.
.. todo:: This should probably do the port check
"""
def client_signal_handler(self, sig, unused_frame):
"""Signal handler for the parent process.
@@ -149,14 +158,14 @@ class StandaloneAuthenticator(object):
if self.subproc_state == "ready":
return True
elif self.subproc_state == "inuse":
display.generic_notification(
display.notification(
"Could not bind TCP port {0} because it is already in "
"use by another process on this system (such as a web "
"server). Please stop the program in question and then "
"try again.".format(port))
return False
elif self.subproc_state == "cantbind":
display.generic_notification(
display.notification(
"Could not bind TCP port {0} because you don't have "
"the appropriate permissions (for example, you "
"aren't running this program as "
@@ -164,7 +173,7 @@ class StandaloneAuthenticator(object):
return False
time.sleep(0.1)
display.generic_notification(
display.notification(
"Subprocess unexpectedly timed out while trying to bind TCP "
"port {0}.".format(port))
@@ -287,7 +296,7 @@ class StandaloneAuthenticator(object):
pid = listeners[0]
name = psutil.Process(pid).name()
display = zope.component.getUtility(interfaces.IDisplay)
display.generic_notification(
display.notification(
"The program {0} (process ID {1}) is already listening "
"on TCP port {2}. This will prevent us from binding to "
"that port. Please stop the {0} program temporarily "
@@ -402,3 +411,12 @@ class StandaloneAuthenticator(object):
# TODO: restore original signal handlers in parent process
# by resetting their actions to SIG_DFL
# print "TCP listener subprocess has been told to shut down"
def more_info(self): # pylint: disable=no-self-use
"""Human-readable string that describes the Authenticator."""
return ("The Standalone Authenticator uses PyOpenSSL to listen "
"on port 443 and perform DVSNI challenges. Once a certificate"
"is attained, it will be saved in the "
"(TODO) current working directory.{0}{0}"
"Port 443 must be open in order to use the "
"Standalone Authenticator.".format(os.linesep))

View File

@@ -132,31 +132,6 @@ class TwoVhost80Test(util.ApacheTest):
self.assertEqual(len(self.config.vhosts), 5)
@mock.patch("letsencrypt.client.apache.configurator."
"subprocess.Popen")
def test_get_version(self, mock_popen):
mock_popen().communicate.return_value = (
"Server Version: Apache/2.4.2 (Debian)", "")
self.assertEqual(self.config.get_version(), (2, 4, 2))
mock_popen().communicate.return_value = (
"Server Version: Apache/2 (Linux)", "")
self.assertEqual(self.config.get_version(), (2,))
mock_popen().communicate.return_value = (
"Server Version: Apache (Debian)", "")
self.assertRaises(
errors.LetsEncryptConfiguratorError, self.config.get_version)
mock_popen().communicate.return_value = (
"Server Version: Apache/2.3\n Apache/2.4.7", "")
self.assertRaises(
errors.LetsEncryptConfiguratorError, self.config.get_version)
mock_popen.side_effect = OSError("Can't find program")
self.assertRaises(
errors.LetsEncryptConfiguratorError, self.config.get_version)
@mock.patch("letsencrypt.client.apache.configurator."
"dvsni.ApacheDvsni.perform")
@mock.patch("letsencrypt.client.apache.configurator."
@@ -189,5 +164,31 @@ class TwoVhost80Test(util.ApacheTest):
self.assertEqual(mock_restart.call_count, 1)
@mock.patch("letsencrypt.client.apache.configurator."
"subprocess.Popen")
def test_get_version(self, mock_popen):
mock_popen().communicate.return_value = (
"Server Version: Apache/2.4.2 (Debian)", "")
self.assertEqual(self.config.get_version(), (2, 4, 2))
mock_popen().communicate.return_value = (
"Server Version: Apache/2 (Linux)", "")
self.assertEqual(self.config.get_version(), (2,))
mock_popen().communicate.return_value = (
"Server Version: Apache (Debian)", "")
self.assertRaises(
errors.LetsEncryptConfiguratorError, self.config.get_version)
mock_popen().communicate.return_value = (
"Server Version: Apache/2.3\n Apache/2.4.7", "")
self.assertRaises(
errors.LetsEncryptConfiguratorError, self.config.get_version)
mock_popen.side_effect = OSError("Can't find program")
self.assertRaises(
errors.LetsEncryptConfiguratorError, self.config.get_version)
if __name__ == '__main__':
unittest.main()

View File

@@ -6,7 +6,6 @@ import shutil
import mock
from letsencrypt.client import challenge_util
from letsencrypt.client import constants
from letsencrypt.client import le_util

View File

@@ -8,8 +8,8 @@ import augeas
import mock
import zope.component
from letsencrypt.client import display
from letsencrypt.client import errors
from letsencrypt.client.display import util as display_util
from letsencrypt.client.tests.apache import util
@@ -20,7 +20,7 @@ class ApacheParserTest(util.ApacheTest):
def setUp(self):
super(ApacheParserTest, self).setUp()
zope.component.provideUtility(display.FileDisplay(sys.stdout))
zope.component.provideUtility(display_util.FileDisplay(sys.stdout))
from letsencrypt.client.apache.parser import ApacheParser
self.aug = augeas.Augeas(flags=augeas.Augeas.NONE)

View File

@@ -75,6 +75,8 @@ def get_apache_configurator(
work_dir=work_dir),
version)
config.prepare()
return config

View File

@@ -6,10 +6,67 @@ import mock
from letsencrypt.client import errors
class DetermineAuthenticatorTest(unittest.TestCase):
def setUp(self):
from letsencrypt.client.apache.configurator import ApacheConfigurator
from letsencrypt.client.standalone_authenticator import (
StandaloneAuthenticator)
self.mock_stand = mock.MagicMock(
spec=StandaloneAuthenticator, description="Apache Web Server")
self.mock_apache = mock.MagicMock(
spec=ApacheConfigurator, description="Standalone Authenticator")
self.mock_config = mock.Mock()
self.all_auths = [self.mock_apache, self.mock_stand]
@classmethod
def _call(cls, all_auths):
from letsencrypt.client.client import determine_authenticator
return determine_authenticator(all_auths)
@mock.patch("letsencrypt.client.client.display_ops.choose_authenticator")
def test_accept_two(self, mock_choose):
mock_choose.return_value = self.mock_stand()
self.assertEqual(self._call(self.all_auths), self.mock_stand())
def test_accept_one(self):
self.mock_apache.prepare.return_value = self.mock_apache
self.assertEqual(
self._call(self.all_auths[:1]), self.mock_apache)
def test_no_installation_one(self):
self.mock_apache.prepare.side_effect = (
errors.LetsEncryptNoInstallationError)
self.assertEqual(self._call(self.all_auths), self.mock_stand)
def test_no_installations(self):
self.mock_apache.prepare.side_effect = (
errors.LetsEncryptNoInstallationError)
self.mock_stand.prepare.side_effect = (
errors.LetsEncryptNoInstallationError)
self.assertRaises(errors.LetsEncryptClientError,
self._call,
self.all_auths)
@mock.patch("letsencrypt.client.client.logging")
@mock.patch("letsencrypt.client.client.display_ops.choose_authenticator")
def test_misconfigured(self, mock_choose, unused_log):
self.mock_apache.prepare.side_effect = (
errors.LetsEncryptMisconfigurationError)
mock_choose.return_value = self.mock_apache
self.assertTrue(self._call(self.all_auths) is None)
class RollbackTest(unittest.TestCase):
"""Test the rollback function."""
def setUp(self):
self.m_install = mock.MagicMock()
from letsencrypt.client.apache.configurator import ApacheConfigurator
self.m_install = mock.MagicMock(spec=ApacheConfigurator)
@classmethod
def _call(cls, checkpoints):
@@ -25,59 +82,6 @@ class RollbackTest(unittest.TestCase):
self.assertEqual(self.m_install().rollback_checkpoints.call_count, 1)
self.assertEqual(self.m_install().restart.call_count, 1)
@mock.patch("letsencrypt.client.client.zope.component.getUtility")
@mock.patch("letsencrypt.client.reverter.Reverter")
@mock.patch("letsencrypt.client.client.determine_installer")
def test_misconfiguration_fixed(self, mock_det, mock_rev, mock_input):
mock_det.side_effect = [errors.LetsEncryptMisconfigurationError,
self.m_install]
mock_input().generic_yesno.return_value = True
self._call(1)
# Don't rollback twice... (only on one object)
self.assertEqual(self.m_install().rollback_checkpoints.call_count, 0)
self.assertEqual(mock_rev().rollback_checkpoints.call_count, 1)
# Only restart once
self.assertEqual(self.m_install.restart.call_count, 1)
@mock.patch("letsencrypt.client.client.zope.component.getUtility")
@mock.patch("letsencrypt.client.client.logging.warning")
@mock.patch("letsencrypt.client.reverter.Reverter")
@mock.patch("letsencrypt.client.client.determine_installer")
def test_misconfiguration_remains(
self, mock_det, mock_rev, mock_warn, mock_input):
mock_det.side_effect = errors.LetsEncryptMisconfigurationError
mock_input().generic_yesno.return_value = True
self._call(1)
# Don't rollback twice... (only on one object)
self.assertEqual(self.m_install().rollback_checkpoints.call_count, 0)
self.assertEqual(mock_rev().rollback_checkpoints.call_count, 1)
# Never call restart because init never succeeds
self.assertEqual(self.m_install().restart.call_count, 0)
# There should be a warning about the remaining problem
self.assertEqual(mock_warn.call_count, 1)
@mock.patch("letsencrypt.client.client.zope.component.getUtility")
@mock.patch("letsencrypt.client.reverter.Reverter")
@mock.patch("letsencrypt.client.client.determine_installer")
def test_user_decides_to_manually_investigate(
self, mock_det, mock_rev, mock_input):
mock_det.side_effect = errors.LetsEncryptMisconfigurationError
mock_input().generic_yesno.return_value = False
self._call(1)
# Neither is ever called
self.assertEqual(self.m_install().rollback_checkpoints.call_count, 0)
self.assertEqual(mock_rev().rollback_checkpoints.call_count, 0)
@mock.patch("letsencrypt.client.client.determine_installer")
def test_no_installer(self, mock_det):
mock_det.return_value = None
@@ -86,5 +90,5 @@ class RollbackTest(unittest.TestCase):
self._call(1)
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()

View File

@@ -9,7 +9,8 @@ class NamespaceConfigTest(unittest.TestCase):
def setUp(self):
from letsencrypt.client.configuration import NamespaceConfig
namespace = mock.MagicMock(work_dir='/tmp/foo', foo='bar')
namespace = mock.MagicMock(
work_dir='/tmp/foo', foo='bar', server='acme-server.org:443')
self.config = NamespaceConfig(namespace)
def test_proxy_getattr(self):
@@ -24,7 +25,8 @@ class NamespaceConfigTest(unittest.TestCase):
constants.REC_TOKEN_DIR = '/r'
self.assertEqual(self.config.temp_checkpoint_dir, '/tmp/foo/t')
self.assertEqual(self.config.in_progress_dir, '/tmp/foo/../p')
self.assertEqual(self.config.cert_key_backup, '/tmp/foo/c/')
self.assertEqual(
self.config.cert_key_backup, '/tmp/foo/c/acme-server.org')
self.assertEqual(self.config.rec_token_dir, '/r')

View File

@@ -1,5 +1,4 @@
"""Tests for letsencrypt.client.crypto_util."""
import datetime
import os
import pkg_resources
import unittest
@@ -63,9 +62,8 @@ class MakeKeyTest(unittest.TestCase): # pylint: disable=too-few-public-methods
def test_it(self): # pylint: disable=no-self-use
from letsencrypt.client.crypto_util import make_key
# Do not test larger keys as it takes too long.
M2Crypto.RSA.load_key_string(make_key(1024))
M2Crypto.RSA.load_key_string(make_key(2048))
M2Crypto.RSA.load_key_string(make_key(4096))
class ValidPrivkeyTest(unittest.TestCase):
@@ -95,43 +93,5 @@ class MakeSSCertTest(unittest.TestCase):
make_ss_cert(RSA256_KEY, ['example.com', 'www.example.com'])
class GetCertInfoTest(unittest.TestCase):
"""Tests for letsencrypt.client.crypto_util.get_cert_info."""
def setUp(self):
self.cert_info = {
'not_before': datetime.datetime(
2014, 12, 11, 22, 34, 45, tzinfo=M2Crypto.ASN1.UTC),
'not_after': datetime.datetime(
2014, 12, 18, 22, 34, 45, tzinfo=M2Crypto.ASN1.UTC),
'subject': 'C=US, ST=Michigan, L=Ann Arbor, O=University '
'of Michigan and the EFF, CN=example.com',
'cn': 'example.com',
'issuer': 'C=US, ST=Michigan, L=Ann Arbor, O=University '
'of Michigan and the EFF, CN=example.com',
'serial': 1337L,
'pub_key': 'RSA 512',
}
def _call(self, name):
from letsencrypt.client.crypto_util import get_cert_info
self.assertEqual(get_cert_info(pkg_resources.resource_filename(
__name__, os.path.join('testdata', name))), self.cert_info)
def test_single_domain(self):
self.cert_info.update({
'san': '',
'fingerprint': '9F8CE01450D288467C3326AC0457E351939C72E',
})
self._call('cert.pem')
def test_san(self):
self.cert_info.update({
'san': 'DNS:example.com, DNS:www.example.com',
'fingerprint': '62F7110431B8E8F55905DBE5592518F9634AC50A',
})
self._call('cert-san.pem')
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1 @@
"""Let's Encrypt Display Tests"""

View File

@@ -0,0 +1,58 @@
"""Module for enhancement UI."""
import logging
import unittest
import mock
from letsencrypt.client import errors
from letsencrypt.client.display import util as display_util
class AskTest(unittest.TestCase):
"""Test the ask method."""
def setUp(self):
logging.disable(logging.CRITICAL)
def tearDown(self):
logging.disable(logging.NOTSET)
@classmethod
def _call(cls, enhancement):
from letsencrypt.client.display.enhancements import ask
return ask(enhancement)
@mock.patch("letsencrypt.client.display.enhancements.util")
def test_redirect(self, mock_util):
mock_util().menu.return_value = (display_util.OK, 1)
self.assertTrue(self._call("redirect"))
def test_key_error(self):
self.assertRaises(
errors.LetsEncryptClientError, self._call, "unknown_enhancement")
class RedirectTest(unittest.TestCase):
"""Test the redirect_by_default method."""
@classmethod
def _call(cls):
from letsencrypt.client.display.enhancements import redirect_by_default
return redirect_by_default()
@mock.patch("letsencrypt.client.display.enhancements.util")
def test_secure(self, mock_util):
mock_util().menu.return_value = (display_util.OK, 1)
self.assertTrue(self._call())
@mock.patch("letsencrypt.client.display.enhancements.util")
def test_cancel(self, mock_util):
mock_util().menu.return_value = (display_util.CANCEL, 1)
self.assertFalse(self._call())
@mock.patch("letsencrypt.client.display.enhancements.util")
def test_easy(self, mock_util):
mock_util().menu.return_value = (display_util.OK, 0)
self.assertFalse(self._call())
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,191 @@
"""Test letsencrypt.client.display.ops."""
import sys
import unittest
import mock
import zope.component
from letsencrypt.client.display import util as display_util
class ChooseAuthenticatorTest(unittest.TestCase):
"""Test choose_authenticator function."""
def setUp(self):
zope.component.provideUtility(display_util.FileDisplay(sys.stdout))
self.mock_apache = mock.Mock()
self.mock_stand = mock.Mock()
self.mock_apache().more_info.return_value = "Apache Info"
self.mock_stand().more_info.return_value = "Standalone Info"
self.auths = [self.mock_apache, self.mock_stand]
self.errs = {self.mock_apache: "This is an error message."}
@classmethod
def _call(cls, auths, errs):
from letsencrypt.client.display.ops import choose_authenticator
return choose_authenticator(auths, errs)
@mock.patch("letsencrypt.client.display.ops.util")
def test_successful_choice(self, mock_util):
mock_util().menu.return_value = (display_util.OK, 0)
ret = self._call(self.auths, {})
self.assertEqual(ret, self.mock_apache)
@mock.patch("letsencrypt.client.display.ops.util")
def test_more_info(self, mock_util):
mock_util().menu.side_effect = [
(display_util.HELP, 0),
(display_util.HELP, 1),
(display_util.OK, 1),
]
ret = self._call(self.auths, self.errs)
self.assertEqual(mock_util().notification.call_count, 2)
self.assertEqual(ret, self.mock_stand)
@mock.patch("letsencrypt.client.display.ops.util")
def test_no_choice(self, mock_util):
mock_util().menu.return_value = (display_util.CANCEL, 0)
self.assertTrue(self._call(self.auths, {}) is None)
class GenHttpsNamesTest(unittest.TestCase):
"""Test _gen_https_names."""
def setUp(self):
zope.component.provideUtility(display_util.FileDisplay(sys.stdout))
@classmethod
def _call(cls, domains):
from letsencrypt.client.display.ops import _gen_https_names
return _gen_https_names(domains)
def test_zero(self):
self.assertEqual(self._call([]), "")
def test_one(self):
doms = [
"example.com",
"asllkjsadfljasdf.c",
]
for dom in doms:
self.assertEqual(self._call([dom]), "https://%s" % dom)
def test_two(self):
domains_list = [
["foo.bar.org", "bar.org"],
["paypal.google.facebook.live.com", "*.zombo.example.com"],
]
for doms in domains_list:
self.assertEqual(
self._call(doms),
"https://{dom[0]} and https://{dom[1]}".format(dom=doms))
def test_three(self):
doms = ["a.org", "b.org", "c.org"]
# We use an oxford comma
self.assertEqual(
self._call(doms),
"https://{dom[0]}, https://{dom[1]}, and https://{dom[2]}".format(
dom=doms))
def test_four(self):
doms = ["a.org", "b.org", "c.org", "d.org"]
exp = ("https://{dom[0]}, https://{dom[1]}, https://{dom[2]}, "
"and https://{dom[3]}".format(dom=doms))
self.assertEqual(self._call(doms), exp)
class ChooseNamesTest(unittest.TestCase):
"""Test choose names."""
def setUp(self):
zope.component.provideUtility(display_util.FileDisplay(sys.stdout))
self.mock_install = mock.MagicMock()
@classmethod
def _call(cls, installer):
from letsencrypt.client.display.ops import choose_names
return choose_names(installer)
@mock.patch("letsencrypt.client.display.ops._choose_names_manually")
def test_no_installer(self, mock_manual):
self._call(None)
self.assertEqual(mock_manual.call_count, 1)
@mock.patch("letsencrypt.client.display.ops.util")
def test_no_installer_cancel(self, mock_util):
mock_util().input.return_value = (display_util.CANCEL, [])
self.assertEqual(self._call(None), [])
@mock.patch("letsencrypt.client.display.ops.util")
def test_no_names_choose(self, mock_util):
self.mock_install().get_all_names.return_value = set()
mock_util().yesno.return_value = True
domain = "example.com"
mock_util().input.return_value = (display_util.OK, domain)
actual_doms = self._call(self.mock_install)
self.assertEqual(mock_util().input.call_count, 1)
self.assertEqual(actual_doms, [domain])
@mock.patch("letsencrypt.client.display.ops.util")
def test_no_names_quit(self, mock_util):
self.mock_install().get_all_names.return_value = set()
mock_util().yesno.return_value = False
self.assertEqual(self._call(self.mock_install), [])
@mock.patch("letsencrypt.client.display.ops.util")
def test_filter_names_valid_return(self, mock_util):
self.mock_install.get_all_names.return_value = set(["example.com"])
mock_util().checklist.return_value = (display_util.OK, ["example.com"])
names = self._call(self.mock_install)
self.assertEqual(names, ["example.com"])
self.assertEqual(mock_util().checklist.call_count, 1)
@mock.patch("letsencrypt.client.display.ops.util")
def test_filter_names_nothing_selected(self, mock_util):
self.mock_install.get_all_names.return_value = set(["example.com"])
mock_util().checklist.return_value = (display_util.OK, [])
self.assertEqual(self._call(self.mock_install), [])
@mock.patch("letsencrypt.client.display.ops.util")
def test_filter_names_cancel(self, mock_util):
self.mock_install.get_all_names.return_value = set(["example.com"])
mock_util().checklist.return_value = (
display_util.CANCEL, ["example.com"])
self.assertEqual(self._call(self.mock_install), [])
class SuccessInstallationTest(unittest.TestCase):
# pylint: disable=too-few-public-methods
"""Test the success installation message."""
@classmethod
def _call(cls, names):
from letsencrypt.client.display.ops import success_installation
success_installation(names)
@mock.patch("letsencrypt.client.display.ops.util")
def test_success_installation(self, mock_util):
mock_util().notification.return_value = None
names = ["example.com", "abc.com"]
self._call(names)
self.assertEqual(mock_util().notification.call_count, 1)
arg = mock_util().notification.call_args_list[0][0][0]
for name in names:
self.assertTrue(name in arg)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,103 @@
"""Test :mod:`letsencrypt.client.display.revocation`."""
import os
import pkg_resources
import sys
import unittest
import mock
import zope.component
from letsencrypt.client.display import util as display_util
class DisplayCertsTest(unittest.TestCase):
def setUp(self):
from letsencrypt.client.revoker import Cert
base_package = "letsencrypt.client.tests"
self.cert0 = Cert(pkg_resources.resource_filename(
base_package, os.path.join("testdata", "cert.pem")))
self.cert1 = Cert(pkg_resources.resource_filename(
base_package, os.path.join("testdata", "cert-san.pem")))
self.certs = [self.cert0, self.cert1]
zope.component.provideUtility(display_util.FileDisplay(sys.stdout))
@classmethod
def _call(cls, certs):
from letsencrypt.client.display.revocation import display_certs
return display_certs(certs)
@mock.patch("letsencrypt.client.display.revocation.util")
def test_revocation(self, mock_util):
mock_util().menu.return_value = (display_util.OK, 0)
code, choice = self._call(self.certs)
self.assertEqual(display_util.OK, code)
self.assertEqual(self.certs[choice], self.cert0)
@mock.patch("letsencrypt.client.display.revocation.util")
def test_cancel(self, mock_util):
mock_util().menu.return_value = (display_util.CANCEL, -1)
code, _ = self._call(self.certs)
self.assertEqual(display_util.CANCEL, code)
class MoreInfoCertTest(unittest.TestCase):
# pylint: disable=too-few-public-methods
@classmethod
def _call(cls, cert):
from letsencrypt.client.display.revocation import more_info_cert
more_info_cert(cert)
@mock.patch("letsencrypt.client.display.revocation.util")
def test_more_info(self, mock_util):
self._call(mock.MagicMock())
self.assertEqual(mock_util().notification.call_count, 1)
class SuccessRevocationTest(unittest.TestCase):
def setUp(self):
from letsencrypt.client.revoker import Cert
base_package = "letsencrypt.client.tests"
self.cert = Cert(pkg_resources.resource_filename(
base_package, os.path.join("testdata", "cert.pem")))
@classmethod
def _call(cls, cert):
from letsencrypt.client.display.revocation import success_revocation
success_revocation(cert)
# Pretty trivial test... something is displayed...
@mock.patch("letsencrypt.client.display.revocation.util")
def test_success_revocation(self, mock_util):
self._call(self.cert)
self.assertEqual(mock_util().notification.call_count, 1)
class ConfirmRevocationTest(unittest.TestCase):
def setUp(self):
from letsencrypt.client.revoker import Cert
self.cert = Cert(pkg_resources.resource_filename(
"letsencrypt.client.tests", os.path.join("testdata", "cert.pem")))
@classmethod
def _call(cls, cert):
from letsencrypt.client.display.revocation import confirm_revocation
return confirm_revocation(cert)
@mock.patch("letsencrypt.client.display.revocation.util")
def test_confirm_revocation(self, mock_util):
mock_util().yesno.return_value = True
self.assertTrue(self._call(self.cert))
mock_util().yesno.return_value = False
self.assertFalse(self._call(self.cert))
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,349 @@
"""Test :mod:`letsencrypt.client.display.util`."""
import os
import unittest
import mock
from letsencrypt.client.display import util as display_util
class DisplayT(unittest.TestCase):
"""Base class for both utility classes."""
# pylint: disable=too-few-public-methods
def setUp(self):
self.choices = [("First", "Description1"), ("Second", "Description2")]
self.tags = ["tag1", "tag2", "tag3"]
self.tags_choices = [("1", "tag1"), ("2", "tag2"), ("3", "tag3")]
def visual(displayer, choices):
"""Visually test all of the display functions."""
displayer.notification("Random notification!")
displayer.menu("Question?", choices,
ok_label="O", cancel_label="Can", help_label="??")
displayer.menu("Question?", [choice[1] for choice in choices],
ok_label="O", cancel_label="Can", help_label="??")
displayer.input("Input Message")
displayer.yesno("YesNo Message", yes_label="Yessir", no_label="Nosir")
displayer.checklist("Checklist Message", [choice[0] for choice in choices])
class NcursesDisplayTest(DisplayT):
"""Test ncurses display.
Since this is mostly a wrapper, it might be more helpful to test the actual
dialog boxes. The test_visual function will actually display the various
boxes but requires the user to do the verification. If something seems amiss
please use the test_visual function to debug it, the automatic tests rely
on too much mocking.
"""
def setUp(self):
super(NcursesDisplayTest, self).setUp()
self.displayer = display_util.NcursesDisplay()
self.default_menu_options = {
"choices": self.choices,
"ok_label": "OK",
"cancel_label": "Cancel",
"help_button": False,
"help_label": "",
"width": display_util.WIDTH,
"height": display_util.HEIGHT,
"menu_height": display_util.HEIGHT-6,
}
@mock.patch("letsencrypt.client.display.util.dialog.Dialog.msgbox")
def test_notification(self, mock_msgbox):
"""Kind of worthless... one liner."""
self.displayer.notification("message")
self.assertEqual(mock_msgbox.call_count, 1)
@mock.patch("letsencrypt.client.display.util.dialog.Dialog.menu")
def test_menu_tag_and_desc(self, mock_menu):
mock_menu.return_value = (display_util.OK, "First")
ret = self.displayer.menu("Message", self.choices)
mock_menu.assert_called_with("Message", **self.default_menu_options)
self.assertEqual(ret, (display_util.OK, 0))
@mock.patch("letsencrypt.client.display.util.dialog.Dialog.menu")
def test_menu_tag_and_desc_cancel(self, mock_menu):
mock_menu.return_value = (display_util.CANCEL, "")
ret = self.displayer.menu("Message", self.choices)
mock_menu.assert_called_with("Message", **self.default_menu_options)
self.assertEqual(ret, (display_util.CANCEL, -1))
@mock.patch("letsencrypt.client.display.util.dialog.Dialog.menu")
def test_menu_desc_only(self, mock_menu):
mock_menu.return_value = (display_util.OK, "1")
ret = self.displayer.menu("Message", self.tags, help_label="More Info")
self.default_menu_options.update(
choices=self.tags_choices, help_button=True, help_label="More Info")
mock_menu.assert_called_with("Message", **self.default_menu_options)
self.assertEqual(ret, (display_util.OK, 0))
@mock.patch("letsencrypt.client.display.util.dialog.Dialog.menu")
def test_menu_desc_only_help(self, mock_menu):
mock_menu.return_value = (display_util.HELP, "2")
ret = self.displayer.menu("Message", self.tags, help_label="More Info")
self.assertEqual(ret, (display_util.HELP, 1))
@mock.patch("letsencrypt.client.display.util.dialog.Dialog.menu")
def test_menu_desc_only_cancel(self, mock_menu):
mock_menu.return_value = (display_util.CANCEL, "")
ret = self.displayer.menu("Message", self.tags, help_label="More Info")
self.assertEqual(ret, (display_util.CANCEL, -1))
@mock.patch("letsencrypt.client.display.util."
"dialog.Dialog.inputbox")
def test_input(self, mock_input):
self.displayer.input("message")
self.assertEqual(mock_input.call_count, 1)
@mock.patch("letsencrypt.client.display.util.dialog.Dialog.yesno")
def test_yesno(self, mock_yesno):
mock_yesno.return_value = display_util.OK
self.assertTrue(self.displayer.yesno("message"))
mock_yesno.assert_called_with(
"message", display_util.HEIGHT, display_util.WIDTH,
yes_label="Yes", no_label="No")
@mock.patch("letsencrypt.client.display.util."
"dialog.Dialog.checklist")
def test_checklist(self, mock_checklist):
self.displayer.checklist("message", self.tags)
choices = [
(self.tags[0], "", False),
(self.tags[1], "", False),
(self.tags[2], "", False)
]
mock_checklist.assert_called_with(
"message", width=display_util.WIDTH, height=display_util.HEIGHT,
choices=choices)
# def test_visual(self):
# visual(self.displayer, self.choices)
class FileOutputDisplayTest(DisplayT):
"""Test stdout display.
Most of this class has to deal with visual output. In order to test how the
functions look to a user, uncomment the test_visual function.
"""
def setUp(self):
super(FileOutputDisplayTest, self).setUp()
self.mock_stdout = mock.MagicMock()
self.displayer = display_util.FileDisplay(self.mock_stdout)
def test_notification_no_pause(self):
self.displayer.notification("message", 10, False)
string = self.mock_stdout.write.call_args[0][0]
self.assertTrue("message" in string)
def test_notification_pause(self):
with mock.patch("__builtin__.raw_input", return_value="enter"):
self.displayer.notification("message")
self.assertTrue("message" in self.mock_stdout.write.call_args[0][0])
@mock.patch("letsencrypt.client.display.util."
"FileDisplay._get_valid_int_ans")
def test_menu(self, mock_ans):
mock_ans.return_value = (display_util.OK, 1)
ret = self.displayer.menu("message", self.choices)
self.assertEqual(ret, (display_util.OK, 0))
def test_input_cancel(self):
with mock.patch("__builtin__.raw_input", return_value="c"):
code, _ = self.displayer.input("message")
self.assertTrue(code, display_util.CANCEL)
def test_input_normal(self):
with mock.patch("__builtin__.raw_input", return_value="domain.com"):
code, input_ = self.displayer.input("message")
self.assertEqual(code, display_util.OK)
self.assertEqual(input_, "domain.com")
def test_yesno(self):
with mock.patch("__builtin__.raw_input", return_value="Yes"):
self.assertTrue(self.displayer.yesno("message"))
with mock.patch("__builtin__.raw_input", return_value="y"):
self.assertTrue(self.displayer.yesno("message"))
with mock.patch("__builtin__.raw_input", side_effect=["maybe", "y"]):
self.assertTrue(self.displayer.yesno("message"))
with mock.patch("__builtin__.raw_input", return_value="No"):
self.assertFalse(self.displayer.yesno("message"))
with mock.patch("__builtin__.raw_input", side_effect=["cancel", "n"]):
self.assertFalse(self.displayer.yesno("message"))
with mock.patch("__builtin__.raw_input", return_value="a"):
self.assertTrue(self.displayer.yesno("msg", yes_label="Agree"))
@mock.patch("letsencrypt.client.display.util.FileDisplay.input")
def test_checklist_valid(self, mock_input):
mock_input.return_value = (display_util.OK, "2 1")
code, tag_list = self.displayer.checklist("msg", self.tags)
self.assertEqual(
(code, set(tag_list)), (display_util.OK, set(["tag1", "tag2"])))
@mock.patch("letsencrypt.client.display.util.FileDisplay.input")
def test_checklist_miss_valid(self, mock_input):
mock_input.side_effect = [
(display_util.OK, "10"),
(display_util.OK, "tag1 please"),
(display_util.OK, "1")
]
ret = self.displayer.checklist("msg", self.tags)
self.assertEqual(ret, (display_util.OK, ["tag1"]))
@mock.patch("letsencrypt.client.display.util.FileDisplay.input")
def test_checklist_miss_quit(self, mock_input):
mock_input.side_effect = [
(display_util.OK, "10"),
(display_util.CANCEL, "1")
]
ret = self.displayer.checklist("msg", self.tags)
self.assertEqual(ret, (display_util.CANCEL, []))
def test_scrub_checklist_input_valid(self):
# pylint: disable=protected-access
indices = [
["1"],
["1", "2", "1"],
["2", "3"],
]
exp = [
set(["tag1"]),
set(["tag1", "tag2"]),
set(["tag2", "tag3"]),
]
for i, list_ in enumerate(indices):
set_tags = set(
self.displayer._scrub_checklist_input(list_, self.tags))
self.assertEqual(set_tags, exp[i])
def test_scrub_checklist_input_invalid(self):
# pylint: disable=protected-access
indices = [
["0"],
["4"],
["tag1"],
["1", "tag1"],
["2", "o"]
]
for list_ in indices:
self.assertEqual(
self.displayer._scrub_checklist_input(list_, self.tags), [])
def test_print_menu(self):
# pylint: disable=protected-access
# This is purely cosmetic... just make sure there aren't any exceptions
self.displayer._print_menu("msg", self.choices)
self.displayer._print_menu("msg", self.tags)
def test_wrap_lines(self):
# pylint: disable=protected-access
msg = ("This is just a weak test{0}"
"This function is only meant to be for easy viewing{0}"
"Test a really really really really really really really really "
"really really really really long line...".format(os.linesep))
text = self.displayer._wrap_lines(msg)
self.assertEqual(text.count(os.linesep), 3)
def test_get_valid_int_ans_valid(self):
# pylint: disable=protected-access
with mock.patch("__builtin__.raw_input", return_value="1"):
self.assertEqual(
self.displayer._get_valid_int_ans(1), (display_util.OK, 1))
ans = "2"
with mock.patch("__builtin__.raw_input", return_value=ans):
self.assertEqual(
self.displayer._get_valid_int_ans(3),
(display_util.OK, int(ans)))
def test_get_valid_int_ans_invalid(self):
# pylint: disable=protected-access
answers = [
["0", "c"],
["4", "one", "C"],
["c"],
]
for ans in answers:
with mock.patch("__builtin__.raw_input", side_effect=ans):
self.assertEqual(
self.displayer._get_valid_int_ans(3),
(display_util.CANCEL, -1))
# def test_visual(self):
# self.displayer = display_util.FileDisplay(sys.stdout)
# visual(self.displayer, self.choices)
class SeparateListInputTest(unittest.TestCase):
"""Test Module functions."""
def setUp(self):
self.exp = ["a", "b", "c", "test"]
@classmethod
def _call(cls, input_):
from letsencrypt.client.display.util import separate_list_input
return separate_list_input(input_)
def test_commas(self):
self.assertEqual(self._call("a,b,c,test"), self.exp)
def test_spaces(self):
self.assertEqual(self._call("a b c test"), self.exp)
def test_both(self):
self.assertEqual(self._call("a, b, c, test"), self.exp)
def test_mess(self):
actual = [
self._call(" a , b c \t test"),
self._call(",a, ,, , b c test "),
self._call(",,,,, , a b,,, , c,test"),
]
for act in actual:
self.assertEqual(act, self.exp)
class PlaceParensTest(unittest.TestCase):
@classmethod
def _call(cls, label): # pylint: disable=protected-access
from letsencrypt.client.display.util import _parens_around_char
return _parens_around_char(label)
def test_single_letter(self):
self.assertEqual("(a)", self._call("a"))
def test_multiple(self):
self.assertEqual("(L)abel", self._call("Label"))
self.assertEqual("(y)es please", self._call("yes please"))
if __name__ == "__main__":
unittest.main()

View File

@@ -56,9 +56,10 @@ class RecoveryTokenTest(unittest.TestCase):
@mock.patch("letsencrypt.client.recovery_token.zope.component.getUtility")
def test_perform_not_stored(self, mock_input):
mock_input().generic_input.side_effect = [(0, "555"), (1, "000")]
mock_input().input.side_effect = [(0, "555"), (1, "000")]
response = self.rec_token.perform(
challenge_util.RecTokenChall("example5.com"))
self.assertEqual(response, {"type": "recoveryToken", "token": "555"})
response = self.rec_token.perform(
@@ -66,5 +67,5 @@ class RecoveryTokenTest(unittest.TestCase):
self.assertTrue(response is None)
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()

View File

@@ -294,10 +294,9 @@ class TestFullCheckpointsReverter(unittest.TestCase):
self.reverter.rollback_checkpoints(1)
self.assertEqual(read_in(self.config1), "directive-dir1")
@mock.patch("letsencrypt.client.reverter.logging.warning")
def test_finalize_checkpoint_no_in_progress(self, mock_warn):
self.reverter.finalize_checkpoint("No checkpoint... should warn")
self.assertEqual(mock_warn.call_count, 1)
def test_finalize_checkpoint_no_in_progress(self):
# No need to warn for this... just make sure there are no errors.
self.reverter.finalize_checkpoint("No checkpoint...")
@mock.patch("letsencrypt.client.reverter.shutil.move")
def test_finalize_checkpoint_cannot_title(self, mock_move):
@@ -331,7 +330,7 @@ class TestFullCheckpointsReverter(unittest.TestCase):
self.assertEqual(read_in(self.config2), "directive-dir2")
self.assertFalse(os.path.isfile(config3))
@mock.patch("letsencrypt.client.client.zope.component.getUtility")
@mock.patch("letsencrypt.client.reverter.zope.component.getUtility")
def test_view_config_changes(self, mock_output):
"""This is not strict as this is subject to change."""
self._setup_three_checkpoints()
@@ -340,7 +339,7 @@ class TestFullCheckpointsReverter(unittest.TestCase):
self.reverter.view_config_changes()
# Make sure notification is output
self.assertEqual(mock_output().generic_notification.call_count, 1)
self.assertEqual(mock_output().notification.call_count, 1)
@mock.patch("letsencrypt.client.reverter.logging")
def test_view_config_changes_no_backups(self, mock_logging):

View File

@@ -0,0 +1,408 @@
"""Test letsencrypt.client.revoker."""
import csv
import os
import pkg_resources
import shutil
import tempfile
import unittest
import mock
from letsencrypt.client import errors
from letsencrypt.client import le_util
from letsencrypt.client.apache import configurator
from letsencrypt.client.display import util as display_util
class RevokerBase(unittest.TestCase): # pylint: disable=too-few-public-methods
"""Base Class for Revoker Tests."""
def setUp(self):
self.paths, self.certs, self.key_path = create_revoker_certs()
self.backup_dir = tempfile.mkdtemp("cert_backup")
self.mock_config = mock.MagicMock(cert_key_backup=self.backup_dir)
self.list_path = os.path.join(self.backup_dir, "LIST")
def _store_certs(self):
# pylint: disable=protected-access
from letsencrypt.client.revoker import Revoker
Revoker.store_cert_key(self.paths[0], self.key_path, self.mock_config)
Revoker.store_cert_key(self.paths[1], self.key_path, self.mock_config)
# Set metadata
for i in xrange(2):
self.certs[i].add_meta(
i, self.paths[i], self.key_path,
Revoker._get_backup(self.backup_dir, i, self.paths[i]),
Revoker._get_backup(self.backup_dir, i, self.key_path))
def _get_rows(self):
with open(self.list_path, "rb") as csvfile:
return [row for row in csv.reader(csvfile)]
def _write_rows(self, rows):
with open(self.list_path, "wb") as csvfile:
csvwriter = csv.writer(csvfile)
for row in rows:
csvwriter.writerow(row)
class RevokerTest(RevokerBase):
def setUp(self):
from letsencrypt.client.revoker import Revoker
super(RevokerTest, self).setUp()
with open(self.key_path) as key_file:
self.key = le_util.Key(self.key_path, key_file.read())
self._store_certs()
self.revoker = Revoker(
mock.MagicMock(spec=configurator.ApacheConfigurator),
self.mock_config)
def tearDown(self):
shutil.rmtree(self.backup_dir)
@mock.patch("letsencrypt.client.revoker.network."
"Network.send_and_receive_expected")
@mock.patch("letsencrypt.client.revoker.revocation")
def test_revoke_by_key_all(self, mock_display, mock_net):
mock_display().confirm_revocation.return_value = True
self.revoker.revoke_from_key(self.key)
self.assertEqual(self._get_rows(), [])
# Check to make sure backups were eliminated
for i in xrange(2):
self.assertFalse(self._backups_exist(self.certs[i].get_row()))
self.assertEqual(mock_net.call_count, 2)
@mock.patch("letsencrypt.client.revoker.Crypto.PublicKey.RSA.importKey")
def test_revoke_by_invalid_keys(self, mock_import):
mock_import.side_effect = ValueError
self.assertRaises(errors.LetsEncryptRevokerError,
self.revoker.revoke_from_key,
self.key)
mock_import.side_effect = [mock.Mock(), IndexError]
self.assertRaises(errors.LetsEncryptRevokerError,
self.revoker.revoke_from_key,
self.key)
@mock.patch("letsencrypt.client.revoker.network."
"Network.send_and_receive_expected")
@mock.patch("letsencrypt.client.revoker.revocation")
def test_revoke_by_wrong_key(self, mock_display, mock_net):
mock_display().confirm_revocation.return_value = True
key_path = pkg_resources.resource_filename(
"letsencrypt.client.tests", os.path.join(
"testdata", "rsa256_key.pem"))
wrong_key = le_util.Key(key_path, open(key_path).read())
self.revoker.revoke_from_key(wrong_key)
# Nothing was removed
self.assertEqual(len(self._get_rows()), 2)
# No revocation went through
self.assertEqual(mock_net.call_count, 0)
@mock.patch("letsencrypt.client.revoker.network."
"Network.send_and_receive_expected")
@mock.patch("letsencrypt.client.revoker.revocation")
def test_revoke_by_cert(self, mock_display, mock_net):
mock_display().confirm_revocation.return_value = True
self.revoker.revoke_from_cert(self.paths[1])
row0 = self.certs[0].get_row()
row1 = self.certs[1].get_row()
self.assertEqual(self._get_rows(), [row0])
self.assertTrue(self._backups_exist(row0))
self.assertFalse(self._backups_exist(row1))
self.assertEqual(mock_net.call_count, 1)
@mock.patch("letsencrypt.client.revoker.network."
"Network.send_and_receive_expected")
@mock.patch("letsencrypt.client.revoker.revocation")
def test_revoke_by_cert_not_found(self, mock_display, mock_net):
mock_display().confirm_revocation.return_value = True
self.revoker.revoke_from_cert(self.paths[0])
self.revoker.revoke_from_cert(self.paths[0])
row0 = self.certs[0].get_row()
row1 = self.certs[1].get_row()
# Same check as last time... just reversed.
self.assertEqual(self._get_rows(), [row1])
self.assertTrue(self._backups_exist(row1))
self.assertFalse(self._backups_exist(row0))
self.assertEqual(mock_net.call_count, 1)
@mock.patch("letsencrypt.client.revoker.network."
"Network.send_and_receive_expected")
@mock.patch("letsencrypt.client.revoker.revocation")
def test_revoke_by_menu(self, mock_display, mock_net):
mock_display().confirm_revocation.return_value = True
mock_display.display_certs.side_effect = [
(display_util.HELP, 0),
(display_util.OK, 0),
(display_util.CANCEL, -1),
]
self.revoker.revoke_from_menu()
row0 = self.certs[0].get_row()
row1 = self.certs[1].get_row()
self.assertEqual(self._get_rows(), [row1])
self.assertFalse(self._backups_exist(row0))
self.assertTrue(self._backups_exist(row1))
self.assertEqual(mock_net.call_count, 1)
self.assertEqual(mock_display.more_info_cert.call_count, 1)
@mock.patch("letsencrypt.client.revoker.logging")
@mock.patch("letsencrypt.client.revoker.network."
"Network.send_and_receive_expected")
@mock.patch("letsencrypt.client.revoker.revocation")
def test_revoke_by_menu_delete_all(self, mock_display, mock_net, mock_log):
mock_display().confirm_revocation.return_value = True
mock_display.display_certs.return_value = (display_util.OK, 0)
self.revoker.revoke_from_menu()
self.assertEqual(self._get_rows(), [])
# Everything should be deleted...
for i in xrange(2):
self.assertFalse(self._backups_exist(self.certs[i].get_row()))
self.assertEqual(mock_net.call_count, 2)
# Info is called when there aren't any certs left...
self.assertTrue(mock_log.info.called)
@mock.patch("letsencrypt.client.revoker.revocation")
@mock.patch("letsencrypt.client.revoker.Revoker._acme_revoke")
@mock.patch("letsencrypt.client.revoker.logging")
def test_safe_revoke_acme_fail(self, mock_log, mock_revoke, mock_display):
# pylint: disable=protected-access
mock_revoke.side_effect = errors.LetsEncryptClientError
mock_display().confirm_revocation.return_value = True
self.revoker._safe_revoke(self.certs)
self.assertTrue(mock_log.error.called)
@mock.patch("letsencrypt.client.revoker.Crypto.PublicKey.RSA.importKey")
def test_acme_revoke_failure(self, mock_crypto):
# pylint: disable=protected-access
mock_crypto.side_effect = ValueError
self.assertRaises(errors.LetsEncryptClientError,
self.revoker._acme_revoke,
self.certs[0])
def test_remove_certs_from_list_bad_certs(self):
# pylint: disable=protected-access
from letsencrypt.client.revoker import Cert
new_cert = Cert(self.paths[0])
# This isn't stored in the db
new_cert.idx = 10
new_cert.backup_path = self.paths[0]
new_cert.backup_key_path = self.key_path
new_cert.orig = Cert.PathStatus("false path", "not here")
new_cert.orig_key = Cert.PathStatus("false path", "not here")
self.assertRaises(errors.LetsEncryptRevokerError,
self.revoker._remove_certs_from_list,
[new_cert])
def _backups_exist(self, row):
# pylint: disable=protected-access
cert_path, key_path = self.revoker._row_to_backup(row)
return os.path.isfile(cert_path) and os.path.isfile(key_path)
class RevokerInstallerTest(RevokerBase):
def setUp(self):
super(RevokerInstallerTest, self).setUp()
self.installs = [
["installation/path0a", "installation/path0b"],
["installation/path1"],
]
self.certs_keys = [
(self.paths[0], self.key_path, self.installs[0][0]),
(self.paths[0], self.key_path, self.installs[0][1]),
(self.paths[1], self.key_path, self.installs[1][0]),
]
self._store_certs()
def _get_revoker(self, installer):
from letsencrypt.client.revoker import Revoker
return Revoker(installer, self.mock_config)
def test_no_installer_get_installed_locations(self):
# pylint: disable=protected-access
revoker = self._get_revoker(None)
self.assertEqual(revoker._get_installed_locations(), {})
def test_get_installed_locations(self):
# pylint: disable=protected-access
mock_installer = mock.MagicMock()
mock_installer.get_all_certs_keys.return_value = self.certs_keys
revoker = self._get_revoker(mock_installer)
sha_vh = revoker._get_installed_locations()
self.assertEqual(len(sha_vh), 2)
for i, cert in enumerate(self.certs):
self.assertTrue(cert.get_fingerprint() in sha_vh)
self.assertEqual(
sha_vh[cert.get_fingerprint()], self.installs[i])
@mock.patch("letsencrypt.client.revoker.M2Crypto.X509.load_cert")
def test_get_installed_load_failure(self, mock_m2):
mock_installer = mock.MagicMock()
mock_installer.get_all_certs_keys.return_value = self.certs_keys
mock_m2.side_effect = IOError
revoker = self._get_revoker(mock_installer)
# pylint: disable=protected-access
self.assertEqual(revoker._get_installed_locations(), {})
class RevokerClassMethodsTest(RevokerBase):
def setUp(self):
super(RevokerClassMethodsTest, self).setUp()
self.mock_config = mock.MagicMock(cert_key_backup=self.backup_dir)
def tearDown(self):
shutil.rmtree(self.backup_dir)
def _call(self, cert_path, key_path):
from letsencrypt.client.revoker import Revoker
Revoker.store_cert_key(cert_path, key_path, self.mock_config)
def test_store_two(self):
from letsencrypt.client.revoker import Revoker
self._call(self.paths[0], self.key_path)
self._call(self.paths[1], self.key_path)
self.assertTrue(os.path.isfile(self.list_path))
rows = self._get_rows()
for i, row in enumerate(rows):
# pylint: disable=protected-access
self.assertTrue(os.path.isfile(
Revoker._get_backup(self.backup_dir, i, self.paths[i])))
self.assertTrue(os.path.isfile(
Revoker._get_backup(self.backup_dir, i, self.key_path)))
self.assertEqual([str(i), self.paths[i], self.key_path], row)
self.assertEqual(len(rows), 2)
def test_store_one_mixed(self):
from letsencrypt.client.revoker import Revoker
self._write_rows(
[["5", "blank", "blank"], ["18", "dc", "dc"], ["21", "b", "b"]])
self._call(self.paths[0], self.key_path)
self.assertEqual(
self._get_rows()[3], ["22", self.paths[0], self.key_path])
# pylint: disable=protected-access
self.assertTrue(os.path.isfile(
Revoker._get_backup(self.backup_dir, 22, self.paths[0])))
self.assertTrue(os.path.isfile(
Revoker._get_backup(self.backup_dir, 22, self.key_path)))
class CertTest(unittest.TestCase):
def setUp(self):
self.paths, self.certs, self.key_path = create_revoker_certs()
def test_failed_load(self):
from letsencrypt.client.revoker import Cert
self.assertRaises(errors.LetsEncryptRevokerError, Cert, self.key_path)
def test_no_row(self):
self.assertEqual(self.certs[0].get_row(), None)
def test_meta_moved_files(self):
from letsencrypt.client.revoker import Cert
fake_path = "/not/a/real/path/r72d3t6"
self.certs[0].add_meta(
0, fake_path, fake_path, self.paths[0], self.key_path)
self.assertEqual(self.certs[0].orig.status, Cert.DELETED_MSG)
self.assertEqual(self.certs[0].orig_key.status, Cert.DELETED_MSG)
def test_meta_changed_files(self):
from letsencrypt.client.revoker import Cert
self.certs[0].add_meta(
0, self.paths[1], self.paths[1], self.paths[0], self.key_path)
self.assertEqual(self.certs[0].orig.status, Cert.CHANGED_MSG)
self.assertEqual(self.certs[0].orig_key.status, Cert.CHANGED_MSG)
def test_meta_no_status(self):
self.certs[0].add_meta(
0, self.paths[0], self.key_path, self.paths[0], self.key_path)
self.assertEqual(self.certs[0].orig.status, "")
self.assertEqual(self.certs[0].orig_key.status, "")
def test_print_meta(self):
"""Just make sure there aren't any major errors."""
self.certs[0].add_meta(
0, self.paths[0], self.key_path, self.paths[0], self.key_path)
# Changed path and deleted file
self.certs[1].add_meta(
1, self.paths[0], "/not/a/path", self.paths[1], self.key_path)
self.assertTrue(self.certs[0].pretty_print())
self.assertTrue(self.certs[1].pretty_print())
def test_print_no_meta(self):
self.assertTrue(self.certs[0].pretty_print())
self.assertTrue(self.certs[1].pretty_print())
def create_revoker_certs():
"""Create a few revoker.Cert objects."""
from letsencrypt.client.revoker import Cert
base_package = "letsencrypt.client.tests"
cert0_path = pkg_resources.resource_filename(
base_package, os.path.join("testdata", "cert.pem"))
cert1_path = pkg_resources.resource_filename(
base_package, os.path.join("testdata", "cert-san.pem"))
cert0 = Cert(cert0_path)
cert1 = Cert(cert1_path)
key_path = pkg_resources.resource_filename(
base_package, os.path.join("testdata", "rsa512_key.pem"))
return [cert0_path, cert1_path], [cert0, cert1], key_path
if __name__ == "__main__":
unittest.main()

View File

@@ -573,5 +573,33 @@ class CleanupTest(unittest.TestCase):
self.assertRaises(ValueError, self.authenticator.cleanup, [chall])
class MoreInfoTest(unittest.TestCase):
"""Tests for more_info() method. (trivially)"""
def setUp(self):
from letsencrypt.client.standalone_authenticator import (
StandaloneAuthenticator)
self.authenticator = StandaloneAuthenticator()
def test_more_info(self):
"""Make sure exceptions aren't raised."""
self.authenticator.more_info()
class InitTest(unittest.TestCase):
"""Tests for more_info() method. (trivially)"""
def setUp(self):
from letsencrypt.client.standalone_authenticator import (
StandaloneAuthenticator)
self.authenticator = StandaloneAuthenticator()
def test_prepare(self):
"""Make sure exceptions aren't raised.
.. todo:: Add on more once things are setup appropriately.
"""
self.authenticator.prepare()
if __name__ == "__main__":
unittest.main()

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env python
"""Parse command line and call the appropriate functions.
..todo:: Sanity check all input. Be sure to avoid shell code etc...
.. todo:: Sanity check all input. Be sure to avoid shell code etc...
"""
import argparse
@@ -16,11 +16,15 @@ import letsencrypt
from letsencrypt.client import configuration
from letsencrypt.client import client
from letsencrypt.client import display
from letsencrypt.client import errors
from letsencrypt.client import interfaces
from letsencrypt.client import le_util
from letsencrypt.client import log
from letsencrypt.client import standalone_authenticator as standalone
from letsencrypt.client.apache import configurator
from letsencrypt.client.display import util as display_util
from letsencrypt.client.display import ops as display_ops
def create_parser():
"""Create parser."""
@@ -34,12 +38,18 @@ def create_parser():
add("-s", "--server", default="letsencrypt-demo.org:443",
help=config_help("server"))
add("-p", "--privkey", type=read_file,
help="Path to the private key file for certificate generation.")
add("-k", "--authkey", type=read_file,
help="Path to the authorized key file")
add("-B", "--rsa-key-size", type=int, default=2048, metavar="N",
help=config_help("rsa_key_size"))
add("-k", "--revoke", action="store_true", help="Revoke a certificate.")
add("-R", "--revoke", action="store_true",
help="Revoke a certificate from a menu.")
add("--revoke-certificate", dest="rev_cert", type=read_file,
help="Revoke a specific certificate.")
add("--revoke-key", dest="rev_key", type=read_file,
help="Revoke all certs generated by the provided authorized key.")
add("-b", "--rollback", type=int, default=0, metavar="N",
help="Revert configuration N number of checkpoints.")
add("-v", "--view-config-changes", action="store_true",
@@ -50,6 +60,9 @@ def create_parser():
help="Automatically redirect all HTTP traffic to HTTPS for the newly "
"authenticated vhost.")
add("--no-confirm", dest="no_confirm", action="store_true",
help="Turn off confirmation screens, currently used for --revoke")
add("-e", "--agree-tos", dest="eula", action="store_true",
help="Skip the end user license agreement screen.")
add("-t", "--text", dest="use_curses", action="store_false",
@@ -85,7 +98,7 @@ def create_parser():
return parser
def main(): # pylint: disable=too-many-branches
def main(): # pylint: disable=too-many-branches, too-many-statements
"""Command line argument parsing and main script execution."""
# note: arg parser internally handles --help (and exits afterwards)
args = create_parser().parse_args()
@@ -102,17 +115,18 @@ def main(): # pylint: disable=too-many-branches
logger.setLevel(logging.INFO)
if args.use_curses:
logger.addHandler(log.DialogHandler())
displayer = display.NcursesDisplay()
displayer = display_util.NcursesDisplay()
else:
displayer = display.FileDisplay(sys.stdout)
displayer = display_util.FileDisplay(sys.stdout)
zope.component.provideUtility(displayer)
if args.view_config_changes:
client.view_config_changes(config)
sys.exit()
if args.revoke:
client.revoke(config)
if args.revoke or args.rev_cert is not None or args.rev_key is not None:
client.revoke(config, args.no_confirm, args.rev_cert, args.rev_key)
sys.exit()
if args.rollback > 0:
@@ -122,91 +136,67 @@ def main(): # pylint: disable=too-many-branches
if not args.eula:
display_eula()
# Make sure we actually get an installer that is functioning properly
# before we begin to try to use it.
all_auths = [
configurator.ApacheConfigurator(config),
standalone.StandaloneAuthenticator(),
]
try:
installer = client.determine_installer(config)
except errors.LetsEncryptMisconfigurationError as err:
logging.fatal("Please fix your configuration before proceeding. "
"The Installer exited with the following message: "
"%s", err)
auth = client.determine_authenticator(all_auths)
except errors.LetsEncryptClientError:
logging.critical("No authentication mechanisms were found on your "
"system.")
sys.exit(1)
# Use the same object if possible
if interfaces.IAuthenticator.providedBy(installer): # pylint: disable=no-member
auth = installer
else:
auth = client.determine_authenticator(config)
if auth is None:
sys.exit(0)
doms = choose_names(installer) if args.domains is None else args.domains
# Use the same object if possible
if interfaces.IInstaller.providedBy(auth): # pylint: disable=no-member
installer = auth
else:
# This is simple and avoids confusion right now.
installer = None
if args.domains is None:
doms = display_ops.choose_names(installer)
else:
doms = args.domains
if not doms:
sys.exit(0)
# Prepare for init of Client
if args.privkey is None:
privkey = client.init_key(args.rsa_key_size, config.key_dir)
if args.authkey is None:
authkey = client.init_key(args.rsa_key_size, config.key_dir)
else:
privkey = le_util.Key(args.privkey[0], args.privkey[1])
authkey = le_util.Key(args.authkey[0], args.authkey[1])
acme = client.Client(config, privkey, auth, installer)
acme = client.Client(config, authkey, auth, installer)
# Validate the key and csr
client.validate_key_csr(privkey)
client.validate_key_csr(authkey)
# This more closely mimics the capabilities of the CLI
# It should be possible for reconfig only, install-only, no-install
# I am not sure the best way to handle all of the unimplemented abilities,
# but this code should be safe on all environments.
cert_file = None
if auth is not None:
cert_file, chain_file = acme.obtain_certificate(doms)
if installer is not None and cert_file is not None:
acme.deploy_certificate(doms, privkey, cert_file, chain_file)
acme.deploy_certificate(doms, authkey, cert_file, chain_file)
if installer is not None:
acme.enhance_config(doms, args.redirect)
def display_eula():
"""Displays the end user agreement."""
with open('EULA') as eula_file:
if not zope.component.getUtility(interfaces.IDisplay).generic_yesno(
with open("EULA") as eula_file:
if not zope.component.getUtility(interfaces.IDisplay).yesno(
eula_file.read(), "Agree", "Cancel"):
sys.exit(0)
def choose_names(installer):
"""Display screen to select domains to validate.
:param installer: An installer object
:type installer: :class:`letsencrypt.client.interfaces.IInstaller`
"""
# This function adds all names found in the installer configuration
# Then filters them based on user selection
code, names = zope.component.getUtility(
interfaces.IDisplay).filter_names(get_all_names(installer))
if code == display.OK and names:
return names
else:
sys.exit(0)
def get_all_names(installer):
"""Return all valid names in the configuration.
:param installer: An installer object
:type installer: :class:`letsencrypt.client.interfaces.IInstaller`
"""
names = list(installer.get_all_names())
if not names:
logging.fatal("No domain names were found in your installation")
logging.fatal("Either specify which names you would like "
"letsencrypt to validate or add server names "
"to your virtual hosts")
sys.exit(1)
return names
def read_file(filename):
"""Returns the given file's contents with universal new line support.
@@ -219,7 +209,7 @@ def read_file(filename):
"""
try:
return filename, open(filename, 'rU').read()
return filename, open(filename, "rU").read()
except IOError as exc:
raise argparse.ArgumentTypeError(exc.strerror)

View File

@@ -0,0 +1,10 @@
# readthedocs.org gives no way to change the install command to "pip
# install -e .[docs]" (that would in turn install documentation
# dependencies), but it allows to specify a requirements.txt file at
# https://readthedocs.org/dashboard/letsencrypt/advanced/ (c.f. #259)
# Although ReadTheDocs certainly doesn't need to install the project
# in --editable mode (-e), just "pip install .[docs]" does not work as
# expected and "pip install -e .[docs]" must be used instead
-e .[docs]

View File

@@ -1,8 +0,0 @@
M2Crypto==0.22.3
python2-pythondialog
jsonschema==2.4.0
python-augeas==0.5.0
requests==2.4.3
argparse==1.2.2
mock==1.0.1
PyOpenSSL==0.13

View File

@@ -2,7 +2,7 @@
zip_ok = false
[aliases]
dev = develop easy_install letsencrypt[testing,dev]
dev = develop easy_install letsencrypt[dev,docs,testing]
[nosetests]
nocapture=1

View File

@@ -41,8 +41,12 @@ install_requires = [
dev_extras = [
'pylint>=1.4.0', # upstream #248
]
docs_extras = [
'repoze.sphinx.autointerface',
'Sphinx',
'sphinx_rtd_theme',
]
testing_extras = [
@@ -65,8 +69,10 @@ setup(
'letsencrypt.acme',
'letsencrypt.client',
'letsencrypt.client.apache',
'letsencrypt.client.display',
'letsencrypt.client.tests',
'letsencrypt.client.tests.apache',
'letsencrypt.client.tests.display',
'letsencrypt.scripts',
],
install_requires=install_requires,
@@ -74,6 +80,7 @@ setup(
test_suite='letsencrypt',
extras_require={
'dev': dev_extras,
'docs': docs_extras,
'testing': testing_extras,
},
entry_points={

View File

@@ -17,7 +17,7 @@ setenv =
basepython = python2.7
commands =
pip install -e .[testing]
python setup.py nosetests --with-coverage --cover-min-percentage=73
python setup.py nosetests --with-coverage --cover-min-percentage=83
[testenv:lint]
# recent versions of pylint do not support Python 2.6 (#97, #187)