1
0
mirror of https://github.com/certbot/certbot.git synced 2025-07-30 09:03:08 +03:00

Added certbot-ci to lint section. Silenced and fixed linting warnings. (#8450)

This commit is contained in:
Mads Jensen
2020-12-16 20:34:12 +01:00
committed by GitHub
parent d38766e05c
commit 96a05d946c
19 changed files with 206 additions and 125 deletions

View File

@ -186,6 +186,7 @@ def probe_sni(name, host, port=443, timeout=300, # pylint: disable=too-many-argu
raise errors.Error(error) raise errors.Error(error)
return client_ssl.get_peer_certificate() return client_ssl.get_peer_certificate()
def make_csr(private_key_pem, domains, must_staple=False): def make_csr(private_key_pem, domains, must_staple=False):
"""Generate a CSR containing a list of domains as subjectAltNames. """Generate a CSR containing a list of domains as subjectAltNames.
@ -217,6 +218,7 @@ def make_csr(private_key_pem, domains, must_staple=False):
return crypto.dump_certificate_request( return crypto.dump_certificate_request(
crypto.FILETYPE_PEM, csr) crypto.FILETYPE_PEM, csr)
def _pyopenssl_cert_or_req_all_names(loaded_cert_or_req): def _pyopenssl_cert_or_req_all_names(loaded_cert_or_req):
common_name = loaded_cert_or_req.get_subject().CN common_name = loaded_cert_or_req.get_subject().CN
sans = _pyopenssl_cert_or_req_san(loaded_cert_or_req) sans = _pyopenssl_cert_or_req_san(loaded_cert_or_req)
@ -225,6 +227,7 @@ def _pyopenssl_cert_or_req_all_names(loaded_cert_or_req):
return sans return sans
return [common_name] + [d for d in sans if d != common_name] return [common_name] + [d for d in sans if d != common_name]
def _pyopenssl_cert_or_req_san(cert_or_req): def _pyopenssl_cert_or_req_san(cert_or_req):
"""Get Subject Alternative Names from certificate or CSR using pyOpenSSL. """Get Subject Alternative Names from certificate or CSR using pyOpenSSL.
@ -317,6 +320,7 @@ def gen_ss_cert(key, domains, not_before=None,
cert.sign(key, "sha256") cert.sign(key, "sha256")
return cert return cert
def dump_pyopenssl_chain(chain, filetype=crypto.FILETYPE_PEM): def dump_pyopenssl_chain(chain, filetype=crypto.FILETYPE_PEM):
"""Dump certificate chain into a bundle. """Dump certificate chain into a bundle.

View File

@ -1,3 +1,4 @@
# pylint: disable=missing-module-docstring
import pytest import pytest
# Custom assertions defined in the following package need to be registered to be properly # Custom assertions defined in the following package need to be registered to be properly

View File

@ -77,6 +77,6 @@ class IntegrationTestsContext(object):
appending the pytest worker id to the subdomain, using this pattern: appending the pytest worker id to the subdomain, using this pattern:
{subdomain}.{worker_id}.wtf {subdomain}.{worker_id}.wtf
:param subdomain: the subdomain to use in the generated domain (default 'le') :param subdomain: the subdomain to use in the generated domain (default 'le')
:return: the well-formed domain suitable for redirection on :return: the well-formed domain suitable for redirection on
""" """
return '{0}.{1}.wtf'.format(subdomain, self.worker_id) return '{0}.{1}.wtf'.format(subdomain, self.worker_id)

View File

@ -29,8 +29,9 @@ from certbot_integration_tests.certbot_tests.assertions import EVERYBODY_SID
from certbot_integration_tests.utils import misc from certbot_integration_tests.utils import misc
@pytest.fixture() @pytest.fixture(name='context')
def context(request): def test_context(request):
# pylint: disable=missing-function-docstring
# Fixture request is a built-in pytest fixture describing current test request. # Fixture request is a built-in pytest fixture describing current test request.
integration_test_context = certbot_context.IntegrationTestsContext(request) integration_test_context = certbot_context.IntegrationTestsContext(request)
try: try:
@ -222,14 +223,16 @@ def test_renew_files_propagate_permissions(context):
if os.name != 'nt': if os.name != 'nt':
os.chmod(privkey1, 0o444) os.chmod(privkey1, 0o444)
else: else:
import win32security import win32security # pylint: disable=import-error
import ntsecuritycon import ntsecuritycon # pylint: disable=import-error
# Get the current DACL of the private key # Get the current DACL of the private key
security = win32security.GetFileSecurity(privkey1, win32security.DACL_SECURITY_INFORMATION) security = win32security.GetFileSecurity(privkey1, win32security.DACL_SECURITY_INFORMATION)
dacl = security.GetSecurityDescriptorDacl() dacl = security.GetSecurityDescriptorDacl()
# Create a read permission for Everybody group # Create a read permission for Everybody group
everybody = win32security.ConvertStringSidToSid(EVERYBODY_SID) everybody = win32security.ConvertStringSidToSid(EVERYBODY_SID)
dacl.AddAccessAllowedAce(win32security.ACL_REVISION, ntsecuritycon.FILE_GENERIC_READ, everybody) dacl.AddAccessAllowedAce(
win32security.ACL_REVISION, ntsecuritycon.FILE_GENERIC_READ, everybody
)
# Apply the updated DACL to the private key # Apply the updated DACL to the private key
security.SetSecurityDescriptorDacl(1, dacl, 0) security.SetSecurityDescriptorDacl(1, dacl, 0)
win32security.SetFileSecurity(privkey1, win32security.DACL_SECURITY_INFORMATION, security) win32security.SetFileSecurity(privkey1, win32security.DACL_SECURITY_INFORMATION, security)
@ -238,12 +241,14 @@ def test_renew_files_propagate_permissions(context):
assert_cert_count_for_lineage(context.config_dir, certname, 2) assert_cert_count_for_lineage(context.config_dir, certname, 2)
if os.name != 'nt': if os.name != 'nt':
# On Linux, read world permissions + all group permissions will be copied from the previous private key # On Linux, read world permissions + all group permissions
# will be copied from the previous private key
assert_world_read_permissions(privkey2) assert_world_read_permissions(privkey2)
assert_equals_world_read_permissions(privkey1, privkey2) assert_equals_world_read_permissions(privkey1, privkey2)
assert_equals_group_permissions(privkey1, privkey2) assert_equals_group_permissions(privkey1, privkey2)
else: else:
# On Windows, world will never have any permissions, and group permission is irrelevant for this platform # On Windows, world will never have any permissions, and
# group permission is irrelevant for this platform
assert_world_no_permissions(privkey2) assert_world_no_permissions(privkey2)
@ -609,14 +614,17 @@ def test_revoke_multiple_lineages(context):
with open(join(context.config_dir, 'renewal', '{0}.conf'.format(cert2)), 'r') as file: with open(join(context.config_dir, 'renewal', '{0}.conf'.format(cert2)), 'r') as file:
data = file.read() data = file.read()
data = re.sub('archive_dir = .*\n', data = re.sub(
'archive_dir = {0}\n'.format(join(context.config_dir, 'archive', cert1).replace('\\', '\\\\')), 'archive_dir = .*\n',
data) 'archive_dir = {0}\n'.format(
join(context.config_dir, 'archive', cert1).replace('\\', '\\\\')
), data
)
with open(join(context.config_dir, 'renewal', '{0}.conf'.format(cert2)), 'w') as file: with open(join(context.config_dir, 'renewal', '{0}.conf'.format(cert2)), 'w') as file:
file.write(data) file.write(data)
output = context.certbot([ context.certbot([
'revoke', '--cert-path', join(context.config_dir, 'live', cert1, 'cert.pem') 'revoke', '--cert-path', join(context.config_dir, 'live', cert1, 'cert.pem')
]) ])

View File

@ -13,7 +13,6 @@ import sys
from certbot_integration_tests.utils import acme_server as acme_lib from certbot_integration_tests.utils import acme_server as acme_lib
from certbot_integration_tests.utils import dns_server as dns_lib from certbot_integration_tests.utils import dns_server as dns_lib
from certbot_integration_tests.utils.dns_server import DNSServer
def pytest_addoption(parser): def pytest_addoption(parser):
@ -92,8 +91,10 @@ def _setup_primary_node(config):
try: try:
subprocess.check_output(['docker-compose', '-v'], stderr=subprocess.STDOUT) subprocess.check_output(['docker-compose', '-v'], stderr=subprocess.STDOUT)
except (subprocess.CalledProcessError, OSError): except (subprocess.CalledProcessError, OSError):
raise ValueError('Error: docker-compose is required in PATH to launch the integration tests, ' raise ValueError(
'but is not installed or not available for current user.') 'Error: docker-compose is required in PATH to launch the integration tests, '
'but is not installed or not available for current user.'
)
# Parameter numprocesses is added to option by pytest-xdist # Parameter numprocesses is added to option by pytest-xdist
workers = ['primary'] if not config.option.numprocesses\ workers = ['primary'] if not config.option.numprocesses\

View File

@ -1,3 +1,4 @@
"""Module to handle the context of nginx integration tests."""
import os import os
import subprocess import subprocess

View File

@ -7,8 +7,8 @@ import pytest
from certbot_integration_tests.nginx_tests import context as nginx_context from certbot_integration_tests.nginx_tests import context as nginx_context
@pytest.fixture() @pytest.fixture(name='context')
def context(request): def test_context(request):
# Fixture request is a built-in pytest fixture describing current test request. # Fixture request is a built-in pytest fixture describing current test request.
integration_test_context = nginx_context.IntegrationTestsContext(request) integration_test_context = nginx_context.IntegrationTestsContext(request)
try: try:
@ -27,7 +27,9 @@ def context(request):
# No matching server block; default_server does not exist # No matching server block; default_server does not exist
('nginx5.{0}.wtf', ['--preferred-challenges', 'http'], {'default_server': False}), ('nginx5.{0}.wtf', ['--preferred-challenges', 'http'], {'default_server': False}),
# Multiple domains, mix of matching and not # Multiple domains, mix of matching and not
('nginx6.{0}.wtf,nginx7.{0}.wtf', ['--preferred-challenges', 'http'], {'default_server': False}), ('nginx6.{0}.wtf,nginx7.{0}.wtf', [
'--preferred-challenges', 'http'
], {'default_server': False}),
], indirect=['context']) ], indirect=['context'])
def test_certificate_deployment(certname_pattern, params, context): def test_certificate_deployment(certname_pattern, params, context):
# type: (str, list, nginx_context.IntegrationTestsContext) -> None # type: (str, list, nginx_context.IntegrationTestsContext) -> None
@ -41,7 +43,9 @@ def test_certificate_deployment(certname_pattern, params, context):
lineage = domains.split(',')[0] lineage = domains.split(',')[0]
server_cert = ssl.get_server_certificate(('localhost', context.tls_alpn_01_port)) server_cert = ssl.get_server_certificate(('localhost', context.tls_alpn_01_port))
with open(os.path.join(context.workspace, 'conf/live/{0}/cert.pem'.format(lineage)), 'r') as file: with open(os.path.join(
context.workspace, 'conf/live/{0}/cert.pem'.format(lineage)), 'r'
) as file:
certbot_cert = file.read() certbot_cert = file.read()
assert server_cert == certbot_cert assert server_cert == certbot_cert

