1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-21 19:01:07 +03:00

Fix most pylint errors

This commit is contained in:
yan
2015-04-17 17:05:00 -07:00
parent 1505f5e7bc
commit 995b5622f8
11 changed files with 191 additions and 152 deletions

1
.gitignore vendored
View File

@@ -1,5 +1,6 @@
*.pyc
*.egg-info
.eggs/
build/
dist/
venv/

View File

@@ -89,6 +89,7 @@ class NginxConfigurator(object):
# Entry point in main.py for installing cert
def deploy_cert(self, domain, cert, key, cert_chain=None):
# pylint: disable=unused-argument
"""Deploys certificate to specified virtual host. Aborts if the
vhost is missing ssl_certificate or ssl_certificate_key.
@@ -383,9 +384,9 @@ class NginxConfigurator(object):
nginx_version = tuple([int(i) for i in version_matches[0].split(".")])
# nginx < 0.8.21 doesn't use default_server
if (nginx_version[0] == 0 and
(nginx_version[1] < 8 or
(nginx_version[1] == 8 and nginx_version[2] < 21))):
if (nginx_version[0] == 0 and (nginx_version[1] < 8 or
(nginx_version[1] == 8 and
nginx_version[2] < 21))):
raise errors.LetsEncryptConfiguratorError(
"Nginx version must be 0.8.21+")

View File

@@ -30,15 +30,9 @@ class NginxDvsni(object):
VHOST_TEMPLATE = """\
<VirtualHost {vhost}>
ServerName {server_name}
UseCanonicalName on
SSLStrictSNIVHostCheck on
LimitRequestBody 1048576
Include {ssl_options_conf_path}
SSLCertificateFile {cert_path}
SSLCertificateKeyFile {key_path}
DocumentRoot {document_root}
</VirtualHost>

View File

