1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-26 07:41:33 +03:00

Nginx creates a vhost block if no matching block is found (#5153)

* Allow authentication if there's no appropriate vhost

* Update test

* add flag to suppress raising error if no match is found

* Allow installation if there's no appropriate vhost

* remove traceback

* make new vhost ssl

* Fix existing bugs in nginxparser.py and obj.py

* Switch isinstance(x, str) to isinstance(x, six.string_types) in the Nginx plugin

* remove unused import

* remove unneeded custom copy from Addr

* Add docstring for create_new_vhost_from_default

* add test for create_new_vhost_from_default

* add configurator tests and leave finding the first server block for another PR

* don't assume order from a set

* address multiple default_server problem

* don't add vhosts twice

* update unit tests

* update docstring

* Add logger.info message for using default address in tlssni01 auth
This commit is contained in:
ohemorange
2017-10-13 12:29:02 -07:00
committed by Brad Warren
parent 99f00d21c4
commit 95a7d45856
9 changed files with 258 additions and 32 deletions

View File

@@ -117,6 +117,9 @@ class NginxConfigurator(common.Installer):
# Files to save
self.save_notes = ""
# For creating new vhosts if no names match
self.new_vhost = None
# Add number of outstanding challenges
self._chall_out = 0
@@ -191,9 +194,11 @@ class NginxConfigurator(common.Installer):
"The nginx plugin currently requires --fullchain-path to "
"install a cert.")
vhost = self.choose_vhost(domain)
cert_directives = [['\n', 'ssl_certificate', ' ', fullchain_path],
['\n', 'ssl_certificate_key', ' ', key_path]]
vhost = self.choose_vhost(domain, raise_if_no_match=False)
if vhost is None:
vhost = self._vhost_from_duplicated_default(domain)
cert_directives = [['\n ', 'ssl_certificate', ' ', fullchain_path],
['\n ', 'ssl_certificate_key', ' ', key_path]]
self.parser.add_server_directives(vhost,
cert_directives, replace=True)
@@ -209,7 +214,7 @@ class NginxConfigurator(common.Installer):
#######################
# Vhost parsing methods
#######################
def choose_vhost(self, target_name):
def choose_vhost(self, target_name, raise_if_no_match=True):
"""Chooses a virtual host based on the given domain name.
.. note:: This makes the vhost SSL-enabled if it isn't already. Follows
@@ -223,6 +228,8 @@ class NginxConfigurator(common.Installer):
hostname. Currently we just ignore this.
:param str target_name: domain name
:param bool raise_if_no_match: True iff not finding a match is an error;
otherwise, return None
:returns: ssl vhost associated with name
:rtype: :class:`~certbot_nginx.obj.VirtualHost`
@@ -233,13 +240,16 @@ class NginxConfigurator(common.Installer):
matches = self._get_ranked_matches(target_name)
vhost = self._select_best_name_match(matches)
if not vhost:
# No matches. Raise a misconfiguration error.
raise errors.MisconfigurationError(
("Cannot find a VirtualHost matching domain %s. "
"In order for Certbot to correctly perform the challenge "
"please add a corresponding server_name directive to your "
"nginx configuration: "
"https://nginx.org/en/docs/http/server_names.html") % (target_name))
if raise_if_no_match:
# No matches. Raise a misconfiguration error.
raise errors.MisconfigurationError(
("Cannot find a VirtualHost matching domain %s. "
"In order for Certbot to correctly perform the challenge "
"please add a corresponding server_name directive to your "
"nginx configuration: "
"https://nginx.org/en/docs/http/server_names.html") % (target_name))
else:
return None
else:
# Note: if we are enhancing with ocsp, vhost should already be ssl.
if not vhost.ssl:
@@ -247,6 +257,37 @@ class NginxConfigurator(common.Installer):
return vhost
def _vhost_from_duplicated_default(self, domain):
if self.new_vhost is None:
default_vhost = self._get_default_vhost()
self.new_vhost = self.parser.create_new_vhost_from_default(default_vhost)
if not self.new_vhost.ssl:
self._make_server_ssl(self.new_vhost)
self.new_vhost.names = set()
self.new_vhost.names.add(domain)
name_block = [['\n ', 'server_name', ' ', ' '.join(self.new_vhost.names)]]
self.parser.add_server_directives(self.new_vhost, name_block, replace=True)
return self.new_vhost
def _get_default_vhost(self):
vhost_list = self.parser.get_vhosts()
# if one has default_server set, return that one
default_vhosts = []
for vhost in vhost_list:
for addr in vhost.addrs:
if addr.default:
default_vhosts.append(vhost)
break
if len(default_vhosts) == 1:
return default_vhosts[0]
# TODO: present a list of vhosts for user to choose from
raise errors.MisconfigurationError("Could not automatically find a matching server"
" block. Set the `server_name` directive to use the Nginx installer.")
def _get_ranked_matches(self, target_name):
"""Returns a ranked list of vhosts that match target_name.
The ranking gives preference to SSL vhosts.

