1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-21 19:01:07 +03:00

start adding nginx stubs

This commit is contained in:
yan
2015-03-23 13:53:44 -04:00
committed by yan
parent 55188c52e8
commit 46db59d774
19 changed files with 3217 additions and 0 deletions

View File

@@ -0,0 +1 @@
"""Let's Encrypt client.plugins.nginx."""

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,201 @@
"""NginxDVSNI"""
import logging
import os
from letsencrypt.client.plugins.nginx import parser
class NginxDvsni(object):
"""Class performs DVSNI challenges within the Nginx configurator.
:ivar configurator: NginxConfigurator object
:type configurator: :class:`~nginx.configurator.NginxConfigurator`
:ivar list achalls: Annotated :class:`~letsencrypt.client.achallenges.DVSNI`
challenges.
:param list indices: Meant to hold indices of challenges in a
larger array. NginxDvsni is capable of solving many challenges
at once which causes an indexing issue within NginxConfigurator
who must return all responses in order. Imagine NginxConfigurator
maintaining state about where all of the SimpleHTTPS Challenges,
Dvsni Challenges belong in the response array. This is an optional
utility.
:param str challenge_conf: location of the challenge config file
"""
VHOST_TEMPLATE = """\
<VirtualHost {vhost}>
ServerName {server_name}
UseCanonicalName on
SSLStrictSNIVHostCheck on
LimitRequestBody 1048576
Include {ssl_options_conf_path}
SSLCertificateFile {cert_path}
SSLCertificateKeyFile {key_path}
DocumentRoot {document_root}
</VirtualHost>
"""
def __init__(self, configurator):
self.configurator = configurator
self.achalls = []
self.indices = []
self.challenge_conf = os.path.join(
configurator.config.config_dir, "le_dvsni_cert_challenge.conf")
# self.completed = 0
def add_chall(self, achall, idx=None):
"""Add challenge to DVSNI object to perform at once.
:param achall: Annotated DVSNI challenge.
:type achall: :class:`letsencrypt.client.achallenges.DVSNI`
:param int idx: index to challenge in a larger array
"""
self.achalls.append(achall)
if idx is not None:
self.indices.append(idx)
def perform(self):
"""Peform a DVSNI challenge."""
if not self.achalls:
return []
# Save any changes to the configuration as a precaution
# About to make temporary changes to the config
self.configurator.save()
addresses = []
default_addr = "*:443"
for achall in self.achalls:
vhost = self.configurator.choose_vhost(achall.domain)
if vhost is None:
logging.error(
"No vhost exists with servername or alias of: %s",
achall.domain)
logging.error("No _default_:443 vhost exists")
logging.error("Please specify servernames in the Nginx config")
return None
# TODO - @jdkasten review this code to make sure it makes sense
self.configurator.make_server_sni_ready(vhost, default_addr)
for addr in vhost.addrs:
if "_default_" == addr.get_addr():
addresses.append([default_addr])
break
else:
addresses.append(list(vhost.addrs))
responses = []
# Create all of the challenge certs
for achall in self.achalls:
responses.append(self._setup_challenge_cert(achall))
# Setup the configuration
self._mod_config(addresses)
# Save reversible changes
self.configurator.save("SNI Challenge", True)
return responses
def _setup_challenge_cert(self, achall, s=None):
# pylint: disable=invalid-name
"""Generate and write out challenge certificate."""
cert_path = self.get_cert_file(achall)
# Register the path before you write out the file
self.configurator.reverter.register_file_creation(True, cert_path)
cert_pem, response = achall.gen_cert_and_response(s)
# Write out challenge cert
with open(cert_path, "w") as cert_chall_fd:
cert_chall_fd.write(cert_pem)
return response
def _mod_config(self, ll_addrs):
"""Modifies Nginx config files to include challenge vhosts.
Result: Nginx config includes virtual servers for issued challs
:param list ll_addrs: list of list of
:class:`letsencrypt.client.plugins.nginx.obj.Addr` to apply
"""
# TODO: Use ip address of existing vhost instead of relying on FQDN
config_text = "<IfModule mod_ssl.c>\n"
for idx, lis in enumerate(ll_addrs):
config_text += self._get_config_text(self.achalls[idx], lis)
config_text += "</IfModule>\n"
self._conf_include_check(self.configurator.parser.loc["default"])
self.configurator.reverter.register_file_creation(
True, self.challenge_conf)
with open(self.challenge_conf, "w") as new_conf:
new_conf.write(config_text)
def _conf_include_check(self, main_config):
"""Adds DVSNI challenge conf file into configuration.
Adds DVSNI challenge include file if it does not already exist
within mainConfig
:param str main_config: file path to main user nginx config file
"""
if len(self.configurator.parser.find_dir(
parser.case_i("Include"), self.challenge_conf)) == 0:
# print "Including challenge virtual host(s)"
self.configurator.parser.add_dir(
parser.get_aug_path(main_config),
"Include", self.challenge_conf)
def _get_config_text(self, achall, ip_addrs):
"""Chocolate virtual server configuration text
:param achall: Annotated DVSNI challenge.
:type achall: :class:`letsencrypt.client.achallenges.DVSNI`
:param list ip_addrs: addresses of challenged domain
:class:`list` of type :class:`~nginx.obj.Addr`
:returns: virtual host configuration text
:rtype: str
"""
ips = " ".join(str(i) for i in ip_addrs)
document_root = os.path.join(
self.configurator.config.config_dir, "dvsni_page/")
# TODO: Python docs is not clear how mutliline string literal
# newlines are parsed on different platforms. At least on
# Linux (Debian sid), when source file uses CRLF, Python still
# parses it as "\n"... c.f.:
# https://docs.python.org/2.7/reference/lexical_analysis.html
return self.VHOST_TEMPLATE.format(
vhost=ips, server_name=achall.nonce_domain,
ssl_options_conf_path=self.configurator.parser.loc["ssl_options"],
cert_path=self.get_cert_file(achall), key_path=achall.key.file,
document_root=document_root).replace("\n", os.linesep)
def get_cert_file(self, achall):
"""Returns standardized name for challenge certificate.
:param achall: Annotated DVSNI challenge.
:type achall: :class:`letsencrypt.client.achallenges.DVSNI`
:returns: certificate file name
:rtype: str
"""
return os.path.join(
self.configurator.config.work_dir, achall.nonce_domain + ".crt")

View File

