mirror of
https://github.com/certbot/certbot.git
synced 2026-01-21 19:01:07 +03:00
218 lines
6.1 KiB
Python
218 lines
6.1 KiB
Python
"""Utilities for all Let's Encrypt."""
|
|
import collections
|
|
import errno
|
|
import logging
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import stat
|
|
|
|
from letsencrypt import errors
|
|
|
|
|
|
logger = logging.getLogger(__name__)


# Lightweight record types shared across the package.
Key = collections.namedtuple("Key", ["file", "pem"])
# The ``form`` field names the type of ``data``: "pem" or "der".
CSR = collections.namedtuple("CSR", ["file", "data", "form"])


# ANSI SGR escape codes.
# Bold / increased intensity.
ANSI_SGR_BOLD = '\033[1m'
# Red text.
ANSI_SGR_RED = "\033[31m"
# Reset the output format.
ANSI_SGR_RESET = "\033[0m"
|
|
|
|
|
|
def run_script(params):
    """Execute an external command and capture its output.

    :param list params: Argument vector handed to ``subprocess.Popen``.

    :returns: ``(stdout, stderr)`` as captured from the child process.
    :rtype: tuple

    :raises .errors.SubprocessError: if the command cannot be started
        or finishes with a non-zero return code.

    """
    try:
        child = subprocess.Popen(
            params, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except (OSError, ValueError):
        failure = "Unable to run the command: %s" % " ".join(params)
        logger.error(failure)
        raise errors.SubprocessError(failure)

    out, err = child.communicate()

    # Happy path first: a zero return code means the command succeeded.
    if child.returncode == 0:
        return out, err

    failure = "Error while running %s.\n%s\n%s" % (
        " ".join(params), out, err)
    logger.error(failure)
    raise errors.SubprocessError(failure)
|
|
|
|
|
|
def exe_exists(exe):
    """Determine whether path/name refers to an executable.

    If ``exe`` contains a directory component it is checked directly;
    otherwise every directory of the search path is probed for it.

    :param str exe: Executable path or name

    :returns: If exe is a valid executable
    :rtype: bool

    """
    def is_exe(path):
        """Determine if path is an exe."""
        return os.path.isfile(path) and os.access(path, os.X_OK)

    directory, _ = os.path.split(exe)
    if directory:
        return is_exe(exe)

    # PATH may be missing from the environment entirely (e.g. a stripped
    # down cron or daemon environment); fall back to the platform default
    # search path instead of failing with KeyError.
    search_path = os.environ.get("PATH", os.defpath)
    return any(
        is_exe(os.path.join(entry, exe))
        for entry in search_path.split(os.pathsep))
|
|
|
|
|
|
def make_or_verify_dir(directory, mode=0o755, uid=0, strict=False):
    """Make sure directory exists with proper permissions.

    :param str directory: Path to a directory.
    :param int mode: Directory mode.
    :param int uid: Directory owner.
    :param bool strict: If ``True`` and the path already exists, verify
        that its mode and owner match ``mode`` and ``uid``. If ``False``,
        an existing path is accepted without any check.

    :raises .errors.Error: if a directory already exists,
        but has wrong permissions or owner (only when ``strict`` is set)

    :raises OSError: if invalid or inaccessible file names and
        paths, or other arguments that have the correct type,
        but are not accepted by the operating system.

    """
    try:
        os.makedirs(directory, mode)
    except OSError as exception:
        # Anything other than "already exists" is a genuine failure.
        if exception.errno != errno.EEXIST:
            raise
        if strict and not check_permissions(directory, mode, uid):
            # Note the trailing space after "with": the two literals are
            # concatenated by the parser.
            raise errors.Error(
                "%s exists, but it should be owned by user %d with "
                "permissions %s" % (directory, uid, oct(mode)))
|
|
|
|
|
|
def check_permissions(filepath, mode, uid=0):
    """Compare a path's permission bits and owner with expected values.

    :param str filepath: Path to the tested file (or directory).
    :param int mode: Expected file mode.
    :param int uid: Expected file owner.

    :returns: True if `mode` and `uid` match, False otherwise.
    :rtype: bool

    """
    info = os.stat(filepath)
    # Only the permission bits are compared, not the file-type bits.
    if stat.S_IMODE(info.st_mode) != mode:
        return False
    return info.st_uid == uid
|
|
|
|
|
|
def safe_open(path, mode="w", chmod=None, buffering=None):
    """Safely open a file.

    The file is created with ``O_CREAT | O_EXCL``, so the call fails
    instead of opening a file that already exists.

    :param str path: Path to a file.
    :param str mode: Same as `mode` for `open`.
    :param int chmod: Same as `mode` for `os.open`, uses Python defaults
        if ``None``.
    :param int buffering: Same as `bufsize` for `os.fdopen`, uses Python
        defaults if ``None``.

    :returns: File object wrapping the newly created file.

    :raises OSError: if the file cannot be created, in particular with
        ``errno.EEXIST`` when `path` already exists.

    """
    # pylint: disable=star-args
    open_args = () if chmod is None else (chmod,)
    fdopen_args = () if buffering is None else (buffering,)
    fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR, *open_args)
    try:
        return os.fdopen(fd, mode, *fdopen_args)
    except Exception:
        # os.fdopen failed (e.g. invalid mode), so nothing owns the raw
        # descriptor yet; close it here to avoid leaking it.
        try:
            os.close(fd)
        except OSError:
            # Already closed by a partially constructed file object.
            pass
        raise
