1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-26 07:41:33 +03:00

Various clean-ups in certbot-apache. Use f-strings. (#9132)

* Various clean-ups in certbot-apache. Use f-strings.

* Smaller tweaks
This commit is contained in:
Mads Jensen
2022-01-02 00:27:47 +01:00
committed by GitHub
parent 00f98fa911
commit eeca208c8f
7 changed files with 101 additions and 119 deletions

View File

@@ -172,8 +172,7 @@ def parse_includes(apachectl):
:rtype: list of str
"""
inc_cmd = [apachectl, "-t", "-D",
"DUMP_INCLUDES"]
inc_cmd = [apachectl, "-t", "-D", "DUMP_INCLUDES"]
return parse_from_subprocess(inc_cmd, r"\(.*\) (.*)")
@@ -188,8 +187,7 @@ def parse_modules(apachectl):
:rtype: list of str
"""
mod_cmd = [apachectl, "-t", "-D",
"DUMP_MODULES"]
mod_cmd = [apachectl, "-t", "-D", "DUMP_MODULES"]
return parse_from_subprocess(mod_cmd, r"(.*)_module")

View File

@@ -14,7 +14,8 @@ class ApacheParserNode(interfaces.ParserNode):
"""
def __init__(self, **kwargs):
ancestor, dirty, filepath, metadata = util.parsernode_kwargs(kwargs) # pylint: disable=unused-variable
ancestor, dirty, filepath, metadata = util.parsernode_kwargs(
kwargs) # pylint: disable=unused-variable
super().__init__(**kwargs)
self.ancestor = ancestor
self.filepath = filepath

View File

@@ -388,7 +388,7 @@ class AugeasBlockNode(AugeasDirectiveNode):
"""
Deletes a ParserNode from the sequence of children, and raises an
exception if it's unable to do so.
:param AugeasParserNode: child: A node to delete.
:param AugeasParserNode child: A node to delete.
"""
if not self.parser.aug.remove(child.metadata["augeaspath"]):
@@ -531,7 +531,7 @@ class AugeasBlockNode(AugeasDirectiveNode):
position
)
return (insert_path, resulting_path, before)
return insert_path, resulting_path, before
interfaces.CommentNode.register(AugeasCommentNode)

View File

@@ -154,7 +154,7 @@ class ApacheConfigurator(common.Configurator):
# So for old versions of Apache we pick a configuration without this option.
min_openssl_version = util.parse_loose_version('1.0.2l')
openssl_version = self.openssl_version(warn_on_no_mod_ssl)
if self.version < (2, 4, 11) or not openssl_version or\
if self.version < (2, 4, 11) or not openssl_version or \
util.parse_loose_version(openssl_version) < min_openssl_version:
return apache_util.find_ssl_apache_conf("old")
return apache_util.find_ssl_apache_conf("current")
@@ -470,15 +470,16 @@ class ApacheConfigurator(common.Configurator):
"""Initializes the ParserNode parser root instance."""
if HAS_APACHECONFIG:
apache_vars = {}
apache_vars["defines"] = apache_util.parse_defines(self.options.ctl)
apache_vars["includes"] = apache_util.parse_includes(self.options.ctl)
apache_vars["modules"] = apache_util.parse_modules(self.options.ctl)
apache_vars = {
"defines": apache_util.parse_defines(self.options.ctl),
"includes": apache_util.parse_includes(self.options.ctl),
"modules": apache_util.parse_modules(self.options.ctl),
}
metadata["apache_vars"] = apache_vars
with open(self.parser.loc["root"]) as f:
with apacheconfig.make_loader(writable=True,
**apacheconfig.flavors.NATIVE_APACHE) as loader:
**apacheconfig.flavors.NATIVE_APACHE) as loader:
metadata["ac_ast"] = loader.loads(f.read())
return dualparser.DualBlockNode(
@@ -627,7 +628,7 @@ class ApacheConfigurator(common.Configurator):
# If we haven't managed to enable mod_ssl by this point, error out
if "ssl_module" not in self.parser.modules:
raise errors.MisconfigurationError("Could not find ssl_module; "
"not installing certificate.")
"not installing certificate.")
# Add directives and remove duplicates
self._add_dummy_ssl_directives(vhost.path)
@@ -928,7 +929,7 @@ class ApacheConfigurator(common.Configurator):
# Get last ServerName as each overwrites the previous
servername = self.parser.get_arg(servername_match[-1])
return (servername, serveraliases)
return servername, serveraliases
def _add_servernames(self, host):
"""Helper function for get_virtual_hosts().
@@ -976,7 +977,7 @@ class ApacheConfigurator(common.Configurator):
is_ssl = True
filename = apache_util.get_file_path(
self.parser.aug.get("/augeas/files%s/path" % apache_util.get_file_path(path)))
self.parser.aug.get(f"/augeas/files{apache_util.get_file_path(path)}/path"))
if filename is None:
return None
@@ -1031,7 +1032,7 @@ class ApacheConfigurator(common.Configurator):
for vhost_path in list(self.parser.parser_paths):
paths = self.parser.aug.match(
("/files%s//*[label()=~regexp('%s')]" %
(vhost_path, parser.case_i("VirtualHost"))))
(vhost_path, parser.case_i("VirtualHost"))))
paths = [path for path in paths if
"virtualhost" in os.path.basename(path).lower()]
for path in paths:
@@ -1128,10 +1129,8 @@ class ApacheConfigurator(common.Configurator):
:type host: :class:`~certbot_apache.obj.VirtualHost`
"""
servername_match = vhost.node.find_directives("ServerName",
exclude=False)
serveralias_match = vhost.node.find_directives("ServerAlias",
exclude=False)
servername_match = vhost.node.find_directives("ServerName", exclude=False)
serveralias_match = vhost.node.find_directives("ServerAlias", exclude=False)
servername = None
if servername_match:
@@ -1143,7 +1142,6 @@ class ApacheConfigurator(common.Configurator):
vhost.aliases.add(serveralias)
vhost.name = servername
def is_name_vhost(self, target_addr):
"""Returns if vhost is a name based vhost
@@ -1210,7 +1208,7 @@ class ApacheConfigurator(common.Configurator):
# If HTTPS requested for nonstandard port, add service definition
if https and port != "443":
port_service = "%s %s" % (port, "https")
port_service = f"{port} https"
else:
port_service = port
@@ -1240,7 +1238,7 @@ class ApacheConfigurator(common.Configurator):
_, ip = listen[::-1].split(":", 1)
ip = ip[::-1]
if "%s:%s" % (ip, port_service) not in listen_dirs and (
"%s:%s" % (ip, port_service) not in listen_dirs):
"%s:%s" % (ip, port_service) not in listen_dirs):
listen_dirs.add("%s:%s" % (ip, port_service))
if https:
self._add_listens_https(listen_dirs, listens, port)
@@ -1262,15 +1260,15 @@ class ApacheConfigurator(common.Configurator):
# We have wildcard, skip the rest
self.parser.add_dir(parser.get_aug_path(self.parser.loc["listen"]),
"Listen", port)
self.save_notes += "Added Listen %s directive to %s\n" % (
port, self.parser.loc["listen"])
self.save_notes += (
f"Added Listen {port} directive to {self.parser.loc['listen']}\n"
)
else:
for listen in new_listens:
self.parser.add_dir(parser.get_aug_path(
self.parser.loc["listen"]), "Listen", listen.split(" "))
self.save_notes += ("Added Listen %s directive to "
"%s\n") % (listen,
self.parser.loc["listen"])
self.save_notes += (f"Added Listen {listen} directive to "
f"{self.parser.loc['listen']}\n")
def _add_listens_https(self, listens, listens_orig, port):
"""Helper method for ensure_listen to figure out which new
@@ -1283,7 +1281,7 @@ class ApacheConfigurator(common.Configurator):
# Add service definition for non-standard ports
if port != "443":
port_service = "%s %s" % (port, "https")
port_service = f"{port} https"
else:
port_service = port
@@ -1294,16 +1292,16 @@ class ApacheConfigurator(common.Configurator):
self.parser.add_dir_to_ifmodssl(
parser.get_aug_path(self.parser.loc["listen"]),
"Listen", port_service.split(" "))
self.save_notes += "Added Listen %s directive to %s\n" % (
port_service, self.parser.loc["listen"])
self.save_notes += (
f"Added Listen {port_service} directive to {self.parser.loc['listen']}\n"
)
else:
for listen in new_listens:
self.parser.add_dir_to_ifmodssl(
parser.get_aug_path(self.parser.loc["listen"]),
"Listen", listen.split(" "))
self.save_notes += ("Added Listen %s directive to "
"%s\n") % (listen,
self.parser.loc["listen"])
self.save_notes += (f"Added Listen {listen} directive to "
f"{self.parser.loc['listen']}\n")
def _has_port_already(self, listens, port):
"""Helper method for prepare_server_https to find out if user
@@ -1368,8 +1366,8 @@ class ApacheConfigurator(common.Configurator):
ssl_fp = self._get_ssl_vhost_path(avail_fp)
orig_matches = self.parser.aug.match("/files%s//* [label()=~regexp('%s')]" %
(self._escape(ssl_fp),
parser.case_i("VirtualHost")))
(self._escape(ssl_fp),
parser.case_i("VirtualHost")))
self._copy_create_ssl_vhost_skeleton(nonssl_vhost, ssl_fp)
@@ -1377,8 +1375,8 @@ class ApacheConfigurator(common.Configurator):
self.parser.aug.load()
# Get Vhost augeas path for new vhost
new_matches = self.parser.aug.match("/files%s//* [label()=~regexp('%s')]" %
(self._escape(ssl_fp),
parser.case_i("VirtualHost")))
(self._escape(ssl_fp),
parser.case_i("VirtualHost")))
vh_p = self._get_new_vh_path(orig_matches, new_matches)
@@ -1612,7 +1610,7 @@ class ApacheConfigurator(common.Configurator):
span_val = self.parser.aug.span(vhost.path)
except ValueError:
logger.critical("Error while reading the VirtualHost %s from "
"file %s", vhost.name, vhost.filep, exc_info=True)
"file %s", vhost.name, vhost.filep, exc_info=True)
raise errors.PluginError("Unable to read VirtualHost from file")
span_filep = span_val[0]
span_start = span_val[5]
@@ -1742,9 +1740,8 @@ class ApacheConfigurator(common.Configurator):
for test_vh in self.vhosts:
if (vhost.filep != test_vh.filep and
any(test_addr in addrs for
test_addr in test_vh.addrs) and
not self.is_name_vhost(addr)):
any(test_addr in addrs for
test_addr in test_vh.addrs) and not self.is_name_vhost(addr)):
self.add_name_vhost(addr)
logger.info("Enabling NameVirtualHosts on %s", addr)
need_to_save = True
@@ -1940,10 +1937,7 @@ class ApacheConfigurator(common.Configurator):
Searches AutoHSTS managed VirtualHosts that belong to the lineage.
Matches the private key path.
"""
return bool(
self.parser.find_dir("SSLCertificateKeyFile",
lineage.key_path, vhost.path))
return bool(self.parser.find_dir("SSLCertificateKeyFile", lineage.key_path, vhost.path))
def _enable_ocsp_stapling(self, ssl_vhost, unused_options):
"""Enables OCSP Stapling
@@ -1981,7 +1975,7 @@ class ApacheConfigurator(common.Configurator):
# Check if there's an existing SSLUseStapling directive on.
use_stapling_aug_path = self.parser.find_dir("SSLUseStapling",
"on", start=ssl_vhost.path)
"on", start=ssl_vhost.path)
if not use_stapling_aug_path:
self.parser.add_dir(ssl_vhost.path, "SSLUseStapling", "on")
@@ -1989,20 +1983,20 @@ class ApacheConfigurator(common.Configurator):
# Check if there's an existing SSLStaplingCache directive.
stapling_cache_aug_path = self.parser.find_dir('SSLStaplingCache',
None, ssl_vhost_aug_path)
None, ssl_vhost_aug_path)
# We'll simply delete the directive, so that we'll have a
# consistent OCSP cache path.
if stapling_cache_aug_path:
self.parser.aug.remove(
re.sub(r"/\w*$", "", stapling_cache_aug_path[0]))
re.sub(r"/\w*$", "", stapling_cache_aug_path[0]))
self.parser.add_dir_to_ifmodssl(ssl_vhost_aug_path,
"SSLStaplingCache",
["shmcb:/var/run/apache2/stapling_cache(128000)"])
"SSLStaplingCache",
["shmcb:/var/run/apache2/stapling_cache(128000)"])
msg = "OCSP Stapling was enabled on SSL Vhost: %s.\n"%(
ssl_vhost.filep)
ssl_vhost.filep)
self.save_notes += msg
self.save()
logger.info(msg)
@@ -2073,7 +2067,7 @@ class ApacheConfigurator(common.Configurator):
for match in header_path:
if re.search(pat, self.parser.aug.get(match).lower()):
raise errors.PluginEnhancementAlreadyPresent(
"Existing %s header" % (header_substring))
"Existing %s header" % header_substring)
def _enable_redirect(self, ssl_vhost, unused_options):
"""Redirect all equivalent HTTP traffic to ssl_vhost.
@@ -2159,10 +2153,10 @@ class ApacheConfigurator(common.Configurator):
def _set_https_redirection_rewrite_rule(self, vhost):
if self.get_version() >= (2, 3, 9):
self.parser.add_dir(vhost.path, "RewriteRule",
constants.REWRITE_HTTPS_ARGS_WITH_END)
constants.REWRITE_HTTPS_ARGS_WITH_END)
else:
self.parser.add_dir(vhost.path, "RewriteRule",
constants.REWRITE_HTTPS_ARGS)
constants.REWRITE_HTTPS_ARGS)
def _verify_no_certbot_redirect(self, vhost):
"""Checks to see if a redirect was already installed by certbot.
@@ -2234,7 +2228,7 @@ class ApacheConfigurator(common.Configurator):
"""
rewrite_engine_path_list = self.parser.find_dir("RewriteEngine", "on",
start=vhost.path)
start=vhost.path)
if rewrite_engine_path_list:
for re_path in rewrite_engine_path_list:
# A RewriteEngine directive may also be included in per
@@ -2285,22 +2279,19 @@ class ApacheConfigurator(common.Configurator):
else:
rewrite_rule_args = constants.REWRITE_HTTPS_ARGS
return ("<VirtualHost %s>\n"
"%s \n"
"%s \n"
"ServerSignature Off\n"
"\n"
"RewriteEngine On\n"
"RewriteRule %s\n"
"\n"
"ErrorLog %s/redirect.error.log\n"
"LogLevel warn\n"
"</VirtualHost>\n"
% (" ".join(str(addr) for
addr in self._get_proposed_addrs(ssl_vhost)),
servername, serveralias,
" ".join(rewrite_rule_args),
self.options.logs_root))
return (
f"<VirtualHost {' '.join(str(addr) for addr in self._get_proposed_addrs(ssl_vhost))}>\n"
f"{servername} \n"
f"{serveralias} \n"
f"ServerSignature Off\n"
f"\n"
f"RewriteEngine On\n"
f"RewriteRule {' '.join(rewrite_rule_args)}\n"
"\n"
f"ErrorLog {self.options.logs_root}/redirect.error.log\n"
f"LogLevel warn\n"
f"</VirtualHost>\n"
)
def _write_out_redirect(self, ssl_vhost, text):
# This is the default name
@@ -2409,11 +2400,13 @@ class ApacheConfigurator(common.Configurator):
generic fashion.
"""
mod_message = ("Apache needs to have module \"{0}\" active for the " +
"requested installation options. Unfortunately Certbot is unable " +
"to install or enable it for you. Please install the module, and " +
"run Certbot again.")
raise errors.MisconfigurationError(mod_message.format(mod_name))
mod_message = (
f"Apache needs to have module \"{mod_name}\" active for the "
"requested installation options. Unfortunately Certbot is unable "
"to install or enable it for you. Please install the module, and "
"run Certbot again."
)
raise errors.MisconfigurationError(mod_message)
def restart(self):
"""Runs a config test and reloads the Apache server.
@@ -2645,7 +2638,7 @@ class ApacheConfigurator(common.Configurator):
self.parser.add_dir(ssl_vhost.path, "Header", hsts_header)
note_msg = ("Adding gradually increasing HSTS header with initial value "
"of {0} to VirtualHost in {1}\n".format(
initial_maxage, ssl_vhost.filep))
initial_maxage, ssl_vhost.filep))
self.save_notes += note_msg
# Save the current state to pluginstorage

View File

@@ -108,17 +108,17 @@ def _vhost_menu(domain, vhosts):
try:
code, tag = display_util.menu(
"We were unable to find a vhost with a ServerName "
"or Address of {0}.{1}Which virtual host would you "
"like to choose?".format(domain, os.linesep),
f"We were unable to find a vhost with a ServerName "
f"or Address of {domain}.{os.linesep}Which virtual host would you "
f"like to choose?",
choices, force_interactive=True)
except errors.MissingCommandlineFlag:
msg = (
"Encountered vhost ambiguity when trying to find a vhost for "
"{0} but was unable to ask for user "
"guidance in non-interactive mode. Certbot may need "
"vhosts to be explicitly labelled with ServerName or "
"ServerAlias directives.".format(domain))
f"Encountered vhost ambiguity when trying to find a vhost for "
f"{domain} but was unable to ask for user "
f"guidance in non-interactive mode. Certbot may need "
f"vhosts to be explicitly labelled with ServerName or "
f"ServerAlias directives.")
logger.error(msg)
raise errors.MissingCommandlineFlag(msg)

View File

@@ -102,7 +102,7 @@ For this reason the internal representation of data should not ignore the case.
import abc
class ParserNode(object, metaclass=abc.ABCMeta):
class ParserNode(metaclass=abc.ABCMeta):
"""
ParserNode is the basic building block of the tree of such nodes,
representing the structure of the configuration. It is largely meant to keep
@@ -239,9 +239,9 @@ class CommentNode(ParserNode, metaclass=abc.ABCMeta):
:type dirty: bool
"""
super().__init__(ancestor=kwargs['ancestor'],
dirty=kwargs.get('dirty', False),
filepath=kwargs['filepath'],
metadata=kwargs.get('metadata', {})) # pragma: no cover
dirty=kwargs.get('dirty', False),
filepath=kwargs['filepath'],
metadata=kwargs.get('metadata', {})) # pragma: no cover
class DirectiveNode(ParserNode, metaclass=abc.ABCMeta):
@@ -303,9 +303,9 @@ class DirectiveNode(ParserNode, metaclass=abc.ABCMeta):
"""
super().__init__(ancestor=kwargs['ancestor'],
dirty=kwargs.get('dirty', False),
filepath=kwargs['filepath'],
metadata=kwargs.get('metadata', {})) # pragma: no cover
dirty=kwargs.get('dirty', False),
filepath=kwargs['filepath'],
metadata=kwargs.get('metadata', {})) # pragma: no cover
@abc.abstractmethod
def set_parameters(self, parameters):