View File

@ -1,7 +1,10 @@
from contextlib import contextmanager """Module to handle the context of RFC2136 integration tests."""
from pytest import skip
from pkg_resources import resource_filename
import tempfile import tempfile
from contextlib import contextmanager
from pkg_resources import resource_filename
from pytest import skip
from certbot_integration_tests.certbot_tests import context as certbot_context from certbot_integration_tests.certbot_tests import context as certbot_context
from certbot_integration_tests.utils import certbot_call from certbot_integration_tests.utils import certbot_call
@ -33,7 +36,6 @@ class IntegrationTestsContext(certbot_context.IntegrationTestsContext):
@contextmanager @contextmanager
def rfc2136_credentials(self, label='default'): def rfc2136_credentials(self, label='default'):
# type: (str) -> str
""" """
Produces the contents of a certbot-dns-rfc2136 credentials file. Produces the contents of a certbot-dns-rfc2136 credentials file.
:param str label: which RFC2136 credential to use :param str label: which RFC2136 credential to use
@ -52,10 +54,10 @@ class IntegrationTestsContext(certbot_context.IntegrationTestsContext):
) )
with tempfile.NamedTemporaryFile('w+', prefix='rfc2136-creds-{}'.format(label), with tempfile.NamedTemporaryFile('w+', prefix='rfc2136-creds-{}'.format(label),
suffix='.ini', dir=self.workspace) as f: suffix='.ini', dir=self.workspace) as fp:
f.write(contents) fp.write(contents)
f.flush() fp.flush()
yield f.name yield fp.name
def skip_if_no_bind9_server(self): def skip_if_no_bind9_server(self):
"""Skips the test if there was no RFC2136-capable DNS server configured """Skips the test if there was no RFC2136-capable DNS server configured