@@ -7,6 +7,7 @@ from pyparsing import (
class RawNginxParser(object):
# pylint: disable=expression-not-assigned
"""
A class that parses nginx configuration with pyparsing
"""
@@ -51,6 +52,7 @@ class RawNginxParser(object):
class RawNginxDumper(object):
# pylint: disable=too-few-public-methods
"""
A class that dumps nginx configuration from the provided tree.
"""
@@ -86,6 +88,9 @@ class RawNginxDumper(object):
yield spacer * current_indent + key + spacer + values + ';'
def as_string(self):
"""
Return the parsed block as a string.
"""
return '\n'.join(self)
@@ -93,16 +98,45 @@ class RawNginxDumper(object):
# (like pyyaml, picker or json)
def loads(source):
"""Parses from a string.
:param str souce: The string to parse
:returns: The parsed tree
:rtype: list
"""
return RawNginxParser(source).as_list()
def load(_file):
"""Parses from a file.
:param file _file: The file to parse
:returns: The parsed tree
:rtype: list
"""
return loads(_file.read())
def dumps(blocks, indentation=4):
"""Dump to a string.
:param list block: The parsed tree
:param int indentation: The number of spaces to indent
:rtype: str
"""
return RawNginxDumper(blocks, indentation).as_string()
def dump(blocks, _file, indentation=4):
"""Dump to a file.
:param list block: The parsed tree
:param file _file: The file to dump to
:param int indentation: The number of spaces to indent
:rtype: NoneType
"""
return _file.write(dumps(blocks, indentation))

View File

@@ -45,7 +45,7 @@ class Addr(object):
return None
tup = addr.partition(':')
if re.match('^\d+$', tup[0]):
if re.match(r'^\d+$', tup[0]):
# This is a bare port, not a hostname. E.g. listen 80
host = ''
port = tup[0]

View File

@@ -50,19 +50,19 @@ class NginxParser(object):
trees = self._parse_files(filepath)
for tree in trees:
for entry in tree:
if self._is_include_directive(entry):
if _is_include_directive(entry):
# Parse the top-level included file
self._parse_recursively(entry[1])
elif entry[0] == ['http'] or entry[0] == ['server']:
# Look for includes in the top-level 'http'/'server' context
for subentry in entry[1]:
if self._is_include_directive(subentry):
if _is_include_directive(subentry):
self._parse_recursively(subentry[1])
elif entry[0] == ['http'] and subentry[0] == ['server']:
# Look for includes in a 'server' context within
# an 'http' context
for server_entry in subentry[1]:
if self._is_include_directive(server_entry):
if _is_include_directive(server_entry):
self._parse_recursively(server_entry[1])
def abs_path(self, path):
@@ -79,19 +79,8 @@ class NginxParser(object):
else:
return path
def _is_include_directive(self, entry):
"""Checks if an nginx parsed entry is an 'include' directive.
:param list entry: the parsed entry
:returns: Whether it's an 'include' directive
:rtype: bool
"""
return (type(entry) == list and
entry[0] == 'include' and len(entry) == 2 and
type(entry[1]) == str)
def get_vhosts(self):
# pylint: disable=cell-var-from-loop
"""Gets list of all 'virtual hosts' found in Nginx configuration.
Technically this is a misnomer because Nginx does not have virtual
hosts, it has 'server blocks'.
@@ -109,10 +98,11 @@ class NginxParser(object):
for filename in self.parsed:
tree = self.parsed[filename]
servers[filename] = []
srv = servers[filename] # workaround undefined loop var in lambdas
# Find all the server blocks
_do_for_subarray(tree, lambda x: x[0] == ['server'],
lambda x: servers[filename].append(x[1]))
lambda x: srv.append(x[1]))
# Find 'include' statements in server blocks and append their trees
for i, server in enumerate(servers[filename]):
@@ -122,7 +112,7 @@ class NginxParser(object):
for filename in servers:
for server in servers[filename]:
# Parse the server block into a VirtualHost object
parsed_server = self._parse_server(server)
parsed_server = _parse_server(server)
vhost = obj.VirtualHost(filename,
parsed_server['addrs'],
parsed_server['ssl'],
@@ -143,51 +133,16 @@ class NginxParser(object):
"""
result = list(block) # Copy the list to keep self.parsed idempotent
for directive in block:
if (self._is_include_directive(directive)):
if _is_include_directive(directive):
included_files = glob.glob(
self.abs_path(directive[1]))
for f in included_files:
for incl in included_files:
try:
result.extend(self.parsed[f])
except:
result.extend(self.parsed[incl])
except KeyError:
pass
return result
def _parse_server(self, server):
"""Parses a list of server directives.
:param list server: list of directives in a server block
:rtype: dict
"""
parsed_server = {}
parsed_server['addrs'] = set()
parsed_server['ssl'] = False
parsed_server['names'] = set()
for directive in server:
if directive[0] == 'listen':
addr = obj.Addr.fromstring(directive[1])
parsed_server['addrs'].add(addr)
if not parsed_server['ssl'] and addr.ssl:
parsed_server['ssl'] = True
elif directive[0] == 'server_name':
parsed_server['names'].update(
self._get_servernames(directive[1]))
return parsed_server
def _get_servernames(self, names):
"""Turns a server_name string into a list of server names
:param str names: server names
:rtype: list
"""
whitespace_re = re.compile(r'\s+')
names = re.sub(whitespace_re, ' ', names)
return names.split(' ')
def _parse_files(self, filepath, override=False):
"""Parse files from a glob
@@ -199,18 +154,18 @@ class NginxParser(object):
"""
files = glob.glob(filepath)
trees = []
for f in files:
if f in self.parsed and not override:
for item in files:
if item in self.parsed and not override:
continue
try:
with open(f) as fo:
parsed = load(fo)
self.parsed[f] = parsed
with open(item) as _file:
parsed = load(_file)
self.parsed[item] = parsed
trees.append(parsed)
except IOError:
logging.warn("Could not open file: %s" % f)
logging.warn("Could not open file: %s", item)
except pyparsing.ParseException:
logging.warn("Could not parse file: %s" % f)
logging.warn("Could not parse file: %s", item)
return trees
def _set_locations(self, ssl_options):
@@ -257,10 +212,10 @@ class NginxParser(object):
if ext:
filename = filename + os.path.extsep + ext
try:
with open(filename, 'w') as f:
dump(tree, f)
with open(filename, 'w') as _file:
dump(tree, _file)
except IOError:
logging.error("Could not open file for writing: %s" % filename)
logging.error("Could not open file for writing: %s", filename)
def _has_server_names(self, entry, names):
"""Checks if a server block has the given set of server_names. This
@@ -279,44 +234,22 @@ class NginxParser(object):
# Nothing to identify blocks with
return False
if type(entry) != list:
if not isinstance(entry, list):
# Can't be a server block
return False
new_entry = self._get_included_directives(entry)
server_names = set()
for item in new_entry:
if type(item) != list:
if not isinstance(item, list):
# Can't be a server block
return False
if item[0] == 'server_name':
server_names.update(self._get_servernames(item[1]))
server_names.update(_get_servernames(item[1]))
return server_names == names
def _replace_directives(self, block, directives):
"""Replaces directives in a block. If the directive doesn't exist in
the entry already, raises a misconfiguration error.
..todo :: Find directives that are in included files.
:param list block: The block to replace in
:param list directives: The new directives.
"""
for directive in directives:
changed = False
if len(directive) == 0:
continue
for index, line in enumerate(block):
if len(line) > 0 and line[0] == directive[0]:
block[index] = directive
changed = True
if not changed:
raise errors.LetsEncryptMisconfigurationError(
'LetsEncrypt expected directive for %s in the Nginx config '
'but did not find it.' % directive[0])
def add_server_directives(self, filename, names, directives,
replace=False):
"""Add or replace directives in server blocks whose server_name set
@@ -335,7 +268,7 @@ class NginxParser(object):
if replace:
_do_for_subarray(self.parsed[filename],
lambda x: self._has_server_names(x, names),
lambda x: self._replace_directives(x, directives))
lambda x: _replace_directives(x, directives))
else:
_do_for_subarray(self.parsed[filename],
lambda x: self._has_server_names(x, names),
@@ -375,7 +308,7 @@ def _do_for_subarray(entry, condition, func):
:param function func: The function to call for each matching item
"""
if type(entry) == list:
if isinstance(entry, list):
if condition(entry):
func(entry)
else:
@@ -411,15 +344,15 @@ def get_best_match(target_name, names):
if len(exact) > 0:
# There can be more than one exact match; e.g. eff.org, .eff.org
match = min(exact, key=lambda x: len(x))
match = min(exact, key=len)
return ('exact', match)
if len(wildcard_start) > 0:
# Return the longest wildcard
match = max(wildcard_start, key=lambda x: len(x))
match = max(wildcard_start, key=len)
return ('wildcard_start', match)
if len(wildcard_end) > 0:
# Return the longest wildcard
match = max(wildcard_end, key=lambda x: len(x))
match = max(wildcard_end, key=len)
return ('wildcard_end', match)
if len(regex) > 0:
# Just return the first one for now
@@ -430,7 +363,7 @@ def get_best_match(target_name, names):
def _exact_match(target_name, name):
return (target_name == name or '.' + target_name == name)
return target_name == name or '.' + target_name == name
def _wildcard_match(target_name, name, start):
@@ -473,6 +406,79 @@ def _regex_match(target_name, name):
return True
else:
return False
except:
except re.error:
# perl-compatible regexes are sometimes not recognized by python
return False
def _is_include_directive(entry):
"""Checks if an nginx parsed entry is an 'include' directive.
:param list entry: the parsed entry
:returns: Whether it's an 'include' directive
:rtype: bool
"""
return (isinstance(entry, list) and
entry[0] == 'include' and len(entry) == 2 and
isinstance(entry[1], str))
def _get_servernames(names):
"""Turns a server_name string into a list of server names
:param str names: server names
:rtype: list
"""
whitespace_re = re.compile(r'\s+')
names = re.sub(whitespace_re, ' ', names)
return names.split(' ')
def _parse_server(server):
"""Parses a list of server directives.
:param list server: list of directives in a server block
:rtype: dict
"""
parsed_server = {}
parsed_server['addrs'] = set()
parsed_server['ssl'] = False
parsed_server['names'] = set()
for directive in server:
if directive[0] == 'listen':
addr = obj.Addr.fromstring(directive[1])
parsed_server['addrs'].add(addr)
if not parsed_server['ssl'] and addr.ssl:
parsed_server['ssl'] = True
elif directive[0] == 'server_name':
parsed_server['names'].update(
_get_servernames(directive[1]))
return parsed_server
def _replace_directives(block, directives):
"""Replaces directives in a block. If the directive doesn't exist in
the entry already, raises a misconfiguration error.
..todo :: Find directives that are in included files.
:param list block: The block to replace in
:param list directives: The new directives.
"""
for directive in directives:
changed = False
if len(directive) == 0:
continue
for index, line in enumerate(block):
if len(line) > 0 and line[0] == directive[0]:
block[index] = directive
changed = True
if not changed:
raise errors.LetsEncryptMisconfigurationError(
'LetsEncrypt expected directive for %s in the Nginx config '
'but did not find it.' % directive[0])

View File

@@ -36,7 +36,7 @@ class NginxConfiguratorTest(util.NginxTest):
names = self.config.get_all_names()
self.assertEqual(names, set(
["*.www.foo.com", "somename", "another.alias",
"alias", "localhost", ".example.com", "~^(www\.)?(example|bar)\.",
"alias", "localhost", ".example.com", r"~^(www\.)?(example|bar)\.",
"155.225.50.69.nephoscale.net", "*.www.example.com",
"example.*", "www.example.org", "myhost"]))
@@ -70,7 +70,7 @@ class NginxConfiguratorTest(util.NginxTest):
parsed[0])
def test_choose_vhost(self):
localhost_conf = set(['localhost', '~^(www\.)?(example|bar)\.'])
localhost_conf = set(['localhost', r'~^(www\.)?(example|bar)\.'])
server_conf = set(['somename', 'another.alias', 'alias'])
example_conf = set(['.example.com', 'example.*'])
foo_conf = set(['*.www.foo.com', '*.www.example.com'])
@@ -225,17 +225,17 @@ class NginxConfiguratorTest(util.NginxTest):
@mock.patch("letsencrypt.client.plugins.nginx.configurator."
"subprocess.Popen")
def test_nginx_restart(self, mock_popen):
m = mock_popen()
m.communicate.return_value = ('', '')
m.returncode = 0
mocked = mock_popen()
mocked.communicate.return_value = ('', '')
mocked.returncode = 0
self.assertTrue(self.config.restart())
@mock.patch("letsencrypt.client.plugins.nginx.configurator."
"subprocess.Popen")
def test_config_test(self, mock_popen):
m = mock_popen()
m.communicate.return_value = ('', '')
m.returncode = 0
mocked = mock_popen()
mocked.communicate.return_value = ('', '')
mocked.returncode = 0
self.assertTrue(self.config.config_test())
if __name__ == "__main__":

View File

@@ -1,3 +1,4 @@
"""Test for letsencrypt.client.plugins.nginx.nginxparser."""
import operator
import unittest
@@ -6,10 +7,11 @@ from letsencrypt.client.plugins.nginx.nginxparser import (RawNginxParser,
from letsencrypt.client.plugins.nginx.tests import util
first = operator.itemgetter(0)
FIRST = operator.itemgetter(0)
class TestRawNginxParser(unittest.TestCase):
"""Test the raw low-level Nginx config parser."""
def test_assignments(self):
parsed = RawNginxParser.assignment.parseString('root /test;').asList()
@@ -28,8 +30,9 @@ class TestRawNginxParser(unittest.TestCase):
def test_nested_blocks(self):
parsed = RawNginxParser.block.parseString('foo { bar {} }').asList()
block, content = first(parsed)
self.assertEqual(first(content), [['bar'], []])
block, content = FIRST(parsed)
self.assertEqual(FIRST(content), [['bar'], []])
self.assertEqual(FIRST(block), 'foo')
def test_dump_as_string(self):
dumped = dumps([
@@ -71,13 +74,13 @@ class TestRawNginxParser(unittest.TestCase):
[['location', '/status'], [
[['types'], [['image/jpeg', 'jpg']]],
]],
[['location', '~', 'case_sensitive\.php$'], [
[['location', '~', r'case_sensitive\.php$'], [
['index', 'index.php'],
['root', '/var/root'],
]],
[['location', '~*', 'case_insensitive\.php$'], []],
[['location', '=', 'exact_match\.php$'], []],
[['location', '^~', 'ignore_regex\.php$'], []]
[['location', '~*', r'case_insensitive\.php$'], []],
[['location', '=', r'exact_match\.php$'], []],
[['location', '^~', r'ignore_regex\.php$'], []]
]]]]]
)
@@ -94,9 +97,9 @@ class TestRawNginxParser(unittest.TestCase):
[['location', '/'],
[['root', 'html'],
['index', 'index.html index.htm']]]]])
f = open(util.get_data_filename('nginx.new.conf'), 'w')
dump(parsed, f)
f.close()
_file = open(util.get_data_filename('nginx.new.conf'), 'w')
dump(parsed, _file)
_file.close()
parsed_new = load(open(util.get_data_filename('nginx.new.conf')))
self.assertEquals(parsed, parsed_new)

View File

@@ -95,10 +95,10 @@ class VirtualHostTest(unittest.TestCase):
self.assertFalse(vhost1b == 1234)
def test_str(self):
s = '\n'.join(['file: filep', 'addrs: localhost',
"names: set(['localhost'])", 'ssl: False',
'enabled: False'])
self.assertEqual(s, str(self.vhost1))
stringified = '\n'.join(['file: filep', 'addrs: localhost',
"names: set(['localhost'])", 'ssl: False',
'enabled: False'])
self.assertEqual(stringified, str(self.vhost1))
if __name__ == "__main__":

View File

@@ -43,10 +43,10 @@ class NginxParserTest(util.NginxTest):
"""
parser = NginxParser(self.config_path, self.ssl_options)
parser.load()
self.assertEqual(set(map(parser.abs_path,
['foo.conf', 'nginx.conf', 'server.conf',
'sites-enabled/default',
'sites-enabled/example.com'])),
self.assertEqual(set([parser.abs_path(x) for x in
['foo.conf', 'nginx.conf', 'server.conf',
'sites-enabled/default',
'sites-enabled/example.com']]),
set(parser.parsed.keys()))
self.assertEqual([['server_name', 'somename alias another.alias']],
parser.parsed[parser.abs_path('server.conf')])
@@ -85,7 +85,7 @@ class NginxParserTest(util.NginxTest):
vhost1 = VirtualHost(parser.abs_path('nginx.conf'),
[Addr('', '8080', False, False)],
False, True, set(['localhost',
'~^(www\.)?(example|bar)\.']),
r'~^(www\.)?(example|bar)\.']),
[])
vhost2 = VirtualHost(parser.abs_path('nginx.conf'),
[Addr('somename', '8080', False, False),
@@ -106,26 +106,26 @@ class NginxParserTest(util.NginxTest):
'*.www.example.com']), [])
self.assertEqual(5, len(vhosts))
example_com = filter(lambda x: 'example.com' in x.filep, vhosts)[0]
example_com = [x for x in vhosts if 'example.com' in x.filep][0]
self.assertEqual(vhost3, example_com)
default = filter(lambda x: 'default' in x.filep, vhosts)[0]
default = [x for x in vhosts if 'default' in x.filep][0]
self.assertEqual(vhost4, default)
foo = filter(lambda x: 'foo.conf' in x.filep, vhosts)[0]
self.assertEqual(vhost5, foo)
localhost = filter(lambda x: 'localhost' in x.names, vhosts)[0]
fooconf = [x for x in vhosts if 'foo.conf' in x.filep][0]
self.assertEqual(vhost5, fooconf)
localhost = [x for x in vhosts if 'localhost' in x.names][0]
self.assertEquals(vhost1, localhost)
somename = filter(lambda x: 'somename' in x.names, vhosts)[0]
somename = [x for x in vhosts if 'somename' in x.names][0]
self.assertEquals(vhost2, somename)
def test_add_server_directives(self):
parser = NginxParser(self.config_path, self.ssl_options)
parser.add_server_directives(parser.abs_path('nginx.conf'),
set(['localhost',
'~^(www\.)?(example|bar)\.']),
r'~^(www\.)?(example|bar)\.']),
[['foo', 'bar'], ['ssl_certificate',
'/etc/ssl/cert.pem']])
r = re.compile('foo bar;\n\s+ssl_certificate /etc/ssl/cert.pem')
self.assertEqual(1, len(re.findall(r, dumps(parser.parsed[
ssl_re = re.compile(r'foo bar;\n\s+ssl_certificate /etc/ssl/cert.pem')
self.assertEqual(1, len(re.findall(ssl_re, dumps(parser.parsed[
parser.abs_path('nginx.conf')]))))
parser.add_server_directives(parser.abs_path('server.conf'),
set(['alias', 'another.alias',
@@ -161,10 +161,10 @@ class NginxParserTest(util.NginxTest):
set(['*.eff.org', '.www.eff.org']),
set(['.eff.org', '*.org']),
set(['www.eff.', 'www.eff.*', '*.www.eff.org']),
set(['example.com', '~^(www\.)?(eff.+)', '*.eff.*']),
set(['*', '~^(www\.)?(eff.+)']),
set(['www.*', '~^(www\.)?(eff.+)', '.test.eff.org']),
set(['*.org', '*.eff.org', 'www.eff.*']),
set(['example.com', r'~^(www\.)?(eff.+)', '*.eff.*']),
set(['*', r'~^(www\.)?(eff.+)']),
set(['www.*', r'~^(www\.)?(eff.+)', '.test.eff.org']),
set(['*.org', r'*.eff.org', 'www.eff.*']),
set(['*.www.eff.org', 'www.*']),
set(['*.org']),
set([]),
@@ -174,7 +174,7 @@ class NginxParserTest(util.NginxTest):
('exact', '.www.eff.org'),
('wildcard_start', '.eff.org'),
('wildcard_end', 'www.eff.*'),
('regex', '~^(www\.)?(eff.+)'),
('regex', r'~^(www\.)?(eff.+)'),
('wildcard_start', '*'),
('wildcard_end', 'www.*'),
('wildcard_start', '*.eff.org'),
@@ -194,8 +194,8 @@ class NginxParserTest(util.NginxTest):
[['ssl_certificate', 'foo.pem'],
['ssl_certificate_key', 'bar.key'],
['listen', '443 ssl']])
ck = parser.get_all_certs_keys()
self.assertEqual(set([('foo.pem', 'bar.key', filep)]), ck)
c_k = parser.get_all_certs_keys()
self.assertEqual(set([('foo.pem', 'bar.key', filep)]), c_k)
if __name__ == "__main__":

View File

@@ -15,7 +15,6 @@ class NginxTest(unittest.TestCase): # pylint: disable=too-few-public-methods
def setUp(self):
super(NginxTest, self).setUp()
self.maxDiff = None
self.temp_dir, self.config_dir, self.work_dir = dir_setup(
"testdata")
@@ -32,6 +31,7 @@ class NginxTest(unittest.TestCase): # pylint: disable=too-few-public-methods
def get_data_filename(filename):
"""Gets the filename of a test data file."""
return pkg_resources.resource_filename(
"letsencrypt.client.plugins.nginx.tests", "testdata/%s" % filename)