View File

@@ -21,7 +21,7 @@ class Addr(common.Addr):
return False
def __repr__(self):
return "certbot_apache._internal.obj.Addr(" + repr(self.tup) + ")"
return f"certbot_apache._internal.obj.Addr({repr(self.tup)})"
def __hash__(self): # pylint: disable=useless-super-delegation
# Python 3 requires explicit overridden for __hash__ if __eq__ or
@@ -147,34 +147,24 @@ class VirtualHost:
def __str__(self):
return (
"File: {filename}\n"
"Vhost path: {vhpath}\n"
"Addresses: {addrs}\n"
"Name: {name}\n"
"Aliases: {aliases}\n"
"TLS Enabled: {tls}\n"
"Site Enabled: {active}\n"
"mod_macro Vhost: {modmacro}".format(
filename=self.filep,
vhpath=self.path,
addrs=", ".join(str(addr) for addr in self.addrs),
name=self.name if self.name is not None else "",
aliases=", ".join(name for name in self.aliases),
tls="Yes" if self.ssl else "No",
active="Yes" if self.enabled else "No",
modmacro="Yes" if self.modmacro else "No"))
f"File: {self.filep}\n"
f"Vhost path: {self.path}\n"
f"Addresses: {', '.join(str(addr) for addr in self.addrs)}\n"
f"Name: {self.name if self.name is not None else ''}\n"
f"Aliases: {', '.join(name for name in self.aliases)}\n"
f"TLS Enabled: {'Yes' if self.ssl else 'No'}\n"
f"Site Enabled: {'Yes' if self.enabled else 'No'}\n"
f"mod_macro Vhost: {'Yes' if self.modmacro else 'No'}"
)
def display_repr(self):
"""Return a representation of VHost to be used in dialog"""
return (
"File: {filename}\n"
"Addresses: {addrs}\n"
"Names: {names}\n"
"HTTPS: {https}\n".format(
filename=self.filep,
addrs=", ".join(str(addr) for addr in self.addrs),
names=", ".join(self.get_names()),
https="Yes" if self.ssl else "No"))
f"File: {self.filep}\n"
f"Addresses: {', '.join(str(addr) for addr in self.addrs)}\n"
f"Names: {', '.join(self.get_names())}\n"
f"HTTPS: {'Yes' if self.ssl else 'No'}\n"
)
def __eq__(self, other):
if isinstance(other, self.__class__):