@@ -0,0 +1,208 @@
import zope.interface
from letsencrypt.client import augeas_configurator
from letsencrypt.client import CONFIG
from letsencrypt.client import interfaces
# This might be helpful... but feel free to use whatever you want
# class VH(object):
# def __init__(self, filename_path, vh_path, vh_addrs, is_ssl, is_enabled):
# self.file = filename_path
# self.path = vh_path
# self.addrs = vh_addrs
# self.names = []
# self.ssl = is_ssl
# self.enabled = is_enabled
# def set_names(self, listOfNames):
# self.names = listOfNames
# def add_name(self, name):
# self.names.append(name)
class NginxConfigurator(augeas_configurator.AugeasConfigurator):
"""Nginx Configurator class."""
zope.interface.implements(interfaces.IAuthenticator, interfaces.IInstaller)
def __init__(self, server_root=CONFIG.SERVER_ROOT):
super(NginxConfigurator, self).__init__()
self.server_root = server_root
# See if any temporary changes need to be recovered
# This needs to occur before VH objects are setup...
# because this will change the underlying configuration and potential
# vhosts
self.recovery_routine()
# Check for errors in parsing files with Augeas
# TODO - insert nginx lens info here???
#self.check_parsing_errors("httpd.aug")
def deploy_cert(self, vhost, cert, key, cert_chain=None):
"""Deploy cert in nginx"""
def choose_virtual_host(self, name):
"""Chooses a virtual host based on the given domain name"""
def get_all_names(self):
"""Returns all names found in the nginx configuration"""
return set()
# Might be helpful... I know nothing about nginx lens
# def get_include_path(self, cur_dir, arg):
# """
# Converts an Nginx Include directive argument into an Augeas
# searchable path
# Returns path string
# """
# # Sanity check argument - maybe
# # Question: what can the attacker do with control over this string
# # Effect parse file... maybe exploit unknown errors in Augeas
# # If the attacker can Include anything though... and this function
# # only operates on Nginx real config data... then the attacker has
# # already won.
# # Perhaps it is better to simply check the permissions on all
# # included files?
# # check_config to validate nginx config doesn't work because it
# # would create a race condition between the check and this input
# # TODO: Fix this
# # Check to make sure only expected characters are used, maybe remove
# # validChars = re.compile("[a-zA-Z0-9.*?_-/]*")
# # matchObj = validChars.match(arg)
# # if matchObj.group() != arg:
# # logging.error("Error: Invalid regexp characters in %s", arg)
# # return []
# # Standardize the include argument based on server root
# if not arg.startswith("/"):
# arg = cur_dir + arg
# # conf/ is a special variable for ServerRoot in Nginx
# elif arg.startswith("conf/"):
# arg = self.server_root + arg[5:]
# # TODO: Test if Nginx allows ../ or ~/ for Includes
# # Attempts to add a transform to the file if one does not already
# # exist
# self.parse_file(arg)
# # Argument represents an fnmatch regular expression, convert it
# # Split up the path and convert each into an Augeas accepted regex
# # then reassemble
# if "*" in arg or "?" in arg:
# postfix = ""
# splitArg = arg.split("/")
# for idx, split in enumerate(splitArg):
# # * and ? are the two special fnmatch characters
# if "*" in split or "?" in split:
# # Turn it into a augeas regex
# # TODO: Can this be an augeas glob instead of regex
# splitArg[idx] = ("* [label()=~regexp('%s')]" %
# self.fnmatch_to_re(split)
# # Reassemble the argument
# arg = "/".join(splitArg)
# # If the include is a directory, just return the directory as a file
# if arg.endswith("/"):
# return "/files" + arg[:len(arg)-1]
# return "/files"+arg
def enable_redirect(self, ssl_vhost):
"""
Adds Redirect directive to the port 80 equivalent of ssl_vhost
First the function attempts to find the vhost with equivalent
ip addresses that serves on non-ssl ports
The function then adds the directive
"""
return
def enable_ocsp_stapling(self, ssl_vhost):
return False
def enable_hsts(self, ssl_vhost):
return False
def get_all_certs_keys(self):
"""
Retrieve all certs and keys set in VirtualHosts on the Nginx server
returns: list of tuples with form [(cert, key, path)]
"""
return None
# Probably helpful reference
# def get_file_path(self, vhost_path):
# """
# Takes in Augeas path and returns the file name
# """
# # Strip off /files
# avail_fp = vhost_path[6:]
# # This can be optimized...
# while True:
# # Cast both to lowercase to be case insensitive
# find_if = avail_fp.lower().find("/ifmodule")
# if find_if != -1:
# avail_fp = avail_fp[:find_if]
# continue
# find_vh = avail_fp.lower().find("/virtualhost")
# if find_vh != -1:
# avail_fp = avail_fp[:find_vh]
# continue
# break
# return avail_fp
def enable_site(self, vhost):
"""Enables an available site, Nginx restart required"""
return False
# Might be a usefule reference
# def parse_file(self, file_path):
# """
# Checks to see if file_path is parsed by Augeas
# If file_path isn't parsed, the file is added and Augeas is reloaded
# """
# # Test if augeas included file for Httpd.lens
# # Note: This works for augeas globs, ie. *.conf
# incTest = self.aug.match(
# "/augeas/load/Httpd/incl [. ='" + file_path + "']")
# if not incTest:
# # Load up files
# #self.httpd_incl.append(file_path)
# #self.aug.add_transform(
# # "Httpd.lns", self.httpd_incl, None, self.httpd_excl)
# self.__add_httpd_transform(file_path)
# self.aug.load()
# Helpful reference?
# def verify_setup(self):
# """
# Make sure that files/directories are setup with appropriate
# permissions. Aim for defensive coding... make sure all input files
# have permissions of root
# """
# le_util.make_or_verify_dir(CONFIG.CONFIG_DIR, 0o755)
# le_util.make_or_verify_dir(CONFIG.WORK_DIR, 0o755)
# le_util.make_or_verify_dir(CONFIG.BACKUP_DIR, 0o755)
def restart(self, quiet=False):
"""Restarts nginx server"""
# May be of use?
# def __add_httpd_transform(self, incl):
# """
# This function will correctly add a transform to augeas
# The existing augeas.add_transform in python is broken
# """
# lastInclude = self.aug.match("/augeas/load/Httpd/incl [last()]")
# self.aug.insert(lastInclude[0], "incl", False)
# self.aug.set("/augeas/load/Httpd/incl[last()]", incl)
def config_test(self):
"""Check Configuration"""
return False
def main():
return
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,110 @@
"""An nginx config parser based on pyparsing."""
import string
from pyparsing import (
Literal, White, Word, alphanums, CharsNotIn, Forward, Group,
Optional, OneOrMore, ZeroOrMore, pythonStyleComment)
class NginxParser(object):
"""
A class that parses nginx configuration with pyparsing
"""
# constants
left_bracket = Literal("{").suppress()
right_bracket = Literal("}").suppress()
semicolon = Literal(";").suppress()
space = White().suppress()
key = Word(alphanums + "_/")
value = CharsNotIn("{};,")
location = CharsNotIn("{};," + string.whitespace)
# modifier for location uri [ = | ~ | ~* | ^~ ]
modifier = Literal("=") | Literal("~*") | Literal("~") | Literal("^~")
# rules
assignment = (key + Optional(space + value) + semicolon)
block = Forward()
block << Group(
Group(key + Optional(space + modifier) + Optional(space + location))
+ left_bracket
+ Group(ZeroOrMore(Group(assignment) | block))
+ right_bracket)
script = OneOrMore(Group(assignment) | block).ignore(pythonStyleComment)
def __init__(self, source):
self.source = source
def parse(self):
"""
Returns the parsed tree.
"""
return self.script.parseString(self.source)
def as_list(self):
"""
Returns the list of tree.
"""
return self.parse().asList()
class NginxDumper(object):
"""
A class that dumps nginx configuration from the provided tree.
"""
def __init__(self, blocks, indentation=4):
self.blocks = blocks
self.indentation = indentation
def __iter__(self, blocks=None, current_indent=0, spacer=' '):
"""
Iterates the dumped nginx content.
"""
blocks = blocks or self.blocks
for key, values in blocks:
if current_indent:
yield spacer
indentation = spacer * current_indent
if isinstance(key, list):
yield indentation + spacer.join(key) + ' {'
for parameter in values:
if isinstance(parameter[0], list):
dumped = self.__iter__(
[parameter],
current_indent + self.indentation)
for line in dumped:
yield line
else:
dumped = spacer.join(parameter) + ';'
yield spacer * (
current_indent + self.indentation) + dumped
yield indentation + '}'
else:
yield spacer * current_indent + key + spacer + values + ';'
def as_string(self):
return '\n'.join(self)
# Shortcut functions to respect Python's serialization interface
# (like pyyaml, picker or json)
def loads(source):
return NginxParser(source).as_list()
def load(_file):
return loads(_file.read())
def dumps(blocks, indentation=4):
return NginxDumper(blocks, indentation).as_string()
def dump(blocks, _file, indentation=4):
_file.write(dumps(blocks, indentation))
_file.close()
return _file

View File