|
|
|
|
|
|
def _unique_file(path, filename_pat, count, mode):
    """Create the first file in a numbered sequence that does not exist.

    :param str path: Directory in which the file is created.
    :param filename_pat: Callable mapping a counter value to a file name.
    :param int count: First counter value to try.
    :param int mode: Passed to `safe_open` as ``chmod``.

    :returns: tuple of file object and the path that was created

    """
    attempt = count
    while True:
        candidate = os.path.join(path, filename_pat(attempt))
        try:
            handle = safe_open(candidate, chmod=mode)
        except OSError as err:
            # "File exists," is okay, try a different name.
            if err.errno != errno.EEXIST:
                raise
            attempt += 1
        else:
            return handle, candidate
|
|
|
|
|
|
def unique_file(path, mode=0o777):
    """Safely create a uniquely named file.

    The file name is prefixed with a zero-padded, four digit counter
    (``0000_name``, ``0001_name``, ...).

    :param str path: path/filename.ext
    :param int mode: File mode

    :returns: tuple of file object and file name

    """
    directory, basename = os.path.split(path)

    def numbered(count):
        """Build the candidate file name for a counter value."""
        return "%04d_%s" % (count, basename)

    return _unique_file(directory, filename_pat=numbered, count=0, mode=mode)
|
|
|
|
|
|
def unique_lineage_name(path, filename, mode=0o777):
    """Safely create a uniquely named file using the lineage convention.

    ``<filename>.conf`` is tried first; if it already exists,
    ``<filename>-0001.conf``, ``<filename>-0002.conf``, ... are tried.

    :param str path: directory path
    :param str filename: proposed filename
    :param int mode: file mode

    :returns: tuple of file object and file name (which may be modified
        from the requested one by appending digits to ensure uniqueness)

    :raises OSError: if writing files fails for an unanticipated reason,
        such as a full disk or a lack of permission to write to
        specified location.

    """
    first_choice = os.path.join(path, "%s.conf" % filename)
    try:
        handle = safe_open(first_choice, chmod=mode)
    except OSError as err:
        # Only a name clash is recoverable.
        if err.errno != errno.EEXIST:
            raise
    else:
        return handle, first_choice

    def numbered(count):
        """Build the fallback file name for a counter value."""
        return "%s-%04d.conf" % (filename, count)

    return _unique_file(path, filename_pat=numbered, count=1, mode=mode)
|
|
|
|
|
|
def safely_remove(path):
    """Delete a file, tolerating the case where it does not exist.

    :param str path: Path of the file to delete.

    :raises OSError: for any failure other than the file being absent.

    """
    try:
        os.remove(path)
    except OSError as err:
        if err.errno == errno.ENOENT:
            # Nothing to remove.
            return
        raise
|
|
|
|
|
|
# Just make sure we don't get pwned: only a conservative set of characters
# is accepted. In addition to matching this regex, an address must not
# start with a period or contain two consecutive periods; the pattern does
# not express that, so it has to be checked separately.
# ``\Z`` anchors at the very end of the string, whereas ``$`` would also
# accept an address followed by a trailing newline.
EMAIL_REGEX = re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\Z")
|
|
|
|
|
|
def safe_email(email):
    """Check whether an email address is safe to use.

    :param str email: Address to check.

    :returns: ``True`` if `email` matches ``EMAIL_REGEX``, does not start
        with a period and contains no two consecutive periods; ``False``
        otherwise.
    :rtype: bool

    """
    if EMAIL_REGEX.match(email) is None:
        # Logger.warn is a deprecated alias (removed in Python 3.13).
        logger.warning("Invalid email address: %s.", email)
        return False
    # These two rules are not expressed by the regex.
    return not email.startswith(".") and ".." not in email
|