View File

@@ -7,6 +7,7 @@ from pyparsing import (
Literal, White, Forward, Group, Optional, OneOrMore, QuotedString, Regex, ZeroOrMore, Combine)
from pyparsing import stringEnd
from pyparsing import restOfLine
import six
logger = logging.getLogger(__name__)
@@ -71,7 +72,7 @@ class RawNginxDumper(object):
"""Iterates the dumped nginx content."""
blocks = blocks or self.blocks
for b0 in blocks:
if isinstance(b0, str):
if isinstance(b0, six.string_types):
yield b0
continue
item = copy.deepcopy(b0)
@@ -88,7 +89,7 @@ class RawNginxDumper(object):
yield '}'
else: # not a block - list of strings
semicolon = ";"
if isinstance(item[0], str) and item[0].strip() == '#': # comment
if isinstance(item[0], six.string_types) and item[0].strip() == '#': # comment
semicolon = ""
yield "".join(item) + semicolon
@@ -145,7 +146,7 @@ def dump(blocks, _file):
return _file.write(dumps(blocks))
spacey = lambda x: (isinstance(x, str) and x.isspace()) or x == ''
spacey = lambda x: (isinstance(x, six.string_types) and x.isspace()) or x == ''
class UnspacedList(list):
"""Wrap a list [of lists], making any whitespace entries magically invisible"""
@@ -189,13 +190,15 @@ class UnspacedList(list):
item, spaced_item = self._coerce(x)
slicepos = self._spaced_position(i) if i < len(self) else len(self.spaced)
self.spaced.insert(slicepos, spaced_item)
list.insert(self, i, item)
if not spacey(item):
list.insert(self, i, item)
self.dirty = True
def append(self, x):
item, spaced_item = self._coerce(x)
self.spaced.append(spaced_item)
list.append(self, item)
if not spacey(item):
list.append(self, item)
self.dirty = True
def extend(self, x):
@@ -226,7 +229,8 @@ class UnspacedList(list):
raise NotImplementedError("Slice operations on UnspacedLists not yet implemented")
item, spaced_item = self._coerce(value)
self.spaced.__setitem__(self._spaced_position(i), spaced_item)
list.__setitem__(self, i, item)
if not spacey(item):
list.__setitem__(self, i, item)
self.dirty = True
def __delitem__(self, i):
@@ -235,8 +239,8 @@ class UnspacedList(list):
self.dirty = True
def __deepcopy__(self, memo):
l = UnspacedList(self[:])
l.spaced = copy.deepcopy(self.spaced, memo=memo)
new_spaced = copy.deepcopy(self.spaced, memo=memo)
l = UnspacedList(new_spaced)
l.dirty = self.dirty
return l

View File

@@ -198,7 +198,7 @@ class VirtualHost(object): # pylint: disable=too-few-public-methods
def _find_directive(directives, directive_name):
"""Find a directive of type directive_name in directives
"""
if not directives or isinstance(directives, str) or len(directives) == 0:
if not directives or isinstance(directives, six.string_types) or len(directives) == 0:
return None
if directives[0] == directive_name:

View File

@@ -6,6 +6,8 @@ import os
import pyparsing
import re
import six
from certbot import errors
from certbot_nginx import obj
@@ -312,6 +314,32 @@ class NginxParser(object):
except errors.MisconfigurationError as err:
raise errors.MisconfigurationError("Problem in %s: %s" % (filename, str(err)))
def create_new_vhost_from_default(self, vhost_template):
"""Duplicate the default vhost in the configuration files.
:param :class:`~certbot_nginx.obj.VirtualHost` vhost_template: The vhost
whose information we copy
:returns: A vhost object for the newly created vhost
:rtype: :class:`~certbot_nginx.obj.VirtualHost`
"""
# TODO: https://github.com/certbot/certbot/issues/5185
# put it in the same file as the template, at the same level
enclosing_block = self.parsed[vhost_template.filep]
for index in vhost_template.path[:-1]:
enclosing_block = enclosing_block[index]
new_location = vhost_template.path[-1] + 1
raw_in_parsed = copy.deepcopy(enclosing_block[vhost_template.path[-1]])
enclosing_block.insert(new_location, raw_in_parsed)
new_vhost = copy.deepcopy(vhost_template)
new_vhost.path[-1] = new_location
for addr in new_vhost.addrs:
addr.default = False
for directive in enclosing_block[new_vhost.path[-1]][1]:
if len(directive) > 0 and directive[0] == 'listen' and 'default_server' in directive:
del directive[directive.index('default_server')]
return new_vhost
def _parse_ssl_options(ssl_options):
if ssl_options is not None:
try:
@@ -444,7 +472,7 @@ def _is_include_directive(entry):
"""
return (isinstance(entry, list) and
len(entry) == 2 and entry[0] == 'include' and
isinstance(entry[1], str))
isinstance(entry[1], six.string_types))
def _is_ssl_on_directive(entry):
"""Checks if an nginx parsed entry is an 'ssl on' directive.
@@ -561,7 +589,8 @@ def _add_directive(block, directive, replace):
directive_name = directive[0]
def can_append(loc, dir_name):
""" Can we append this directive to the block? """
return loc is None or (isinstance(dir_name, str) and dir_name in REPEATABLE_DIRECTIVES)
return loc is None or (isinstance(dir_name, six.string_types)
and dir_name in REPEATABLE_DIRECTIVES)
err_fmt = 'tried to insert directive "{0}" but found conflicting "{1}".'