@@ -0,0 +1,91 @@
"""Module contains classes used by the Nginx Configurator."""
class Addr(object):
r"""Represents an Nginx VirtualHost address.
:param str addr: addr part of vhost address
:param str port: port number or \*, or ""
"""
def __init__(self, tup):
self.tup = tup
@classmethod
def fromstring(cls, str_addr):
"""Initialize Addr from string."""
tup = str_addr.partition(':')
return cls((tup[0], tup[2]))
def __str__(self):
if self.tup[1]:
return "%s:%s" % self.tup
return self.tup[0]
def __eq__(self, other):
if isinstance(other, self.__class__):
return self.tup == other.tup
return False
def __hash__(self):
return hash(self.tup)
def get_addr(self):
"""Return addr part of Addr object."""
return self.tup[0]
def get_port(self):
"""Return port."""
return self.tup[1]
def get_addr_obj(self, port):
"""Return new address object with same addr and new port."""
return self.__class__((self.tup[0], port))
class VirtualHost(object): # pylint: disable=too-few-public-methods
"""Represents an Nginx Virtualhost.
:ivar str filep: file path of VH
:ivar str path: Augeas path to virtual host
:ivar set addrs: Virtual Host addresses (:class:`set` of :class:`Addr`)
:ivar set names: Server names/aliases of vhost
(:class:`list` of :class:`str`)
:ivar bool ssl: SSLEngine on in vhost
:ivar bool enabled: Virtual host is enabled
"""
def __init__(self, filep, path, addrs, ssl, enabled, names=None):
# pylint: disable=too-many-arguments
"""Initialize a VH."""
self.filep = filep
self.path = path
self.addrs = addrs
self.names = set() if names is None else set(names)
self.ssl = ssl
self.enabled = enabled
def add_name(self, name):
"""Add name to vhost."""
self.names.add(name)
def __str__(self):
addr_str = ", ".join(str(addr) for addr in self.addrs)
return ("file: %s\n"
"vh_path: %s\n"
"addrs: %s\n"
"names: %s\n"
"ssl: %s\n"
"enabled: %s" % (self.filep, self.path, addr_str,
self.names, self.ssl, self.enabled))
def __eq__(self, other):
if isinstance(other, self.__class__):
return (self.filep == other.filep and self.path == other.path and
self.addrs == other.addrs and
self.names == other.names and
self.ssl == other.ssl and self.enabled == other.enabled)
return False

View File

@@ -0,0 +1,27 @@
ssl_session_cache shared:SSL:1m; # 1MB is ~4000 sessions, if it fills old sessions are dropped
ssl_session_timeout 1440m; # Reuse sessions for 24hrs
# Redirect all traffic to SSL
server {
listen 80 default;
server_name www.example.com example.com;
access_log off;
error_log off;
return 301 https://example.com$request_uri;
}
server {
listen 443 ssl default_server;
server_name example.com;
ssl_certificate /path/to/bundle.crt;
ssl_certificate_key /path/to/private.key;
ssl_protocols TLSv1 TLSv1.1 TLSv1.2;
ssl_prefer_server_ciphers on;
# Using list of ciphers from "Bulletproof SSL and TLS"
ssl_ciphers "ECDHE-ECDSA-AES128-GCM-SHA256 ECDHE-ECDSA-AES256-GCM-SHA384 ECDHE-ECDSA-AES128-SHA ECDHE-ECDSA-AES256-SHA ECDHE-ECDSA-AES128-SHA256 ECDHE-ECDSA-AES256-SHA384 ECDHE-RSA-AES128-GCM-SHA256 ECDHE-RSA-AES256-GCM-SHA384 ECDHE-RSA-AES128-SHA ECDHE-RSA-AES128-SHA256 ECDHE-RSA-AES256-SHA384 DHE-RSA-AES128-GCM-SHA256 DHE-RSA-AES256-GCM-SHA384 DHE-RSA-AES128-SHA DHE-RSA-AES256-SHA DHE-RSA-AES128-SHA256 DHE-RSA-AES256-SHA256 EDH-RSA-DES-CBC3-SHA";
# Normal stuff below here
}

View File