View File

@ -4,8 +4,9 @@ import pytest
from certbot_integration_tests.rfc2136_tests import context as rfc2136_context from certbot_integration_tests.rfc2136_tests import context as rfc2136_context
@pytest.fixture() @pytest.fixture(name="context")
def context(request): def pytest_context(request):
# pylint: disable=missing-function-docstring
# Fixture request is a built-in pytest fixture describing current test request. # Fixture request is a built-in pytest fixture describing current test request.
integration_test_context = rfc2136_context.IntegrationTestsContext(request) integration_test_context = rfc2136_context.IntegrationTestsContext(request)
try: try:

View File

@ -7,7 +7,6 @@ import errno
import json import json
import os import os
from os.path import join from os.path import join
import re
import shutil import shutil
import subprocess import subprocess
import sys import sys
@ -16,9 +15,11 @@ import time
import requests import requests
from acme.magic_typing import List
from certbot_integration_tests.utils import misc from certbot_integration_tests.utils import misc
from certbot_integration_tests.utils import pebble_artifacts from certbot_integration_tests.utils import pebble_artifacts
from certbot_integration_tests.utils import proxy from certbot_integration_tests.utils import proxy
# pylint: disable=wildcard-import,unused-wildcard-import
from certbot_integration_tests.utils.constants import * from certbot_integration_tests.utils.constants import *
@ -31,8 +32,8 @@ class ACMEServer(object):
ACMEServer gives access the acme_xdist parameter, listing the ports and directory url to use ACMEServer gives access the acme_xdist parameter, listing the ports and directory url to use
for each pytest node. It exposes also start and stop methods in order to start the stack, and for each pytest node. It exposes also start and stop methods in order to start the stack, and
stop it with proper resources cleanup. stop it with proper resources cleanup.
ACMEServer is also a context manager, and so can be used to ensure ACME server is started/stopped ACMEServer is also a context manager, and so can be used to ensure ACME server is
upon context enter/exit. started/stopped upon context enter/exit.
""" """
def __init__(self, acme_server, nodes, http_proxy=True, stdout=False, dns_server=None): def __init__(self, acme_server, nodes, http_proxy=True, stdout=False, dns_server=None):
""" """
@ -48,7 +49,7 @@ class ACMEServer(object):
self._acme_type = 'pebble' if acme_server == 'pebble' else 'boulder' self._acme_type = 'pebble' if acme_server == 'pebble' else 'boulder'
self._proxy = http_proxy self._proxy = http_proxy
self._workspace = tempfile.mkdtemp() self._workspace = tempfile.mkdtemp()
self._processes = [] self._processes = [] # type: List
self._stdout = sys.stdout if stdout else open(os.devnull, 'w') self._stdout = sys.stdout if stdout else open(os.devnull, 'w')
self._dns_server = dns_server self._dns_server = dns_server
@ -107,19 +108,26 @@ class ACMEServer(object):
"""Generate and return the acme_xdist dict""" """Generate and return the acme_xdist dict"""
acme_xdist = {'acme_server': acme_server, 'challtestsrv_port': CHALLTESTSRV_PORT} acme_xdist = {'acme_server': acme_server, 'challtestsrv_port': CHALLTESTSRV_PORT}
# Directory and ACME port are set implicitly in the docker-compose.yml files of Boulder/Pebble. # Directory and ACME port are set implicitly in the docker-compose.yml
# files of Boulder/Pebble.
if acme_server == 'pebble': if acme_server == 'pebble':
acme_xdist['directory_url'] = PEBBLE_DIRECTORY_URL acme_xdist['directory_url'] = PEBBLE_DIRECTORY_URL
else: # boulder else: # boulder
acme_xdist['directory_url'] = BOULDER_V2_DIRECTORY_URL \ acme_xdist['directory_url'] = BOULDER_V2_DIRECTORY_URL \
if acme_server == 'boulder-v2' else BOULDER_V1_DIRECTORY_URL if acme_server == 'boulder-v2' else BOULDER_V1_DIRECTORY_URL
acme_xdist['http_port'] = {node: port for (node, port) acme_xdist['http_port'] = {
in zip(nodes, range(5200, 5200 + len(nodes)))} node: port for (node, port) in # pylint: disable=unnecessary-comprehension
acme_xdist['https_port'] = {node: port for (node, port) zip(nodes, range(5200, 5200 + len(nodes)))
in zip(nodes, range(5100, 5100 + len(nodes)))} }
acme_xdist['other_port'] = {node: port for (node, port) acme_xdist['https_port'] = {
in zip(nodes, range(5300, 5300 + len(nodes)))} node: port for (node, port) in # pylint: disable=unnecessary-comprehension
zip(nodes, range(5100, 5100 + len(nodes)))
}
acme_xdist['other_port'] = {
node: port for (node, port) in # pylint: disable=unnecessary-comprehension
zip(nodes, range(5300, 5300 + len(nodes)))
}
self.acme_xdist = acme_xdist self.acme_xdist = acme_xdist
@ -150,9 +158,9 @@ class ACMEServer(object):
env=environ) env=environ)
# pebble_ocsp_server is imported here and not at the top of module in order to avoid a # pebble_ocsp_server is imported here and not at the top of module in order to avoid a
# useless ImportError, in the case where cryptography dependency is too old to support ocsp, # useless ImportError, in the case where cryptography dependency is too old to support
# but Boulder is used instead of Pebble, so pebble_ocsp_server is not used. This is the # ocsp, but Boulder is used instead of Pebble, so pebble_ocsp_server is not used. This is
# typical situation of integration-certbot-oldest tox testenv. # the typical situation of integration-certbot-oldest tox testenv.
from certbot_integration_tests.utils import pebble_ocsp_server from certbot_integration_tests.utils import pebble_ocsp_server
self._launch_process([sys.executable, pebble_ocsp_server.__file__]) self._launch_process([sys.executable, pebble_ocsp_server.__file__])
@ -195,13 +203,16 @@ class ACMEServer(object):
if not self._dns_server: if not self._dns_server:
# Configure challtestsrv to answer any A record request with ip of the docker host. # Configure challtestsrv to answer any A record request with ip of the docker host.
response = requests.post('http://localhost:{0}/set-default-ipv4'.format(CHALLTESTSRV_PORT), response = requests.post('http://localhost:{0}/set-default-ipv4'.format(
json={'ip': '10.77.77.1'}) CHALLTESTSRV_PORT), json={'ip': '10.77.77.1'}
)
response.raise_for_status() response.raise_for_status()
except BaseException: except BaseException:
# If we failed to set up boulder, print its logs. # If we failed to set up boulder, print its logs.
print('=> Boulder setup failed. Boulder logs are:') print('=> Boulder setup failed. Boulder logs are:')
process = self._launch_process(['docker-compose', 'logs'], cwd=instance_path, force_stderr=True) process = self._launch_process([
'docker-compose', 'logs'], cwd=instance_path, force_stderr=True
)
process.wait() process.wait()
raise raise
@ -221,12 +232,15 @@ class ACMEServer(object):
if not env: if not env:
env = os.environ env = os.environ
stdout = sys.stderr if force_stderr else self._stdout stdout = sys.stderr if force_stderr else self._stdout
process = subprocess.Popen(command, stdout=stdout, stderr=subprocess.STDOUT, cwd=cwd, env=env) process = subprocess.Popen(
command, stdout=stdout, stderr=subprocess.STDOUT, cwd=cwd, env=env
)
self._processes.append(process) self._processes.append(process)
return process return process
def main(): def main():
# pylint: disable=missing-function-docstring
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description='CLI tool to start a local instance of Pebble or Boulder CA server.') description='CLI tool to start a local instance of Pebble or Boulder CA server.')
parser.add_argument('--server-type', '-s', parser.add_argument('--server-type', '-s',
@ -239,7 +253,10 @@ def main():
'resolve domains to localhost.') 'resolve domains to localhost.')
args = parser.parse_args() args = parser.parse_args()
acme_server = ACMEServer(args.server_type, [], http_proxy=False, stdout=True, dns_server=args.dns_server) acme_server = ACMEServer(
args.server_type, [], http_proxy=False, stdout=True,
dns_server=args.dns_server
)
try: try:
with acme_server as acme_xdist: with acme_server as acme_xdist:

