1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-21 19:01:07 +03:00
Files
certbot/letsencrypt/le_util.py
2015-09-26 12:18:32 -07:00

218 lines
6.1 KiB
Python

"""Utilities for all Let's Encrypt."""
import collections
import errno
import logging
import os
import re
import subprocess
import stat
from letsencrypt import errors
logger = logging.getLogger(__name__)


# Record type with the fields ``file`` and ``pem``.
Key = collections.namedtuple("Key", ["file", "pem"])

# Record type with the fields ``file``, ``data`` and ``form``.
# Note: ``form`` is the type of ``data``, either "pem" or "der".
CSR = collections.namedtuple("CSR", ["file", "data", "form"])


# ANSI SGR escape codes
ANSI_SGR_BOLD = "\033[1m"  # formats text as bold or with increased intensity
ANSI_SGR_RED = "\033[31m"  # colors text red
ANSI_SGR_RESET = "\033[0m"  # resets output format


def run_script(params):
    """Run the script with the given params.

    Standard output and standard error of the child process are captured
    through pipes.

    :param list params: List of parameters to pass to Popen

    :returns: tuple of the captured (stdout, stderr)

    :raises .errors.SubprocessError: if the process cannot be started or
        exits with a non-zero return code

    """
    try:
        process = subprocess.Popen(
            params, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except (OSError, ValueError):
        failure = "Unable to run the command: %s" % " ".join(params)
        logger.error(failure)
        raise errors.SubprocessError(failure)

    out, err = process.communicate()
    if process.returncode == 0:
        return out, err

    # Non-zero exit status: report the command together with its output.
    failure = "Error while running %s.\n%s\n%s" % (
        " ".join(params), out, err)
    logger.error(failure)
    raise errors.SubprocessError(failure)


def exe_exists(exe):
    """Determine whether path/name refers to an executable.

    If `exe` contains a directory component it is checked as given;
    otherwise each entry of the ``PATH`` environment variable is searched.
    When ``PATH`` is not set at all, ``os.defpath`` is searched instead of
    failing with a ``KeyError``.

    :param str exe: Executable path or name

    :returns: If exe is a valid executable
    :rtype: bool

    """
    def is_exe(path):
        """Determine if path is an exe."""
        return os.path.isfile(path) and os.access(path, os.X_OK)

    directory, _ = os.path.split(exe)
    if directory:
        return is_exe(exe)

    search_path = os.environ.get("PATH", os.defpath)
    return any(is_exe(os.path.join(entry, exe))
               for entry in search_path.split(os.pathsep))


def make_or_verify_dir(directory, mode=0o755, uid=0, strict=False):
    """Make sure directory exists with proper permissions.

    :param str directory: Path to a directory.
    :param int mode: Directory mode.
    :param int uid: Directory owner.
    :param bool strict: If ``True`` and `directory` already exists, verify
        that its mode and owner match `mode` and `uid`. If ``False``, an
        existing path is accepted without any check.

    :raises .errors.Error: if a directory already exists,
        but has wrong permissions or owner (only when `strict` is set)

    :raises OSError: if invalid or inaccessible file names and
        paths, or other arguments that have the correct type,
        but are not accepted by the operating system.

    """
    try:
        os.makedirs(directory, mode)
    except OSError as exception:
        # Anything other than "already exists" is a genuine failure.
        if exception.errno != errno.EEXIST:
            raise
        if strict and not check_permissions(directory, mode, uid):
            raise errors.Error(
                "%s exists, but it should be owned by user %d with "
                "permissions %s" % (directory, uid, oct(mode)))


def check_permissions(filepath, mode, uid=0):
    """Check file or directory permissions.

    :param str filepath: Path to the tested file (or directory).
    :param int mode: Expected file mode.
    :param int uid: Expected file owner.

    :returns: True if `mode` and `uid` match, False otherwise.
    :rtype: bool

    """
    info = os.stat(filepath)
    # Compare only the permission bits, not the file-type bits.
    if stat.S_IMODE(info.st_mode) != mode:
        return False
    return info.st_uid == uid


def safe_open(path, mode="w", chmod=None, buffering=None):
    """Safely open a file.

    The file is created with ``O_CREAT | O_EXCL``, so the call fails if
    `path` already exists instead of reusing an existing file.

    :param str path: Path to a file.
    :param str mode: Same as `mode` for `open`.
    :param int chmod: Same as `mode` for `os.open`, uses Python defaults
        if ``None``.
    :param int buffering: Same as `bufsize` for `os.fdopen`, uses Python
        defaults if ``None``.

    :returns: file object wrapping the newly created file

    :raises OSError: if the file cannot be created, e.g. it already exists

    """
    # pylint: disable=star-args
    open_args = () if chmod is None else (chmod,)
    fdopen_args = () if buffering is None else (buffering,)
    fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR, *open_args)
    handle = None
    try:
        handle = os.fdopen(fd, mode, *fdopen_args)
    finally:
        if handle is None:
            # fdopen failed (e.g. invalid mode), so no file object owns the
            # descriptor; close it here so it does not leak. The original
            # exception continues to propagate after this block.
            try:
                os.close(fd)
            except OSError:
                # The descriptor was already closed during the failed
                # fdopen call; nothing left to release.
                pass
    return handle