@@ -0,0 +1,413 @@
"""NginxParser is a member object of the NginxConfigurator class."""
import os
import re
from letsencrypt.client import errors
class NginxParser(object):
"""Class handles the fine details of parsing the Nginx Configuration.
:ivar str root: Normalized abosulte path to the server root
directory. Without trailing slash.
"""
def __init__(self, aug, root, ssl_options):
# Find configuration root and make sure augeas can parse it.
self.aug = aug
self.root = os.path.abspath(root)
self.loc = self._set_locations(ssl_options)
self._parse_file(self.loc["root"])
# Must also attempt to parse sites-available or equivalent
# Sites-available is not included naturally in configuration
self._parse_file(os.path.join(self.root, "sites-available") + "/*")
# This problem has been fixed in Augeas 1.0
self.standardize_excl()
def add_dir_to_ifmodssl(self, aug_conf_path, directive, val):
"""Adds directive and value to IfMod ssl block.
Adds given directive and value along configuration path within
an IfMod mod_ssl.c block. If the IfMod block does not exist in
the file, it is created.
:param str aug_conf_path: Desired Augeas config path to add directive
:param str directive: Directive you would like to add
:param str val: Value of directive ie. Listen 443, 443 is the value
"""
# TODO: Add error checking code... does the path given even exist?
# Does it throw exceptions?
if_mod_path = self._get_ifmod(aug_conf_path, "mod_ssl.c")
# IfModule can have only one valid argument, so append after
self.aug.insert(if_mod_path + "arg", "directive", False)
nvh_path = if_mod_path + "directive[1]"
self.aug.set(nvh_path, directive)
self.aug.set(nvh_path + "/arg", val)
def _get_ifmod(self, aug_conf_path, mod):
"""Returns the path to <IfMod mod> and creates one if it doesn't exist.
:param str aug_conf_path: Augeas configuration path
:param str mod: module ie. mod_ssl.c
"""
if_mods = self.aug.match(("%s/IfModule/*[self::arg='%s']" %
(aug_conf_path, mod)))
if len(if_mods) == 0:
self.aug.set("%s/IfModule[last() + 1]" % aug_conf_path, "")
self.aug.set("%s/IfModule[last()]/arg" % aug_conf_path, mod)
if_mods = self.aug.match(("%s/IfModule/*[self::arg='%s']" %
(aug_conf_path, mod)))
# Strip off "arg" at end of first ifmod path
return if_mods[0][:len(if_mods[0]) - 3]
def add_dir(self, aug_conf_path, directive, arg):
"""Appends directive to the end fo the file given by aug_conf_path.
.. note:: Not added to AugeasConfigurator because it may depend
on the lens
:param str aug_conf_path: Augeas configuration path to add directive
:param str directive: Directive to add
:param str arg: Value of the directive. ie. Listen 443, 443 is arg
"""
self.aug.set(aug_conf_path + "/directive[last() + 1]", directive)
if isinstance(arg, list):
for i, value in enumerate(arg, 1):
self.aug.set(
"%s/directive[last()]/arg[%d]" % (aug_conf_path, i), value)
else:
self.aug.set(aug_conf_path + "/directive[last()]/arg", arg)
def find_dir(self, directive, arg=None, start=None):
"""Finds directive in the configuration.
Recursively searches through config files to find directives
Directives should be in the form of a case insensitive regex currently
.. todo:: Add order to directives returned. Last directive comes last..
.. todo:: arg should probably be a list
Note: Augeas is inherently case sensitive while Nginx is case
insensitive. Augeas 1.0 allows case insensitive regexes like
regexp(/Listen/, "i"), however the version currently supported
by Ubuntu 0.10 does not. Thus I have included my own case insensitive
transformation by calling case_i() on everything to maintain
compatibility.
:param str directive: Directive to look for
:param arg: Specific value directive must have, None if all should
be considered
:type arg: str or None
:param str start: Beginning Augeas path to begin looking
"""
# Cannot place member variable in the definition of the function so...
if not start:
start = get_aug_path(self.loc["root"])
# Debug code
# print "find_dir:", directive, "arg:", arg, " | Looking in:", start
# No regexp code
# if arg is None:
# matches = self.aug.match(start +
# "//*[self::directive='" + directive + "']/arg")
# else:
# matches = self.aug.match(start +
# "//*[self::directive='" + directive +
# "']/* [self::arg='" + arg + "']")
# includes = self.aug.match(start +
# "//* [self::directive='Include']/* [label()='arg']")
if arg is None:
matches = self.aug.match(("%s//*[self::directive=~regexp('%s')]/arg"
% (start, directive)))
else:
matches = self.aug.match(("%s//*[self::directive=~regexp('%s')]/*"
"[self::arg=~regexp('%s')]" %
(start, directive, arg)))
incl_regex = "(%s)|(%s)" % (case_i('Include'),
case_i('IncludeOptional'))
includes = self.aug.match(("%s//* [self::directive=~regexp('%s')]/* "
"[label()='arg']" % (start, incl_regex)))
# for inc in includes:
# print inc, self.aug.get(inc)
for include in includes:
# start[6:] to strip off /files
matches.extend(self.find_dir(
directive, arg, self._get_include_path(
strip_dir(start[6:]), self.aug.get(include))))
return matches
def _get_include_path(self, cur_dir, arg):
"""Converts an Nginx Include directive into Augeas path.
Converts an Nginx Include directive argument into an Augeas
searchable path
.. todo:: convert to use os.path.join()
:param str cur_dir: current working directory
:param str arg: Argument of Include directive
:returns: Augeas path string
:rtype: str
"""
# Sanity check argument - maybe
# Question: what can the attacker do with control over this string
# Effect parse file... maybe exploit unknown errors in Augeas
# If the attacker can Include anything though... and this function
# only operates on Nginx real config data... then the attacker has
# already won.
# Perhaps it is better to simply check the permissions on all
# included files?
# check_config to validate nginx config doesn't work because it
# would create a race condition between the check and this input
# TODO: Maybe... although I am convinced we have lost if
# Nginx files can't be trusted. The augeas include path
# should be made to be exact.
# Check to make sure only expected characters are used <- maybe remove
# validChars = re.compile("[a-zA-Z0-9.*?_-/]*")
# matchObj = validChars.match(arg)
# if matchObj.group() != arg:
# logging.error("Error: Invalid regexp characters in %s", arg)
# return []
# Standardize the include argument based on server root
if not arg.startswith("/"):
arg = cur_dir + arg
# conf/ is a special variable for ServerRoot in Nginx
elif arg.startswith("conf/"):
arg = self.root + arg[4:]
# TODO: Test if Nginx allows ../ or ~/ for Includes
# Attempts to add a transform to the file if one does not already exist
self._parse_file(arg)
# Argument represents an fnmatch regular expression, convert it
# Split up the path and convert each into an Augeas accepted regex
# then reassemble
if "*" in arg or "?" in arg:
split_arg = arg.split("/")
for idx, split in enumerate(split_arg):
# * and ? are the two special fnmatch characters
if "*" in split or "?" in split:
# Turn it into a augeas regex
# TODO: Can this instead be an augeas glob instead of regex
split_arg[idx] = ("* [label()=~regexp('%s')]" %
self.fnmatch_to_re(split))
# Reassemble the argument
arg = "/".join(split_arg)
# If the include is a directory, just return the directory as a file
if arg.endswith("/"):
return get_aug_path(arg[:len(arg)-1])
return get_aug_path(arg)
def fnmatch_to_re(self, clean_fn_match): # pylint: disable=no-self-use
"""Method converts Nginx's basic fnmatch to regular expression.
:param str clean_fn_match: Nginx style filename match, similar to globs
:returns: regex suitable for augeas
:rtype: str
"""
# Checkout fnmatch.py in venv/local/lib/python2.7/fnmatch.py
regex = ""
for letter in clean_fn_match:
if letter == '.':
regex = regex + r"\."
elif letter == '*':
regex = regex + ".*"
# According to nginx.org ? shouldn't appear
# but in case it is valid...
elif letter == '?':
regex = regex + "."
else:
regex = regex + letter
return regex
def _parse_file(self, filepath):
"""Parse file with Augeas
Checks to see if file_path is parsed by Augeas
If filepath isn't parsed, the file is added and Augeas is reloaded
:param str filepath: Nginx config file path
"""
# Test if augeas included file for Httpd.lens
# Note: This works for augeas globs, ie. *.conf
inc_test = self.aug.match(
"/augeas/load/Httpd/incl [. ='%s']" % filepath)
if not inc_test:
# Load up files
# This doesn't seem to work on TravisCI
# self.aug.add_transform("Httpd.lns", [filepath])
self._add_httpd_transform(filepath)
self.aug.load()
def _add_httpd_transform(self, incl):
"""Add a transform to Augeas.
This function will correctly add a transform to augeas
The existing augeas.add_transform in python doesn't seem to work for
Travis CI as it loads in libaugeas.so.0.10.0
:param str incl: filepath to include for transform
"""
last_include = self.aug.match("/augeas/load/Httpd/incl [last()]")
if last_include:
# Insert a new node immediately after the last incl
self.aug.insert(last_include[0], "incl", False)
self.aug.set("/augeas/load/Httpd/incl[last()]", incl)
# On first use... must load lens and add file to incl
else:
# Augeas uses base 1 indexing... insert at beginning...
self.aug.set("/augeas/load/Httpd/lens", "Httpd.lns")
self.aug.set("/augeas/load/Httpd/incl", incl)
def standardize_excl(self):
"""Standardize the excl arguments for the Httpd lens in Augeas.
Note: Hack!
Standardize the excl arguments for the Httpd lens in Augeas
Servers sometimes give incorrect defaults
Note: This problem should be fixed in Augeas 1.0. Unfortunately,
Augeas 0.10 appears to be the most popular version currently.
"""
# attempt to protect against augeas error in 0.10.0 - ubuntu
# *.augsave -> /*.augsave upon augeas.load()
# Try to avoid bad httpd files
# There has to be a better way... but after a day and a half of testing
# I had no luck
# This is a hack... work around... submit to augeas if still not fixed
excl = ["*.augnew", "*.augsave", "*.dpkg-dist", "*.dpkg-bak",
"*.dpkg-new", "*.dpkg-old", "*.rpmsave", "*.rpmnew",
"*~",
self.root + "/*.augsave",
self.root + "/*~",
self.root + "/*/*augsave",
self.root + "/*/*~",
self.root + "/*/*/*.augsave",
self.root + "/*/*/*~"]
for i, excluded in enumerate(excl, 1):
self.aug.set("/augeas/load/Httpd/excl[%d]" % i, excluded)
self.aug.load()
def _set_locations(self, ssl_options):
"""Set default location for directives.
Locations are given as file_paths
.. todo:: Make sure that files are included
"""
root = self._find_config_root()
default = self._set_user_config_file(root)
temp = os.path.join(self.root, "ports.conf")
if os.path.isfile(temp):
listen = temp
name = temp
else:
listen = default
name = default
return {"root": root, "default": default, "listen": listen,
"name": name, "ssl_options": ssl_options}
def _find_config_root(self):
"""Find the Nginx Configuration Root file."""
location = ["nginx2.conf", "httpd.conf"]
for name in location:
if os.path.isfile(os.path.join(self.root, name)):
return os.path.join(self.root, name)
raise errors.LetsEncryptNoInstallationError(
"Could not find configuration root")
def _set_user_config_file(self, root):
"""Set the appropriate user configuration file
.. todo:: This will have to be updated for other distros versions
:param str root: pathname which contains the user config
"""
# Basic check to see if httpd.conf exists and
# in hierarchy via direct include
# httpd.conf was very common as a user file in Nginx 2.2
if (os.path.isfile(os.path.join(self.root, 'httpd.conf')) and
self.find_dir(
case_i("Include"), case_i("httpd.conf"), root)):
return os.path.join(self.root, 'httpd.conf')
else:
return os.path.join(self.root, 'nginx2.conf')
def case_i(string):
"""Returns case insensitive regex.
Returns a sloppy, but necessary version of a case insensitive regex.
Any string should be able to be submitted and the string is
escaped and then made case insensitive.
May be replaced by a more proper /i once augeas 1.0 is widely
supported.
:param str string: string to make case i regex
"""
return "".join(["["+c.upper()+c.lower()+"]"
if c.isalpha() else c for c in re.escape(string)])
def get_aug_path(file_path):
"""Return augeas path for full filepath.
:param str file_path: Full filepath
"""
return "/files%s" % file_path
def strip_dir(path):
"""Returns directory of file path.
.. todo:: Replace this with Python standard function
:param str path: path is a file path. not an augeas section or
directive path
:returns: directory
:rtype: str
"""
index = path.rfind("/")
if index > 0:
return path[:index+1]
# No directory
return ""