View File

@@ -542,6 +542,138 @@ class NginxConfiguratorTest(util.NginxTest):
self.assertTrue(util.contains_at_depth(
generated_conf, ['ssl_stapling_verify', 'on'], 2))
def test_deploy_no_match_default_set(self):
default_conf = self.config.parser.abs_path('sites-enabled/default')
foo_conf = self.config.parser.abs_path('foo.conf')
del self.config.parser.parsed[foo_conf][2][1][0][1][0] # remove default_server
self.config.version = (1, 3, 1)
self.config.deploy_cert(
"www.nomatch.com",
"example/cert.pem",
"example/key.pem",
"example/chain.pem",
"example/fullchain.pem")
self.config.save()
self.config.parser.load()
parsed_default_conf = util.filter_comments(self.config.parser.parsed[default_conf])
self.assertEqual([[['server'],
[['listen', 'myhost', 'default_server'],
['listen', 'otherhost', 'default_server'],
['server_name', 'www.example.org'],
[['location', '/'],
[['root', 'html'],
['index', 'index.html', 'index.htm']]]]],
[['server'],
[['listen', 'myhost'],
['listen', 'otherhost'],
['server_name', 'www.nomatch.com'],
[['location', '/'],
[['root', 'html'],
['index', 'index.html', 'index.htm']]],
['listen', '5001', 'ssl'],
['ssl_certificate', 'example/fullchain.pem'],
['ssl_certificate_key', 'example/key.pem'],
['include', self.config.mod_ssl_conf],
['ssl_dhparam', self.config.ssl_dhparams]]]],
parsed_default_conf)
self.config.deploy_cert(
"nomatch.com",
"example/cert.pem",
"example/key.pem",
"example/chain.pem",
"example/fullchain.pem")
self.config.save()
self.config.parser.load()
parsed_default_conf = util.filter_comments(self.config.parser.parsed[default_conf])
self.assertTrue(util.contains_at_depth(parsed_default_conf, "nomatch.com", 3))
def test_deploy_no_match_default_set_multi_level_path(self):
default_conf = self.config.parser.abs_path('sites-enabled/default')
foo_conf = self.config.parser.abs_path('foo.conf')
del self.config.parser.parsed[default_conf][0][1][0]
del self.config.parser.parsed[default_conf][0][1][0]
self.config.version = (1, 3, 1)
self.config.deploy_cert(
"www.nomatch.com",
"example/cert.pem",
"example/key.pem",
"example/chain.pem",
"example/fullchain.pem")
self.config.save()
self.config.parser.load()
parsed_foo_conf = util.filter_comments(self.config.parser.parsed[foo_conf])
self.assertEqual([['server'],
[['listen', '*:80', 'ssl'],
['server_name', 'www.nomatch.com'],
['root', '/home/ubuntu/sites/foo/'],
[['location', '/status'], [[['types'], [['image/jpeg', 'jpg']]]]],
[['location', '~', 'case_sensitive\\.php$'], [['index', 'index.php'],
['root', '/var/root']]],
[['location', '~*', 'case_insensitive\\.php$'], []],
[['location', '=', 'exact_match\\.php$'], []],
[['location', '^~', 'ignore_regex\\.php$'], []],
['ssl_certificate', 'example/fullchain.pem'],
['ssl_certificate_key', 'example/key.pem']]],
parsed_foo_conf[1][1][1])
def test_deploy_no_match_no_default_set(self):
default_conf = self.config.parser.abs_path('sites-enabled/default')
foo_conf = self.config.parser.abs_path('foo.conf')
del self.config.parser.parsed[default_conf][0][1][0]
del self.config.parser.parsed[default_conf][0][1][0]
del self.config.parser.parsed[foo_conf][2][1][0][1][0]
self.config.version = (1, 3, 1)
self.assertRaises(errors.MisconfigurationError, self.config.deploy_cert,
"www.nomatch.com", "example/cert.pem", "example/key.pem",
"example/chain.pem", "example/fullchain.pem")
def test_deploy_no_match_fail_multiple_defaults(self):
self.config.version = (1, 3, 1)
self.assertRaises(errors.MisconfigurationError, self.config.deploy_cert,
"www.nomatch.com", "example/cert.pem", "example/key.pem",
"example/chain.pem", "example/fullchain.pem")
def test_deploy_no_match_add_redirect(self):
default_conf = self.config.parser.abs_path('sites-enabled/default')
foo_conf = self.config.parser.abs_path('foo.conf')
del self.config.parser.parsed[foo_conf][2][1][0][1][0] # remove default_server
self.config.version = (1, 3, 1)
self.config.deploy_cert(
"www.nomatch.com",
"example/cert.pem",
"example/key.pem",
"example/chain.pem",
"example/fullchain.pem")
self.config.enhance("www.nomatch.com", "redirect")
self.config.save()
self.config.parser.load()
expected = [
['if', '($scheme', '!=', '"https")'],
[['return', '301', 'https://$host$request_uri']]
]
generated_conf = self.config.parser.parsed[default_conf]
self.assertTrue(util.contains_at_depth(generated_conf, expected, 2))
class InstallSslOptionsConfTest(util.NginxTest):
"""Test that the options-ssl-nginx.conf file is installed and updated properly."""