def _unique_file(path, filename_pat, count, mode):
    """Create the first not-yet-existing file produced by `filename_pat`.

    :param str path: directory in which the file is created
    :param filename_pat: callable mapping a counter value to a file name
    :param int count: first counter value to try
    :param int mode: passed to `safe_open` as `chmod`

    :returns: tuple of file object and the path that was created

    """
    attempt = count
    while True:
        candidate = os.path.join(path, filename_pat(attempt))
        try:
            handle = safe_open(candidate, chmod=mode)
        except OSError as err:
            if err.errno == errno.EEXIST:
                # "File exists," is okay, try a different name.
                attempt += 1
                continue
            raise
        return handle, candidate


def unique_file(path, mode=0o777):
    """Safely finds a unique file.

    The file name is prefixed with a zero-padded four-digit counter,
    starting at 0, e.g. ``0000_filename.ext``.

    :param str path: path/filename.ext
    :param int mode: File mode

    :returns: tuple of file object and file name

    """
    directory, basename = os.path.split(path)

    def numbered(count):
        """Build the candidate file name for the given counter value."""
        return "%04d_%s" % (count, basename)

    return _unique_file(directory, filename_pat=numbered, count=0, mode=mode)


def unique_lineage_name(path, filename, mode=0o777):
    """Safely finds a unique file using lineage convention.

    ``<filename>.conf`` is tried first; if it already exists, the names
    ``<filename>-0001.conf``, ``<filename>-0002.conf``, ... are tried.

    :param str path: directory path
    :param str filename: proposed filename
    :param int mode: file mode

    :returns: tuple of file object and file name (which may be modified
        from the requested one by appending digits to ensure uniqueness)

    :raises OSError: if writing files fails for an unanticipated reason,
        such as a full disk or a lack of permission to write to
        specified location.

    """
    preferred_path = os.path.join(path, "%s.conf" % filename)
    try:
        handle = safe_open(preferred_path, chmod=mode)
    except OSError as err:
        # Only "already exists" falls through to the numbered names.
        if err.errno != errno.EEXIST:
            raise
    else:
        return handle, preferred_path

    def numbered(count):
        """Build the numbered fallback name for the given counter value."""
        return "%s-%04d.conf" % (filename, count)

    return _unique_file(path, filename_pat=numbered, count=1, mode=mode)


def safely_remove(path):
    """Remove a file that may not exist.

    :param str path: Path of the file to remove.

    :raises OSError: if removal fails for any reason other than the
        file not existing

    """
    try:
        os.remove(path)
    except OSError as err:
        if err.errno == errno.ENOENT:
            # Nothing to remove; that is fine.
            return
        raise


# Just make sure we don't get pwned... The regex alone is not sufficient:
# the address must also not start with a period or contain two consecutive
# periods, which safe_email checks in addition to the regex.
# ``\Z`` is used instead of ``$`` because ``$`` also matches just before a
# trailing newline, which would let "user@example.com\n" through.
EMAIL_REGEX = re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\Z")


def safe_email(email):
    """Scrub email address before using it.

    :param str email: Email address to validate.

    :returns: True if `email` matches `EMAIL_REGEX`, does not start with
        a period and contains no two consecutive periods; False otherwise.
    :rtype: bool

    """
    if EMAIL_REGEX.match(email) is None:
        logger.warning("Invalid email address: %s.", email)
        return False
    return not email.startswith(".") and ".." not in email