View File

@@ -0,0 +1 @@
"""Let's Encrypt Nginx Tests"""

View File

@@ -0,0 +1,196 @@
"""Test for letsencrypt.client.nginx.configurator."""
import os
import re
import shutil
import unittest
import mock
from letsencrypt.acme import challenges
from letsencrypt.client import achallenges
from letsencrypt.client import errors
from letsencrypt.client import le_util
from letsencrypt.client.nginx import configurator
from letsencrypt.client.nginx import obj
from letsencrypt.client.nginx import parser
from letsencrypt.client.tests.nginx import util
class TwoVhost80Test(util.NginxTest):
"""Test two standard well configured HTTP vhosts."""
def setUp(self):
super(TwoVhost80Test, self).setUp()
with mock.patch("letsencrypt.client.nginx.configurator."
"mod_loaded") as mock_load:
mock_load.return_value = True
self.config = util.get_nginx_configurator(
self.config_path, self.config_dir, self.work_dir,
self.ssl_options)
self.vh_truth = util.get_vh_truth(
self.temp_dir, "debian_nginx_2_4/two_vhost_80")
def tearDown(self):
shutil.rmtree(self.temp_dir)
shutil.rmtree(self.config_dir)
shutil.rmtree(self.work_dir)
def test_get_all_names(self):
names = self.config.get_all_names()
self.assertEqual(names, set(
['letsencrypt.demo', 'encryption-example.demo', 'ip-172-30-0-17']))
def test_get_virtual_hosts(self):
vhs = self.config.get_virtual_hosts()
self.assertEqual(len(vhs), 4)
found = 0
for vhost in vhs:
for truth in self.vh_truth:
if vhost == truth:
found += 1
break
self.assertEqual(found, 4)
def test_is_site_enabled(self):
self.assertTrue(self.config.is_site_enabled(self.vh_truth[0].filep))
self.assertFalse(self.config.is_site_enabled(self.vh_truth[1].filep))
self.assertTrue(self.config.is_site_enabled(self.vh_truth[2].filep))
self.assertTrue(self.config.is_site_enabled(self.vh_truth[3].filep))
def test_deploy_cert(self):
# Get the default 443 vhost
self.config.assoc["random.demo"] = self.vh_truth[1]
self.config.deploy_cert(
"random.demo",
"example/cert.pem", "example/key.pem", "example/cert_chain.pem")
self.config.save()
loc_cert = self.config.parser.find_dir(
parser.case_i("sslcertificatefile"),
re.escape("example/cert.pem"), self.vh_truth[1].path)
loc_key = self.config.parser.find_dir(
parser.case_i("sslcertificateKeyfile"),
re.escape("example/key.pem"), self.vh_truth[1].path)
loc_chain = self.config.parser.find_dir(
parser.case_i("SSLCertificateChainFile"),
re.escape("example/cert_chain.pem"), self.vh_truth[1].path)
# Verify one directive was found in the correct file
self.assertEqual(len(loc_cert), 1)
self.assertEqual(configurator.get_file_path(loc_cert[0]),
self.vh_truth[1].filep)
self.assertEqual(len(loc_key), 1)
self.assertEqual(configurator.get_file_path(loc_key[0]),
self.vh_truth[1].filep)
self.assertEqual(len(loc_chain), 1)
self.assertEqual(configurator.get_file_path(loc_chain[0]),
self.vh_truth[1].filep)
def test_is_name_vhost(self):
addr = obj.Addr.fromstring("*:80")
self.assertTrue(self.config.is_name_vhost(addr))
self.config.version = (2, 2)
self.assertFalse(self.config.is_name_vhost(addr))
def test_add_name_vhost(self):
self.config.add_name_vhost("*:443")
self.assertTrue(self.config.parser.find_dir(
"NameVirtualHost", re.escape("*:443")))
def test_make_vhost_ssl(self):
ssl_vhost = self.config.make_vhost_ssl(self.vh_truth[0])
self.assertEqual(
ssl_vhost.filep,
os.path.join(self.config_path, "sites-available",
"encryption-example-le-ssl.conf"))
self.assertEqual(ssl_vhost.path,
"/files" + ssl_vhost.filep + "/IfModule/VirtualHost")
self.assertEqual(len(ssl_vhost.addrs), 1)
self.assertEqual(set([obj.Addr.fromstring("*:443")]), ssl_vhost.addrs)
self.assertEqual(ssl_vhost.names, set(["encryption-example.demo"]))
self.assertTrue(ssl_vhost.ssl)
self.assertFalse(ssl_vhost.enabled)
self.assertTrue(self.config.parser.find_dir(
"SSLCertificateFile", None, ssl_vhost.path))
self.assertTrue(self.config.parser.find_dir(
"SSLCertificateKeyFile", None, ssl_vhost.path))
self.assertTrue(self.config.parser.find_dir(
"Include", self.ssl_options, ssl_vhost.path))
self.assertEqual(self.config.is_name_vhost(self.vh_truth[0]),
self.config.is_name_vhost(ssl_vhost))
self.assertEqual(len(self.config.vhosts), 5)
@mock.patch("letsencrypt.client.nginx.configurator."
"dvsni.NginxDvsni.perform")
@mock.patch("letsencrypt.client.nginx.configurator."
"NginxConfigurator.restart")
def test_perform(self, mock_restart, mock_dvsni_perform):
# Only tests functionality specific to configurator.perform
# Note: As more challenges are offered this will have to be expanded
auth_key = le_util.Key(self.rsa256_file, self.rsa256_pem)
achall1 = achallenges.DVSNI(
chall=challenges.DVSNI(
r="jIq_Xy1mXGN37tb4L6Xj_es58fW571ZNyXekdZzhh7Q",
nonce="37bc5eb75d3e00a19b4f6355845e5a18"),
domain="encryption-example.demo", key=auth_key)
achall2 = achallenges.DVSNI(
chall=challenges.DVSNI(
r="uqnaPzxtrndteOqtrXb0Asl5gOJfWAnnx6QJyvcmlDU",
nonce="59ed014cac95f77057b1d7a1b2c596ba"),
domain="letsencrypt.demo", key=auth_key)
dvsni_ret_val = [
challenges.DVSNIResponse(s="randomS1"),
challenges.DVSNIResponse(s="randomS2"),
]
mock_dvsni_perform.return_value = dvsni_ret_val
responses = self.config.perform([achall1, achall2])
self.assertEqual(mock_dvsni_perform.call_count, 1)
self.assertEqual(responses, dvsni_ret_val)
self.assertEqual(mock_restart.call_count, 1)
@mock.patch("letsencrypt.client.nginx.configurator."
"subprocess.Popen")
def test_get_version(self, mock_popen):
mock_popen().communicate.return_value = (
"Server Version: Nginx/2.4.2 (Debian)", "")
self.assertEqual(self.config.get_version(), (2, 4, 2))
mock_popen().communicate.return_value = (
"Server Version: Nginx/2 (Linux)", "")
self.assertEqual(self.config.get_version(), (2,))
mock_popen().communicate.return_value = (
"Server Version: Nginx (Debian)", "")
self.assertRaises(
errors.LetsEncryptConfiguratorError, self.config.get_version)
mock_popen().communicate.return_value = (
"Server Version: Nginx/2.3\n Nginx/2.4.7", "")
self.assertRaises(
errors.LetsEncryptConfiguratorError, self.config.get_version)
mock_popen.side_effect = OSError("Can't find program")
self.assertRaises(
errors.LetsEncryptConfiguratorError, self.config.get_version)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,170 @@
"""Test for letsencrypt.client.nginx.dvsni."""
import pkg_resources
import unittest
import shutil
import mock
from letsencrypt.acme import challenges
from letsencrypt.client import achallenges
from letsencrypt.client import le_util
from letsencrypt.client.nginx.obj import Addr
from letsencrypt.client.tests.nginx import util
class DvsniPerformTest(util.NginxTest):
"""Test the NginxDVSNI challenge."""
def setUp(self):
super(DvsniPerformTest, self).setUp()
with mock.patch("letsencrypt.client.nginx.configurator."
"mod_loaded") as mock_load:
mock_load.return_value = True
config = util.get_nginx_configurator(
self.config_path, self.config_dir, self.work_dir,
self.ssl_options)
from letsencrypt.client.nginx import dvsni
self.sni = dvsni.NginxDvsni(config)
rsa256_file = pkg_resources.resource_filename(
"letsencrypt.client.tests", 'testdata/rsa256_key.pem')
rsa256_pem = pkg_resources.resource_string(
"letsencrypt.client.tests", 'testdata/rsa256_key.pem')
auth_key = le_util.Key(rsa256_file, rsa256_pem)
self.achalls = [
achallenges.DVSNI(
chall=challenges.DVSNI(
r="\x8c\x8a\xbf_-f\\cw\xee\xd6\xf8/\xa5\xe3\xfd\xeb9\xf1"
"\xf5\xb9\xefVM\xc9w\xa4u\x9c\xe1\x87\xb4",
nonce="7\xbc^\xb7]>\x00\xa1\x9bOcU\x84^Z\x18",
), domain="encryption-example.demo", key=auth_key),
achallenges.DVSNI(
chall=challenges.DVSNI(
r="\xba\xa9\xda?<m\xaewmx\xea\xad\xadv\xf4\x02\xc9y\x80"
"\xe2_X\t\xe7\xc7\xa4\t\xca\xf7&\x945",
nonce="Y\xed\x01L\xac\x95\xf7pW\xb1\xd7"
"\xa1\xb2\xc5\x96\xba",
), domain="letsencrypt.demo", key=auth_key),
]
def tearDown(self):
shutil.rmtree(self.temp_dir)
shutil.rmtree(self.config_dir)
shutil.rmtree(self.work_dir)
def test_perform0(self):
resp = self.sni.perform()
self.assertEqual(len(resp), 0)
def test_setup_challenge_cert(self):
# This is a helper function that can be used for handling
# open context managers more elegantly. It avoids dealing with
# __enter__ and __exit__ calls.
# http://www.voidspace.org.uk/python/mock/helpers.html#mock.mock_open
m_open = mock.mock_open()
response = challenges.DVSNIResponse(s="randomS1")
achall = mock.MagicMock(nonce=self.achalls[0].nonce,
nonce_domain=self.achalls[0].nonce_domain)
achall.gen_cert_and_response.return_value = ("pem", response)
with mock.patch("letsencrypt.client.nginx.dvsni.open",
m_open, create=True):
# pylint: disable=protected-access
self.assertEqual(response, self.sni._setup_challenge_cert(
achall, "randomS1"))
self.assertTrue(m_open.called)
self.assertEqual(
m_open.call_args[0], (self.sni.get_cert_file(achall), 'w'))
self.assertEqual(m_open().write.call_args[0][0], "pem")
def test_perform1(self):
achall = self.achalls[0]
self.sni.add_chall(achall)
mock_setup_cert = mock.MagicMock(
return_value=challenges.DVSNIResponse(s="randomS1"))
# pylint: disable=protected-access
self.sni._setup_challenge_cert = mock_setup_cert
responses = self.sni.perform()
mock_setup_cert.assert_called_once_with(achall)
# Check to make sure challenge config path is included in nginx config.
self.assertEqual(
len(self.sni.configurator.parser.find_dir(
"Include", self.sni.challenge_conf)),
1)
self.assertEqual(len(responses), 1)
self.assertEqual(responses[0].s, "randomS1")
def test_perform2(self):
for achall in self.achalls:
self.sni.add_chall(achall)
mock_setup_cert = mock.MagicMock(side_effect=[
challenges.DVSNIResponse(s="randomS0"),
challenges.DVSNIResponse(s="randomS1")])
# pylint: disable=protected-access
self.sni._setup_challenge_cert = mock_setup_cert
responses = self.sni.perform()
self.assertEqual(mock_setup_cert.call_count, 2)
# Make sure calls made to mocked function were correct
self.assertEqual(
mock_setup_cert.call_args_list[0], mock.call(self.achalls[0]))
self.assertEqual(
mock_setup_cert.call_args_list[1], mock.call(self.achalls[1]))
self.assertEqual(
len(self.sni.configurator.parser.find_dir(
"Include", self.sni.challenge_conf)),
1)
self.assertEqual(len(responses), 2)
for i in xrange(2):
self.assertEqual(responses[i].s, "randomS%d" % i)
def test_mod_config(self):
for achall in self.achalls:
self.sni.add_chall(achall)
v_addr1 = [Addr(("1.2.3.4", "443")), Addr(("5.6.7.8", "443"))]
v_addr2 = [Addr(("127.0.0.1", "443"))]
ll_addr = []
ll_addr.append(v_addr1)
ll_addr.append(v_addr2)
self.sni._mod_config(ll_addr) # pylint: disable=protected-access
self.sni.configurator.save()
self.sni.configurator.parser.find_dir(
"Include", self.sni.challenge_conf)
vh_match = self.sni.configurator.aug.match(
"/files" + self.sni.challenge_conf + "//VirtualHost")
vhs = []
for match in vh_match:
# pylint: disable=protected-access
vhs.append(self.sni.configurator._create_vhost(match))
self.assertEqual(len(vhs), 2)
for vhost in vhs:
if vhost.addrs == set(v_addr1):
self.assertEqual(
vhost.names,
set([self.achalls[0].nonce_domain]))
else:
self.assertEqual(vhost.addrs, set(v_addr2))
self.assertEqual(
vhost.names,
set([self.achalls[1].nonce_domain]))
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,101 @@
import operator
import unittest
from nginxparser import NginxParser, load, dumps, dump
first = operator.itemgetter(0)
class TestNginxParser(unittest.TestCase):
def test_assignments(self):
parsed = NginxParser.assignment.parseString('root /test;').asList()
self.assertEqual(parsed, ['root', '/test'])
parsed = NginxParser.assignment.parseString('root /test;'
'foo bar;').asList()
self.assertEqual(parsed, ['root', '/test'], ['foo', 'bar'])
def test_blocks(self):
parsed = NginxParser.block.parseString('foo {}').asList()
self.assertEqual(parsed, [[['foo'], []]])
parsed = NginxParser.block.parseString('location /foo{}').asList()
self.assertEqual(parsed, [[['location', '/foo'], []]])
parsed = NginxParser.block.parseString('foo { bar foo; }').asList()
self.assertEqual(parsed, [[['foo'], [['bar', 'foo']]]])
def test_nested_blocks(self):
parsed = NginxParser.block.parseString('foo { bar {} }').asList()
block, content = first(parsed)
self.assertEqual(first(content), [['bar'], []])
def test_dump_as_string(self):
dumped = dumps([
['user', 'www-data'],
[['server'], [
['listen', '80'],
['server_name', 'foo.com'],
['root', '/home/ubuntu/sites/foo/'],
[['location', '/status'], [
['check_status'],
[['types'], [['image/jpeg', 'jpg']]],
]]
]]])
self.assertEqual(dumped,
'user www-data;\n' +
'server {\n' +
' listen 80;\n' +
' server_name foo.com;\n' +
' root /home/ubuntu/sites/foo/;\n \n' +
' location /status {\n' +
' check_status;\n \n' +
' types {\n' +
' image/jpeg jpg;\n' +
' }\n' +
' }\n' +
'}')
def test_parse_from_file(self):
parsed = load(open('data/foo.conf'))
self.assertEqual(
parsed,
[['user', 'www-data'],
[['server'], [
['listen', '80'],
['server_name', 'foo.com'],
['root', '/home/ubuntu/sites/foo/'],
[['location', '/status'], [
['check_status'],
[['types'], [['image/jpeg', 'jpg']]],
]],
[['location', '~', 'case_sensitive\.php$'], [
['hoge', 'hoge']
]],
[['location', '~*', 'case_insensitive\.php$'], []],
[['location', '=', 'exact_match\.php$'], []],
[['location', '^~', 'ignore_regex\.php$'], []],
]]]
)
def test_dump_as_file(self):
parsed = load(open('data/nginx.conf'))
parsed[-1][-1].append([['server'],
[['listen', '443 ssl'],
['server_name', 'localhost'],
['ssl_certificate', 'cert.pem'],
['ssl_certificate_key', 'cert.key'],
['ssl_session_cache', 'shared:SSL:1m'],
['ssl_session_timeout', '5m'],
['ssl_ciphers', 'HIGH:!aNULL:!MD5'],
[['location', '/'],
[['root', 'html'],
['index', 'index.html index.htm']]]]])
f = open('data/nginx.new.conf', 'w')
dump(parsed, f)
parsed_new = load(open('data/nginx.new.conf'))
self.assertEquals(parsed, parsed_new)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,68 @@
"""Test the helper objects in nginx.obj.py."""
import unittest
class AddrTest(unittest.TestCase):
"""Test the Addr class."""
def setUp(self):
from letsencrypt.client.nginx.obj import Addr
self.addr1 = Addr.fromstring("192.168.1.1")
self.addr2 = Addr.fromstring("192.168.1.1:*")
self.addr3 = Addr.fromstring("192.168.1.1:80")
def test_fromstring(self):
self.assertEqual(self.addr1.get_addr(), "192.168.1.1")
self.assertEqual(self.addr1.get_port(), "")
self.assertEqual(self.addr2.get_addr(), "192.168.1.1")
self.assertEqual(self.addr2.get_port(), "*")
self.assertEqual(self.addr3.get_addr(), "192.168.1.1")
self.assertEqual(self.addr3.get_port(), "80")
def test_str(self):
self.assertEqual(str(self.addr1), "192.168.1.1")
self.assertEqual(str(self.addr2), "192.168.1.1:*")
self.assertEqual(str(self.addr3), "192.168.1.1:80")
def test_get_addr_obj(self):
self.assertEqual(str(self.addr1.get_addr_obj("443")), "192.168.1.1:443")
self.assertEqual(str(self.addr2.get_addr_obj("")), "192.168.1.1")
self.assertEqual(str(self.addr1.get_addr_obj("*")), "192.168.1.1:*")
def test_eq(self):
self.assertEqual(self.addr1, self.addr2.get_addr_obj(""))
self.assertNotEqual(self.addr1, self.addr2)
self.assertFalse(self.addr1 == 3333)
def test_set_inclusion(self):
from letsencrypt.client.nginx.obj import Addr
set_a = set([self.addr1, self.addr2])
addr1b = Addr.fromstring("192.168.1.1")
addr2b = Addr.fromstring("192.168.1.1:*")
set_b = set([addr1b, addr2b])
self.assertEqual(set_a, set_b)
class VirtualHostTest(unittest.TestCase):
"""Test the VirtualHost class."""
def setUp(self):
from letsencrypt.client.nginx.obj import VirtualHost
from letsencrypt.client.nginx.obj import Addr
self.vhost1 = VirtualHost(
"filep", "vh_path",
set([Addr.fromstring("localhost")]), False, False)
def test_eq(self):
from letsencrypt.client.nginx.obj import Addr
from letsencrypt.client.nginx.obj import VirtualHost
vhost1b = VirtualHost(
"filep", "vh_path",
set([Addr.fromstring("localhost")]), False, False)
self.assertEqual(vhost1b, self.vhost1)
self.assertEqual(str(vhost1b), str(self.vhost1))
self.assertFalse(vhost1b == 1234)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,129 @@
"""Tests the NginxParser class."""
import os
import shutil
import sys
import unittest
import augeas
import mock
import zope.component
from letsencrypt.client import errors
from letsencrypt.client.display import util as display_util
from letsencrypt.client.tests.nginx import util
class NginxParserTest(util.NginxTest):
"""Nginx Parser Test."""
def setUp(self):
super(NginxParserTest, self).setUp()
zope.component.provideUtility(display_util.FileDisplay(sys.stdout))
from letsencrypt.client.nginx.parser import NginxParser
self.aug = augeas.Augeas(flags=augeas.Augeas.NONE)
self.parser = NginxParser(self.aug, self.config_path, self.ssl_options)
def tearDown(self):
shutil.rmtree(self.temp_dir)
shutil.rmtree(self.config_dir)
shutil.rmtree(self.work_dir)
def test_root_normalized(self):
from letsencrypt.client.nginx.parser import NginxParser
path = os.path.join(self.temp_dir, "debian_nginx_2_4/////"
"two_vhost_80/../two_vhost_80/nginx2")
parser = NginxParser(self.aug, path, None)
self.assertEqual(parser.root, self.config_path)
def test_root_absolute(self):
from letsencrypt.client.nginx.parser import NginxParser
parser = NginxParser(self.aug, os.path.relpath(self.config_path), None)
self.assertEqual(parser.root, self.config_path)
def test_root_no_trailing_slash(self):
from letsencrypt.client.nginx.parser import NginxParser
parser = NginxParser(self.aug, self.config_path + os.path.sep, None)
self.assertEqual(parser.root, self.config_path)
def test_parse_file(self):
"""Test parse_file.
letsencrypt.conf is chosen as the test file as it will not be
included during the normal course of execution.
"""
file_path = os.path.join(
self.config_path, "sites-available", "letsencrypt.conf")
self.parser._parse_file(file_path) # pylint: disable=protected-access
# search for the httpd incl
matches = self.parser.aug.match(
"/augeas/load/Httpd/incl [. ='%s']" % file_path)
self.assertTrue(matches)
def test_find_dir(self):
from letsencrypt.client.nginx.parser import case_i
test = self.parser.find_dir(case_i("Listen"), "443")
# This will only look in enabled hosts
test2 = self.parser.find_dir(case_i("documentroot"))
self.assertEqual(len(test), 2)
self.assertEqual(len(test2), 3)
def test_add_dir(self):
aug_default = "/files" + self.parser.loc["default"]
self.parser.add_dir(aug_default, "AddDirective", "test")
self.assertTrue(
self.parser.find_dir("AddDirective", "test", aug_default))
self.parser.add_dir(aug_default, "AddList", ["1", "2", "3", "4"])
matches = self.parser.find_dir("AddList", None, aug_default)
for i, match in enumerate(matches):
self.assertEqual(self.parser.aug.get(match), str(i + 1))
def test_add_dir_to_ifmodssl(self):
"""test add_dir_to_ifmodssl.
Path must be valid before attempting to add to augeas
"""
from letsencrypt.client.nginx.parser import get_aug_path
self.parser.add_dir_to_ifmodssl(
get_aug_path(self.parser.loc["default"]),
"FakeDirective", "123")
matches = self.parser.find_dir("FakeDirective", "123")
self.assertEqual(len(matches), 1)
self.assertTrue("IfModule" in matches[0])
def test_get_aug_path(self):
from letsencrypt.client.nginx.parser import get_aug_path
self.assertEqual("/files/etc/nginx", get_aug_path("/etc/nginx"))
def test_set_locations(self):
with mock.patch("letsencrypt.client.nginx.parser."
"os.path") as mock_path:
mock_path.isfile.return_value = False
# pylint: disable=protected-access
self.assertRaises(errors.LetsEncryptConfiguratorError,
self.parser._set_locations, self.ssl_options)
mock_path.isfile.side_effect = [True, False, False]
# pylint: disable=protected-access
results = self.parser._set_locations(self.ssl_options)
self.assertEqual(results["default"], results["listen"])
self.assertEqual(results["default"], results["name"])
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,23 @@
# a test nginx conf
user www-data;
server {
listen 80;
server_name foo.com;
root /home/ubuntu/sites/foo/;
location /status {
check_status;
types {
image/jpeg jpg;
}
}
location ~ case_sensitive\.php$ {
hoge hoge;
}
location ~* case_insensitive\.php$ {}
location = exact_match\.php$ {}
location ^~ ignore_regex\.php$ {}
}

