mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-06-09 06:41:19 +03:00
275 lines
9.6 KiB
Python
275 lines
9.6 KiB
Python
"""Module contains all things related to working with .secrets file."""
|
|
import binascii
|
|
import json
|
|
import logging
|
|
import os
|
|
import pwd
|
|
import stat
|
|
from shutil import copyfile
|
|
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|
from cryptography.hazmat.primitives import padding
|
|
|
|
from cmapi_server.constants import MCS_DATA_PATH, MCS_SECRETS_FILENAME
|
|
from cmapi_server.exceptions import CEJError
|
|
|
|
|
|
AES_BLOCK_SIZE_BITS = algorithms.AES.block_size
|
|
AES_IV_BIN_SIZE = int(AES_BLOCK_SIZE_BITS/8)
|
|
# two hex chars for each byte
|
|
AES_IV_HEX_SIZE = AES_IV_BIN_SIZE * 2
|
|
|
|
|
|
class CEJPasswordHandler():
|
|
"""Handler for CrossEngineSupport password decryption."""
|
|
|
|
@classmethod
|
|
def secretsfile_exists(cls, directory: str = MCS_DATA_PATH) -> bool:
|
|
"""Check the .secrets file in directory. Default MCS_SECRETS_FILE_PATH.
|
|
|
|
:param directory: path to the directory with .secrets file
|
|
:type directory: str, optional
|
|
:return: True if file exists and not empty.
|
|
:rtype: bool
|
|
"""
|
|
secrets_file_path = os.path.join(directory, MCS_SECRETS_FILENAME)
|
|
try:
|
|
if (
|
|
os.path.isfile(secrets_file_path) and
|
|
os.path.getsize(secrets_file_path) > 0
|
|
):
|
|
return True
|
|
except Exception:
|
|
# TODO: remove after check if python 3.8 everytime exist
|
|
# in package because isfile and getsize not rasing
|
|
# exceptions after 3.8
|
|
logging.warning(
|
|
'Something went wrong while detecting the .secrets file.',
|
|
exc_info=True
|
|
)
|
|
return False
|
|
|
|
@classmethod
|
|
def get_secrets_json(cls, directory: str = MCS_DATA_PATH) -> dict:
|
|
"""Get json from .secrets file.
|
|
|
|
:param directory: path to the directory with .secrets file
|
|
:type directory: str, optional
|
|
:return: json from .secrets file
|
|
:rtype: dict
|
|
:raises CEJError: on empty\corrupted\wrong format .secrets file
|
|
"""
|
|
secrets_file_path = os.path.join(directory, MCS_SECRETS_FILENAME)
|
|
if not cls.secretsfile_exists(directory=directory):
|
|
raise CEJError(f'{secrets_file_path} file does not exist.')
|
|
with open(secrets_file_path) as secrets_file:
|
|
try:
|
|
secrets_json = json.load(secrets_file)
|
|
except Exception:
|
|
logging.error(
|
|
'Something went wrong while loading json from '
|
|
f'{secrets_file_path}',
|
|
exc_info=True
|
|
)
|
|
raise CEJError(
|
|
f'Looks like file {secrets_file_path} is corrupted or'
|
|
'has wrong format.'
|
|
) from None
|
|
return secrets_json
|
|
|
|
@classmethod
|
|
def is_password_encrypted(cls, passwd: str) -> bool:
|
|
"""Check if password is encrypted.
|
|
|
|
:param passwd: CEJ password
|
|
:type passwd: str
|
|
:return: True if password is encrypted
|
|
:rtype: bool
|
|
"""
|
|
# minimal length of encrypted password
|
|
if len(passwd) < (AES_IV_HEX_SIZE*2):
|
|
return False
|
|
try:
|
|
# Try converting the string to an integer with base 16
|
|
int(passwd, 16)
|
|
except ValueError:
|
|
return False
|
|
return True
|
|
|
|
@classmethod
|
|
def decrypt_password(
|
|
cls, enc_data: str, directory: str = MCS_DATA_PATH
|
|
) -> str:
|
|
"""Decrypt CEJ password if needed.
|
|
|
|
:param directory: path to the directory with .secrets file
|
|
:type directory: str, optional
|
|
:param enc_data: encrypted initialization vector + password in hex str
|
|
:type enc_data: str
|
|
:return: decrypted CEJ password
|
|
:rtype: str
|
|
"""
|
|
secrets_file_path = os.path.join(directory, MCS_SECRETS_FILENAME)
|
|
if (
|
|
not cls.secretsfile_exists(directory=directory) or
|
|
not cls.is_password_encrypted(enc_data)
|
|
):
|
|
logging.warning('Unencrypted CrossEngineSupport password used.')
|
|
return enc_data
|
|
|
|
logging.info('Encrypted CrossEngineSupport password found.')
|
|
|
|
try:
|
|
iv = bytes.fromhex(enc_data[:AES_IV_HEX_SIZE])
|
|
encrypted_passwd = bytes.fromhex(enc_data[AES_IV_HEX_SIZE:])
|
|
except ValueError as value_error:
|
|
raise CEJError(
|
|
'Non-hexadecimal number found in encrypted CEJ password.'
|
|
) from value_error
|
|
|
|
secrets_json = cls.get_secrets_json(directory=directory)
|
|
encryption_key_hex = secrets_json.get('encryption_key', None)
|
|
if not encryption_key_hex:
|
|
raise CEJError(
|
|
f'Empty "encryption key" found in {secrets_file_path}'
|
|
)
|
|
try:
|
|
encryption_key = bytes.fromhex(encryption_key_hex)
|
|
except ValueError as value_error:
|
|
raise CEJError(
|
|
'Non-hexadecimal number found in encryption key from '
|
|
f'{secrets_file_path} file.'
|
|
) from value_error
|
|
cipher = Cipher(
|
|
algorithms.AES(encryption_key),
|
|
modes.CBC(iv)
|
|
)
|
|
decryptor = cipher.decryptor()
|
|
unpadder = padding.PKCS7(AES_BLOCK_SIZE_BITS).unpadder()
|
|
padded_passwd_bytes = (
|
|
decryptor.update(encrypted_passwd)
|
|
+ decryptor.finalize()
|
|
)
|
|
passwd_bytes = (
|
|
unpadder.update(padded_passwd_bytes) + unpadder.finalize()
|
|
)
|
|
return passwd_bytes.decode()
|
|
|
|
@classmethod
|
|
def encrypt_password(
|
|
cls, passwd: str, directory: str = MCS_DATA_PATH
|
|
) -> str:
|
|
"""Encrypt CEJ password.
|
|
|
|
:param directory: path to the directory with .secrets file
|
|
:type directory: str, optional
|
|
:param passwd: CEJ password
|
|
:type passwd: str
|
|
:return: encrypted CEJ password in uppercase hex format
|
|
:rtype: str
|
|
"""
|
|
|
|
secrets_file_path = os.path.join(directory, MCS_SECRETS_FILENAME)
|
|
iv = os.urandom(AES_IV_BIN_SIZE)
|
|
|
|
secrets_json = cls.get_secrets_json(directory=directory)
|
|
encryption_key_hex = secrets_json.get('encryption_key')
|
|
if not encryption_key_hex:
|
|
raise CEJError(
|
|
f'Empty "encryption key" found in {secrets_file_path}'
|
|
)
|
|
try:
|
|
encryption_key = bytes.fromhex(encryption_key_hex)
|
|
except ValueError as value_error:
|
|
raise CEJError(
|
|
'Non-hexadecimal number found in encryption key from '
|
|
f'{secrets_file_path} file.'
|
|
) from value_error
|
|
cipher = Cipher(
|
|
algorithms.AES(encryption_key),
|
|
modes.CBC(iv)
|
|
)
|
|
|
|
encryptor = cipher.encryptor()
|
|
padder = padding.PKCS7(algorithms.AES.block_size).padder()
|
|
padded_data = padder.update(passwd.encode()) + padder.finalize()
|
|
|
|
encrypted_data = encryptor.update(padded_data) + encryptor.finalize()
|
|
encrypted_passwd_bytes = iv + encrypted_data
|
|
return encrypted_passwd_bytes.hex().upper()
|
|
|
|
@classmethod
|
|
def generate_secrets_data(cls) -> dict:
|
|
"""Generate secrets data for .secrets file.
|
|
|
|
:return: secrets data
|
|
:rtype: dict
|
|
"""
|
|
key_length = 32 # AES256 key_size
|
|
encryption_key = os.urandom(key_length)
|
|
encryption_key_hex = binascii.hexlify(encryption_key).decode()
|
|
secrets_dict = {
|
|
'description': 'Columnstore CrossEngineSupport password encryption/decryption key',
|
|
'encryption_cipher': 'EVP_aes_256_cbc',
|
|
'encryption_key': encryption_key_hex
|
|
}
|
|
|
|
return secrets_dict
|
|
|
|
@classmethod
|
|
def save_secrets(
|
|
cls, secrets: dict, directory: str = MCS_DATA_PATH,
|
|
owner: str = 'mysql'
|
|
) -> None:
|
|
"""Write secrets to .secrets file.
|
|
|
|
:param directory: path to the directory with .secrets file
|
|
:type directory: str, optional
|
|
:param secrets: secrets dict
|
|
:type secrets: dict
|
|
:param filepath: path to the .secrets file
|
|
:type filepath: str, optional
|
|
:param owner: owner of the file
|
|
:type owner: str, optional
|
|
"""
|
|
secrets_file_path = os.path.join(directory, MCS_SECRETS_FILENAME)
|
|
if cls.secretsfile_exists(directory=directory):
|
|
if cls.get_secrets_json(directory=directory) != secrets:
|
|
copyfile(
|
|
secrets_file_path,
|
|
os.path.join(
|
|
directory,
|
|
f'{MCS_SECRETS_FILENAME}.cmapi.save'
|
|
)
|
|
)
|
|
logging.warning(
|
|
f'Backup of {secrets_file_path} file created.'
|
|
)
|
|
else:
|
|
logging.debug(
|
|
f'No changes in {secrets_file_path} file detected.'
|
|
)
|
|
return
|
|
|
|
try:
|
|
with open(
|
|
secrets_file_path, 'w', encoding='utf-8'
|
|
) as secrets_file:
|
|
json.dump(secrets, secrets_file)
|
|
except Exception as exc:
|
|
raise CEJError(f'Write to .secrets file failed.') from exc
|
|
|
|
try:
|
|
os.chmod(secrets_file_path, stat.S_IRUSR)
|
|
userinfo = pwd.getpwnam(owner)
|
|
os.chown(secrets_file_path, userinfo.pw_uid, userinfo.pw_gid)
|
|
logging.debug(
|
|
f'Permissions of {secrets_file_path} file set to {owner}:read.'
|
|
)
|
|
logging.debug(
|
|
f'Ownership of {secrets_file_path} file given to {owner}.'
|
|
)
|
|
except Exception as exc:
|
|
raise CEJError(
|
|
f'Failed to set permissions or ownership for .secrets file.'
|
|
) from exc |