View File

@@ -139,7 +139,8 @@ class NginxParserTest(util.NginxTest): #pylint: disable=too-many-public-methods
False, True,
set(['.example.com', 'example.*']), [], [0])
vhost4 = obj.VirtualHost(nparser.abs_path('sites-enabled/default'),
[obj.Addr('myhost', '', False, True)],
[obj.Addr('myhost', '', False, True),
obj.Addr('otherhost', '', False, True)],
False, True, set(['www.example.org']),
[], [0])
vhost5 = obj.VirtualHost(nparser.abs_path('foo.conf'),
@@ -395,6 +396,29 @@ class NginxParserTest(util.NginxTest): #pylint: disable=too-many-public-methods
])
self.assertTrue(server['ssl'])
def test_create_new_vhost_from_default(self):
nparser = parser.NginxParser(self.config_path)
vhosts = nparser.get_vhosts()
default = [x for x in vhosts if 'default' in x.filep][0]
new_vhost = nparser.create_new_vhost_from_default(default)
nparser.filedump(ext='')
# check properties of new vhost
self.assertFalse(next(iter(new_vhost.addrs)).default)
self.assertNotEqual(new_vhost.path, default.path)
# check that things are written to file correctly
new_nparser = parser.NginxParser(self.config_path)
new_vhosts = new_nparser.get_vhosts()
new_defaults = [x for x in new_vhosts if 'default' in x.filep]
self.assertEqual(len(new_defaults), 2)
new_vhost_parsed = new_defaults[1]
self.assertFalse(next(iter(new_vhost_parsed.addrs)).default)
self.assertEqual(next(iter(default.names)), next(iter(new_vhost_parsed.names)))
self.assertEqual(len(default.raw), len(new_vhost_parsed.raw))
self.assertTrue(next(iter(default.addrs)).super_eq(next(iter(new_vhost_parsed.addrs))))
if __name__ == "__main__":
unittest.main() # pragma: no cover

View File

@@ -1,5 +1,6 @@
server {
listen myhost default_server;
listen otherhost default_server;
server_name www.example.org;
location / {

View File

@@ -66,7 +66,7 @@ class TlsSniPerformTest(util.NginxTest):
self.sni.add_chall(self.achalls[1])
mock_choose.return_value = None
result = self.sni.perform()
self.assertTrue(result is None)
self.assertFalse(result is None)
def test_perform0(self):
responses = self.sni.perform()

View File

@@ -52,18 +52,13 @@ class NginxTlsSni01(common.TLSSNI01):
self.configurator.config.tls_sni_01_port)
for achall in self.achalls:
vhost = self.configurator.choose_vhost(achall.domain)
if vhost is None:
logger.error(
"No nginx vhost exists with server_name matching: %s. "
"Please specify server_names in the Nginx config.",
achall.domain)
return None
vhost = self.configurator.choose_vhost(achall.domain, raise_if_no_match=False)
if vhost.addrs:
if vhost is not None and vhost.addrs:
addresses.append(list(vhost.addrs))
else:
addresses.append([obj.Addr.fromstring(default_addr)])
logger.info("Using default address %s for TLSSNI01 authentication.", default_addr)
# Create challenge certs
responses = [self._setup_challenge_cert(x) for x in self.achalls]