View File

@@ -0,0 +1,117 @@
# standard default nginx config
user nobody;
worker_processes 1;
error_log logs/error.log;
error_log logs/error.log notice;
error_log logs/error.log info;
pid logs/nginx.pid;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
access_log logs/access.log main;
sendfile on;
tcp_nopush on;
keepalive_timeout 0;
keepalive_timeout 65;
gzip on;
server {
listen 8080;
server_name localhost;
charset koi8-r;
access_log logs/host.access.log main;
location / {
root html;
index index.html index.htm;
}
error_page 404 /404.html;
# redirect server error pages to the static page /50x.html
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
# proxy the PHP scripts to Nginx listening on 127.0.0.1:80
#
location ~ \.php$ {
proxy_pass http://127.0.0.1;
}
# pass the PHP scripts to FastCGI server listening on 127.0.0.1:9000
#
location ~ \.php$ {
root html;
fastcgi_pass 127.0.0.1:9000;
fastcgi_index index.php;
fastcgi_param SCRIPT_FILENAME /scripts$fastcgi_script_name;
include fastcgi_params;
}
# deny access to .htaccess files, if Nginx's document root
# concurs with nginx's one
#
location ~ /\.ht {
deny all;
}
}
# another virtual host using mix of IP-, name-, and port-based configuration
#
server {
listen 8000;
listen somename:8080;
server_name somename alias another.alias;
location / {
root html;
index index.html index.htm;
}
}
# HTTPS server
#
#server {
# listen 443 ssl;
# server_name localhost;
# ssl_certificate cert.pem;
# ssl_certificate_key cert.key;
# ssl_session_cache shared:SSL:1m;
# ssl_session_timeout 5m;
# ssl_ciphers HIGH:!aNULL:!MD5;
# ssl_prefer_server_ciphers on;
# location / {
# root html;
# index index.html index.htm;
# }
#}
}