View File

@ -2,12 +2,13 @@
"""Module to call certbot in test mode""" """Module to call certbot in test mode"""
from __future__ import absolute_import from __future__ import absolute_import
from distutils.version import LooseVersion
import os import os
import subprocess import subprocess
import sys import sys
from distutils.version import LooseVersion
import certbot_integration_tests import certbot_integration_tests
# pylint: disable=wildcard-import,unused-wildcard-import
from certbot_integration_tests.utils.constants import * from certbot_integration_tests.utils.constants import *
@ -35,6 +36,8 @@ def certbot_test(certbot_args, directory_url, http_01_port, tls_alpn_01_port,
def _prepare_environ(workspace): def _prepare_environ(workspace):
# pylint: disable=missing-function-docstring
new_environ = os.environ.copy() new_environ = os.environ.copy()
new_environ['TMPDIR'] = workspace new_environ['TMPDIR'] = workspace
@ -58,8 +61,13 @@ def _prepare_environ(workspace):
# certbot_integration_tests.__file__ is: # certbot_integration_tests.__file__ is:
# '/path/to/certbot/certbot-ci/certbot_integration_tests/__init__.pyc' # '/path/to/certbot/certbot-ci/certbot_integration_tests/__init__.pyc'
# ... and we want '/path/to/certbot' # ... and we want '/path/to/certbot'
certbot_root = os.path.dirname(os.path.dirname(os.path.dirname(certbot_integration_tests.__file__))) certbot_root = os.path.dirname(os.path.dirname(
python_paths = [path for path in new_environ['PYTHONPATH'].split(':') if path != certbot_root] os.path.dirname(certbot_integration_tests.__file__))
)
python_paths = [
path for path in new_environ['PYTHONPATH'].split(':')
if path != certbot_root
]
new_environ['PYTHONPATH'] = ':'.join(python_paths) new_environ['PYTHONPATH'] = ':'.join(python_paths)
return new_environ return new_environ
@ -70,7 +78,8 @@ def _compute_additional_args(workspace, environ, force_renew):
output = subprocess.check_output(['certbot', '--version'], output = subprocess.check_output(['certbot', '--version'],
universal_newlines=True, stderr=subprocess.STDOUT, universal_newlines=True, stderr=subprocess.STDOUT,
cwd=workspace, env=environ) cwd=workspace, env=environ)
version_str = output.split(' ')[1].strip() # Typical response is: output = 'certbot 0.31.0.dev0' # Typical response is: output = 'certbot 0.31.0.dev0'
version_str = output.split(' ')[1].strip()
if LooseVersion(version_str) >= LooseVersion('0.30.0'): if LooseVersion(version_str) >= LooseVersion('0.30.0'):
additional_args.append('--no-random-sleep-on-renew') additional_args.append('--no-random-sleep-on-renew')
@ -113,6 +122,7 @@ def _prepare_args_env(certbot_args, directory_url, http_01_port, tls_alpn_01_por
def main(): def main():
# pylint: disable=missing-function-docstring
args = sys.argv[1:] args = sys.argv[1:]
# Default config is pebble # Default config is pebble

View File

@ -7,4 +7,4 @@ BOULDER_V2_DIRECTORY_URL = 'http://localhost:4001/directory'
PEBBLE_DIRECTORY_URL = 'https://localhost:14000/dir' PEBBLE_DIRECTORY_URL = 'https://localhost:14000/dir'
PEBBLE_MANAGEMENT_URL = 'https://localhost:15000' PEBBLE_MANAGEMENT_URL = 'https://localhost:15000'
MOCK_OCSP_SERVER_PORT = 4002 MOCK_OCSP_SERVER_PORT = 4002
PEBBLE_ALTERNATE_ROOTS = 2 PEBBLE_ALTERNATE_ROOTS = 2

View File

@ -4,7 +4,6 @@ from __future__ import print_function
import os import os
import os.path import os.path
from pkg_resources import resource_filename
import shutil import shutil
import socket import socket
import subprocess import subprocess
@ -12,13 +11,14 @@ import sys
import tempfile import tempfile
import time import time
from pkg_resources import resource_filename
BIND_DOCKER_IMAGE = 'internetsystemsconsortium/bind9:9.16' BIND_DOCKER_IMAGE = "internetsystemsconsortium/bind9:9.16"
BIND_BIND_ADDRESS = ('127.0.0.1', 45953) BIND_BIND_ADDRESS = ("127.0.0.1", 45953)
# A TCP DNS message which is a query for '. CH A' transaction ID 0xcb37. This is used # A TCP DNS message which is a query for '. CH A' transaction ID 0xcb37. This is used
# by _wait_until_ready to check that BIND is responding without depending on dnspython. # by _wait_until_ready to check that BIND is responding without depending on dnspython.
BIND_TEST_QUERY = bytearray.fromhex('0011cb37000000010000000000000000010003') BIND_TEST_QUERY = bytearray.fromhex("0011cb37000000010000000000000000010003")
class DNSServer(object): class DNSServer(object):
@ -31,7 +31,7 @@ class DNSServer(object):
future to support parallelization (https://github.com/certbot/certbot/issues/8455). future to support parallelization (https://github.com/certbot/certbot/issues/8455).
""" """
def __init__(self, nodes, show_output=False): def __init__(self, unused_nodes, show_output=False):
""" """
Create an DNSServer instance. Create an DNSServer instance.
:param list nodes: list of node names that will be setup by pytest xdist :param list nodes: list of node names that will be setup by pytest xdist
@ -40,16 +40,13 @@ class DNSServer(object):
self.bind_root = tempfile.mkdtemp() self.bind_root = tempfile.mkdtemp()
self.process = None self.process = None # type: subprocess.Popen
self.dns_xdist = { self.dns_xdist = {"address": BIND_BIND_ADDRESS[0], "port": BIND_BIND_ADDRESS[1]}
'address': BIND_BIND_ADDRESS[0],
'port': BIND_BIND_ADDRESS[1]
}
# Unfortunately the BIND9 image forces everything to stderr with -g and we can't # Unfortunately the BIND9 image forces everything to stderr with -g and we can't
# modify the verbosity. # modify the verbosity.
self._output = sys.stderr if show_output else open(os.devnull, 'w') self._output = sys.stderr if show_output else open(os.devnull, "w")
def start(self): def start(self):
"""Start the DNS server""" """Start the DNS server"""
@ -63,11 +60,11 @@ class DNSServer(object):
def stop(self): def stop(self):
"""Stop the DNS server, and clean its resources""" """Stop the DNS server, and clean its resources"""
if self.process: if self.process:
try: try:
self.process.terminate() self.process.terminate()
self.process.wait() self.process.wait()
except BaseException as e: except BaseException as e:
print("BIND9 did not stop cleanly: {}".format(e), file=sys.stderr) print("BIND9 did not stop cleanly: {}".format(e), file=sys.stderr)
shutil.rmtree(self.bind_root, ignore_errors=True) shutil.rmtree(self.bind_root, ignore_errors=True)
@ -76,65 +73,79 @@ class DNSServer(object):
def _configure_bind(self): def _configure_bind(self):
"""Configure the BIND9 server based on the prebaked configuration""" """Configure the BIND9 server based on the prebaked configuration"""
bind_conf_src = resource_filename('certbot_integration_tests', 'assets/bind-config') bind_conf_src = resource_filename(
for dir in ('conf', 'zones'): "certbot_integration_tests", "assets/bind-config"
shutil.copytree(os.path.join(bind_conf_src, dir), os.path.join(self.bind_root, dir)) )
for directory in ("conf", "zones"):
shutil.copytree(
os.path.join(bind_conf_src, directory), os.path.join(self.bind_root, directory)
)
def _start_bind(self): def _start_bind(self):
"""Launch the BIND9 server as a Docker container""" """Launch the BIND9 server as a Docker container"""
addr_str = '{}:{}'.format(BIND_BIND_ADDRESS[0], BIND_BIND_ADDRESS[1]) addr_str = "{}:{}".format(BIND_BIND_ADDRESS[0], BIND_BIND_ADDRESS[1])
self.process = subprocess.Popen([ self.process = subprocess.Popen(
'docker', 'run', '--rm', [
'-p', '{}:53/udp'.format(addr_str), "docker",
'-p', '{}:53/tcp'.format(addr_str), "run",
'-v', '{}/conf:/etc/bind'.format(self.bind_root), "--rm",
'-v', '{}/zones:/var/lib/bind'.format(self.bind_root), "-p",
BIND_DOCKER_IMAGE "{}:53/udp".format(addr_str),
], stdout=self._output, stderr=self._output) "-p",
"{}:53/tcp".format(addr_str),
"-v",
"{}/conf:/etc/bind".format(self.bind_root),
"-v",
"{}/zones:/var/lib/bind".format(self.bind_root),
BIND_DOCKER_IMAGE,
],
stdout=self._output,
stderr=self._output,
)
if self.process.poll(): if self.process.poll():
raise("BIND9 server stopped unexpectedly") raise ValueError("BIND9 server stopped unexpectedly")
try: try:
self._wait_until_ready() self._wait_until_ready()
except: except:
# The container might be running even if we think it isn't # The container might be running even if we think it isn't
self.stop() self.stop()
raise raise
def _wait_until_ready(self, attempts=30): def _wait_until_ready(self, attempts=30):
# type: (int) -> None # type: (int) -> None
""" """
Polls the DNS server over TCP until it gets a response, or until Polls the DNS server over TCP until it gets a response, or until
it runs out of attempts and raises a ValueError. it runs out of attempts and raises a ValueError.
The DNS response message must match the txn_id of the DNS query message, The DNS response message must match the txn_id of the DNS query message,
but otherwise the contents are ignored. but otherwise the contents are ignored.
:param int attempts: The number of attempts to make. :param int attempts: The number of attempts to make.
""" """
for _ in range(attempts): for _ in range(attempts):
if self.process.poll(): if self.process.poll():
raise ValueError('BIND9 server stopped unexpectedly') raise ValueError("BIND9 server stopped unexpectedly")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5.0) sock.settimeout(5.0)
try: try:
sock.connect(BIND_BIND_ADDRESS) sock.connect(BIND_BIND_ADDRESS)
sock.sendall(BIND_TEST_QUERY) sock.sendall(BIND_TEST_QUERY)
buf = sock.recv(1024) buf = sock.recv(1024)
# We should receive a DNS message with the same tx_id # We should receive a DNS message with the same tx_id
if buf and len(buf) > 4 and buf[2:4] == BIND_TEST_QUERY[2:4]: if buf and len(buf) > 4 and buf[2:4] == BIND_TEST_QUERY[2:4]:
return return
# If we got a response but it wasn't the one we wanted, wait a little # If we got a response but it wasn't the one we wanted, wait a little
time.sleep(1) time.sleep(1)
except: except: # pylint: disable=bare-except
# If there was a network error, wait a little # If there was a network error, wait a little
time.sleep(1) time.sleep(1)
pass finally:
finally: sock.close()
sock.close()
raise ValueError( raise ValueError(
'Gave up waiting for DNS server {} to respond'.format(BIND_BIND_ADDRESS)) "Gave up waiting for DNS server {} to respond".format(BIND_BIND_ADDRESS)
)
def __enter__(self): def __enter__(self):
self.start() self.start()

View File

@ -39,6 +39,7 @@ def _suppress_x509_verification_warnings():
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
except ImportError: except ImportError:
# Handle old versions of request with vendorized urllib3 # Handle old versions of request with vendorized urllib3
# pylint: disable=no-member
from requests.packages.urllib3.exceptions import InsecureRequestWarning from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning) requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
@ -256,7 +257,8 @@ def generate_csr(domains, key_path, csr_path, key_type=RSA_KEY_TYPE):
def read_certificate(cert_path): def read_certificate(cert_path):
""" """
Load the certificate from the provided path, and return a human readable version of it (TEXT mode). Load the certificate from the provided path, and return a human readable version
of it (TEXT mode).
:param str cert_path: the path to the certificate :param str cert_path: the path to the certificate
:returns: the TEXT version of the certificate, as it would be displayed by openssl binary :returns: the TEXT version of the certificate, as it would be displayed by openssl binary
""" """

View File

@ -1,3 +1,5 @@
# pylint: disable=missing-module-docstring
import json import json
import os import os
import stat import stat
@ -12,6 +14,7 @@ ASSETS_PATH = pkg_resources.resource_filename('certbot_integration_tests', 'asse
def fetch(workspace): def fetch(workspace):
# pylint: disable=missing-function-docstring
suffix = 'linux-amd64' if os.name != 'nt' else 'windows-amd64.exe' suffix = 'linux-amd64' if os.name != 'nt' else 'windows-amd64.exe'
pebble_path = _fetch_asset('pebble', suffix) pebble_path = _fetch_asset('pebble', suffix)

View File

@ -21,6 +21,7 @@ from certbot_integration_tests.utils.misc import GracefulTCPServer
class _ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): class _ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
# pylint: disable=missing-function-docstring
def do_POST(self): def do_POST(self):
request = requests.get(PEBBLE_MANAGEMENT_URL + '/intermediate-keys/0', verify=False) request = requests.get(PEBBLE_MANAGEMENT_URL + '/intermediate-keys/0', verify=False)
issuer_key = serialization.load_pem_private_key(request.content, None, default_backend()) issuer_key = serialization.load_pem_private_key(request.content, None, default_backend())
@ -35,20 +36,28 @@ class _ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
ocsp_request = ocsp.load_der_ocsp_request(self.rfile.read(content_len)) ocsp_request = ocsp.load_der_ocsp_request(self.rfile.read(content_len))
response = requests.get('{0}/cert-status-by-serial/{1}'.format( response = requests.get('{0}/cert-status-by-serial/{1}'.format(
PEBBLE_MANAGEMENT_URL, str(hex(ocsp_request.serial_number)).replace('0x', '')), verify=False) PEBBLE_MANAGEMENT_URL, str(hex(ocsp_request.serial_number)).replace('0x', '')),
verify=False
)
if not response.ok: if not response.ok:
ocsp_response = ocsp.OCSPResponseBuilder.build_unsuccessful(ocsp.OCSPResponseStatus.UNAUTHORIZED) ocsp_response = ocsp.OCSPResponseBuilder.build_unsuccessful(
ocsp.OCSPResponseStatus.UNAUTHORIZED
)
else: else:
data = response.json() data = response.json()
now = datetime.datetime.utcnow() now = datetime.datetime.utcnow()
cert = x509.load_pem_x509_certificate(data['Certificate'].encode(), default_backend()) cert = x509.load_pem_x509_certificate(data['Certificate'].encode(), default_backend())
if data['Status'] != 'Revoked': if data['Status'] != 'Revoked':
ocsp_status, revocation_time, revocation_reason = ocsp.OCSPCertStatus.GOOD, None, None ocsp_status = ocsp.OCSPCertStatus.GOOD
revocation_time = None
revocation_reason = None
else: else:
ocsp_status, revocation_reason = ocsp.OCSPCertStatus.REVOKED, x509.ReasonFlags.unspecified ocsp_status = ocsp.OCSPCertStatus.REVOKED
revoked_at = re.sub(r'( \+\d{4}).*$', r'\1', data['RevokedAt']) # "... +0000 UTC" => "+0000" revocation_reason = x509.ReasonFlags.unspecified
# "... +0000 UTC" => "+0000"
revoked_at = re.sub(r'( \+\d{4}).*$', r'\1', data['RevokedAt'])
revocation_time = parser.parse(revoked_at) revocation_time = parser.parse(revoked_at)
ocsp_response = ocsp.OCSPResponseBuilder().add_response( ocsp_response = ocsp.OCSPResponseBuilder().add_response(

View File

@ -1,4 +1,6 @@
#!/usr/bin/env python #!/usr/bin/env python
# pylint: disable=missing-module-docstring
import json import json
import re import re
import sys import sys
@ -10,7 +12,9 @@ from certbot_integration_tests.utils.misc import GracefulTCPServer
def _create_proxy(mapping): def _create_proxy(mapping):
# pylint: disable=missing-function-docstring
class ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): class ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
# pylint: disable=missing-class-docstring
def do_GET(self): def do_GET(self):
headers = {key.lower(): value for key, value in self.headers.items()} headers = {key.lower(): value for key, value in self.headers.items()}
backend = [backend for pattern, backend in mapping.items() backend = [backend for pattern, backend in mapping.items()

View File

@ -10,7 +10,9 @@ from pylint.checkers import BaseChecker
from pylint.interfaces import IAstroidChecker from pylint.interfaces import IAstroidChecker
# Modules in theses packages can import the os module. # Modules in theses packages can import the os module.
WHITELIST_PACKAGES = ['acme', 'certbot_compatibility_test', 'lock_test'] WHITELIST_PACKAGES = [
'acme', 'certbot_integration_tests', 'certbot_compatibility_test', 'lock_test'
]
class ForbidStandardOsModule(BaseChecker): class ForbidStandardOsModule(BaseChecker):
@ -25,8 +27,8 @@ class ForbidStandardOsModule(BaseChecker):
'E5001': ( 'E5001': (
'Forbidden use of os module, certbot.compat.os must be used instead', 'Forbidden use of os module, certbot.compat.os must be used instead',
'os-module-forbidden', 'os-module-forbidden',
'Some methods from the standard os module cannot be used for security reasons on Windows: ' 'Some methods from the standard os module cannot be used for security reasons on '
'the safe wrapper certbot.compat.os must be used instead in Certbot.' 'Windows: the safe wrapper certbot.compat.os must be used instead in Certbot.'
) )
} }
priority = -1 priority = -1

View File

@ -40,6 +40,7 @@ install_packages =
source_paths = source_paths =
acme/acme acme/acme
certbot/certbot certbot/certbot
certbot-ci/certbot_integration_tests
certbot-apache/certbot_apache certbot-apache/certbot_apache
certbot-compatibility-test/certbot_compatibility_test certbot-compatibility-test/certbot_compatibility_test
certbot-dns-cloudflare/certbot_dns_cloudflare certbot-dns-cloudflare/certbot_dns_cloudflare