View File

@@ -0,0 +1,82 @@
user nobody;
worker_processes 1;
error_log logs/error.log;
error_log logs/error.log notice;
error_log logs/error.log info;
pid logs/nginx.pid;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
access_log logs/access.log main;
sendfile on;
tcp_nopush on;
keepalive_timeout 0;
keepalive_timeout 65;
gzip on;
server {
listen 8080;
server_name localhost;
charset koi8-r;
access_log logs/host.access.log main;
location / {
root html;
index index.html index.htm;
}
error_page 404 /404.html;
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
location ~ \.php$ {
proxy_pass http://127.0.0.1;
}
location ~ \.php$ {
root html;
fastcgi_pass 127.0.0.1:9000;
fastcgi_index index.php;
fastcgi_param SCRIPT_FILENAME /scripts$fastcgi_script_name;
include fastcgi_params;
}
location ~ /\.ht {
deny all;
}
}
server {
listen 8000;
listen somename:8080;
server_name somename alias another.alias;
location / {
root html;
index index.html index.htm;
}
}
server {
listen 443 ssl;
server_name localhost;
ssl_certificate cert.pem;
ssl_certificate_key cert.key;
ssl_session_cache shared:SSL:1m;
ssl_session_timeout 5m;
ssl_ciphers HIGH:!aNULL:!MD5;
location / {
root html;
index index.html index.htm;
}
}
}

View File

@@ -0,0 +1,112 @@
"""Common utilities for letsencrypt.client.nginx."""
import os
import pkg_resources
import shutil
import tempfile
import unittest
import mock
from letsencrypt.client import constants
from letsencrypt.client.nginx import configurator
from letsencrypt.client.nginx import obj
class NginxTest(unittest.TestCase): # pylint: disable=too-few-public-methods
def setUp(self):
super(NginxTest, self).setUp()
self.temp_dir, self.config_dir, self.work_dir = dir_setup(
"debian_nginx_2_4/two_vhost_80")
self.ssl_options = setup_nginx_ssl_options(self.config_dir)
self.config_path = os.path.join(
self.temp_dir, "debian_nginx_2_4/two_vhost_80/nginx2")
self.rsa256_file = pkg_resources.resource_filename(
"letsencrypt.client.tests", 'testdata/rsa256_key.pem')
self.rsa256_pem = pkg_resources.resource_string(
"letsencrypt.client.tests", 'testdata/rsa256_key.pem')
def dir_setup(test_dir="debian_nginx_2_4/two_vhost_80"):
"""Setup the directories necessary for the configurator."""
temp_dir = tempfile.mkdtemp("temp")
config_dir = tempfile.mkdtemp("config")
work_dir = tempfile.mkdtemp("work")
test_configs = pkg_resources.resource_filename(
"letsencrypt.client.tests", "testdata/%s" % test_dir)
shutil.copytree(
test_configs, os.path.join(temp_dir, test_dir), symlinks=True)
return temp_dir, config_dir, work_dir
def setup_nginx_ssl_options(config_dir):
"""Move the ssl_options into position and return the path."""
option_path = os.path.join(config_dir, "options-ssl.conf")
shutil.copyfile(constants.APACHE_MOD_SSL_CONF, option_path)
return option_path
def get_nginx_configurator(
config_path, config_dir, work_dir, ssl_options, version=(2, 4, 7)):
"""Create an Nginx Configurator with the specified options."""
backups = os.path.join(work_dir, "backups")
with mock.patch("letsencrypt.client.nginx.configurator."
"subprocess.Popen") as mock_popen:
# This just states that the ssl module is already loaded
mock_popen().communicate.return_value = ("ssl_module", "")
config = configurator.NginxConfigurator(
mock.MagicMock(
nginx_server_root=config_path,
nginx_mod_ssl_conf=ssl_options,
le_vhost_ext="-le-ssl.conf",
backup_dir=backups,
config_dir=config_dir,
temp_checkpoint_dir=os.path.join(work_dir, "temp_checkpoints"),
in_progress_dir=os.path.join(backups, "IN_PROGRESS"),
work_dir=work_dir),
version)
config.prepare()
return config
def get_vh_truth(temp_dir, config_name):
"""Return the ground truth for the specified directory."""
if config_name == "debian_nginx_2_4/two_vhost_80":
prefix = os.path.join(
temp_dir, config_name, "nginx2/sites-available")
aug_pre = "/files" + prefix
vh_truth = [
obj.VirtualHost(
os.path.join(prefix, "encryption-example.conf"),
os.path.join(aug_pre, "encryption-example.conf/VirtualHost"),
set([obj.Addr.fromstring("*:80")]),
False, True, set(["encryption-example.demo"])),
obj.VirtualHost(
os.path.join(prefix, "default-ssl.conf"),
os.path.join(aug_pre, "default-ssl.conf/IfModule/VirtualHost"),
set([obj.Addr.fromstring("_default_:443")]), True, False),
obj.VirtualHost(
os.path.join(prefix, "000-default.conf"),
os.path.join(aug_pre, "000-default.conf/VirtualHost"),
set([obj.Addr.fromstring("*:80")]), False, True,
set(["ip-172-30-0-17"])),
obj.VirtualHost(
os.path.join(prefix, "letsencrypt.conf"),
os.path.join(aug_pre, "letsencrypt.conf/VirtualHost"),
set([obj.Addr.fromstring("*:80")]), False, True,
set(["letsencrypt.demo"])),
]
return vh_truth
return None

View File

@@ -11,6 +11,7 @@ from setuptools import setup
if os.path.abspath(__file__).split(os.path.sep)[1] == 'vagrant':
del os.link
def read_file(filename, encoding='utf8'):
"""Read unicode from given file."""
with codecs.open(filename, encoding=encoding) as fd:
@@ -36,6 +37,7 @@ install_requires = [
'pyasn1', # urllib3 InsecurePlatformWarning (#304)
'pycrypto',
'PyOpenSSL',
'pyparsing>=1.5.5',
'pyrfc3339',
'python-augeas',
'python2-pythondialog>=3.2.2rc1', # Debian squeeze support, cf. #280
@@ -103,6 +105,8 @@ setup(
'letsencrypt.client.plugins',
'letsencrypt.client.plugins.apache',
'letsencrypt.client.plugins.apache.tests',
'letsencrypt.client.plugins.nginx',
'letsencrypt.client.plugins.nginx.tests',
'letsencrypt.client.plugins.standalone',
'letsencrypt.client.plugins.standalone.tests',
'letsencrypt.client.tests',