1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-06-09 06:41:19 +03:00
mariadb-AlanMologorsky d17fc7d9be Review fixes.
2025-04-08 14:56:06 +03:00

275 lines
9.6 KiB
Python

"""Module contains all things related to working with .secrets file."""
import binascii
import json
import logging
import os
import pwd
import stat
from shutil import copyfile
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import padding
from cmapi_server.constants import MCS_DATA_PATH, MCS_SECRETS_FILENAME
from cmapi_server.exceptions import CEJError
# AES block size in bits, as reported by the cryptography library.
AES_BLOCK_SIZE_BITS = algorithms.AES.block_size
# Size in bytes of the initialization vector (one AES block).
AES_IV_BIN_SIZE = AES_BLOCK_SIZE_BITS // 8
# two hex chars for each byte
AES_IV_HEX_SIZE = 2 * AES_IV_BIN_SIZE
class CEJPasswordHandler():
    """Handler for CrossEngineSupport (CEJ) password encryption/decryption.

    Groups classmethods that work with the .secrets key file: detecting and
    loading it, encrypting and decrypting the password with the stored key,
    generating new key data and saving it to disk.
    """
@classmethod
def secretsfile_exists(cls, directory: str = MCS_DATA_PATH) -> bool:
    """Tell whether a non-empty .secrets file is present in directory.

    :param directory: path to the directory with .secrets file,
        defaults to MCS_DATA_PATH
    :type directory: str, optional
    :return: True if the file exists and is not empty, False otherwise
        (also when the check itself fails)
    :rtype: bool
    """
    candidate = os.path.join(directory, MCS_SECRETS_FILENAME)
    try:
        found = (
            os.path.isfile(candidate)
            and os.path.getsize(candidate) > 0
        )
    except Exception:
        # Any failure while probing the file is logged and treated as
        # "no usable .secrets file".
        # TODO: the original note suggests this guard may be removable
        #       once Python 3.8+ is always shipped in the package;
        #       confirm before removing.
        logging.warning(
            'Something went wrong while detecting the .secrets file.',
            exc_info=True
        )
        return False
    return found
@classmethod
def get_secrets_json(cls, directory: str = MCS_DATA_PATH) -> dict:
    """Get json from .secrets file.

    :param directory: path to the directory with .secrets file
    :type directory: str, optional
    :return: json from .secrets file
    :rtype: dict
    :raises CEJError: on missing/empty/corrupted/wrong format .secrets file
    """
    secrets_file_path = os.path.join(directory, MCS_SECRETS_FILENAME)
    if not cls.secretsfile_exists(directory=directory):
        raise CEJError(f'{secrets_file_path} file does not exist.')
    # The file is written as UTF-8 by save_secrets(), so read it back with
    # the same explicit encoding instead of the locale default.
    with open(secrets_file_path, encoding='utf-8') as secrets_file:
        try:
            secrets_json = json.load(secrets_file)
        except Exception:
            logging.error(
                'Something went wrong while loading json from '
                f'{secrets_file_path}',
                exc_info=True
            )
            # The traceback is already logged above, so the original
            # exception is deliberately not chained.
            raise CEJError(
                f'Looks like file {secrets_file_path} is corrupted or '
                'has wrong format.'
            ) from None
    return secrets_json
@classmethod
def is_password_encrypted(cls, passwd: str) -> bool:
    """Check if password is encrypted.

    A value is treated as encrypted when it is long enough to hold a hex
    encoded initialization vector plus the same amount of hex encoded
    data, and consists of hexadecimal digits only.

    :param passwd: CEJ password
    :type passwd: str
    :return: True if password is encrypted
    :rtype: bool
    """
    # minimal length of encrypted password
    if len(passwd) < (AES_IV_HEX_SIZE * 2):
        return False
    # Check every character explicitly: int(passwd, 16) would also accept
    # a sign, a "0x" prefix, underscores and surrounding whitespace, none
    # of which can appear in a hex dump of encrypted bytes.
    hex_digits = frozenset('0123456789abcdefABCDEF')
    return all(char in hex_digits for char in passwd)
@classmethod
def decrypt_password(
    cls, enc_data: str, directory: str = MCS_DATA_PATH
) -> str:
    """Return the plain CEJ password, decrypting enc_data if needed.

    The value is returned untouched when there is no usable .secrets file
    in directory or when enc_data does not look like an encrypted
    password.

    :param enc_data: encrypted initialization vector + password in hex str
    :type enc_data: str
    :param directory: path to the directory with .secrets file
    :type directory: str, optional
    :return: decrypted CEJ password
    :rtype: str
    :raises CEJError: on non-hexadecimal data in enc_data or in the
        stored key, or when the stored key is empty
    """
    key_file = os.path.join(directory, MCS_SECRETS_FILENAME)
    can_decrypt = (
        cls.secretsfile_exists(directory=directory)
        and cls.is_password_encrypted(enc_data)
    )
    if not can_decrypt:
        logging.warning('Unencrypted CrossEngineSupport password used.')
        return enc_data
    logging.info('Encrypted CrossEngineSupport password found.')

    # The hex string starts with the IV, the rest is the encrypted password.
    iv_hex = enc_data[:AES_IV_HEX_SIZE]
    payload_hex = enc_data[AES_IV_HEX_SIZE:]
    try:
        iv = bytes.fromhex(iv_hex)
        ciphertext = bytes.fromhex(payload_hex)
    except ValueError as hex_error:
        raise CEJError(
            'Non-hexadecimal number found in encrypted CEJ password.'
        ) from hex_error

    stored = cls.get_secrets_json(directory=directory)
    key_hex = stored.get('encryption_key', None)
    if not key_hex:
        raise CEJError(
            f'Empty "encryption key" found in {key_file}'
        )
    try:
        key = bytes.fromhex(key_hex)
    except ValueError as hex_error:
        raise CEJError(
            'Non-hexadecimal number found in encryption key from '
            f'{key_file} file.'
        ) from hex_error

    # AES in CBC mode, then strip the PKCS7 padding from the result.
    aes_cbc = Cipher(algorithms.AES(key), modes.CBC(iv)).decryptor()
    padded = aes_cbc.update(ciphertext) + aes_cbc.finalize()
    pkcs7 = padding.PKCS7(AES_BLOCK_SIZE_BITS).unpadder()
    return (pkcs7.update(padded) + pkcs7.finalize()).decode()
@classmethod
def encrypt_password(
    cls, passwd: str, directory: str = MCS_DATA_PATH
) -> str:
    """Encrypt the CEJ password with the key stored in the .secrets file.

    The result is a freshly generated random initialization vector
    followed by the encrypted password, rendered as one hex string.

    :param passwd: CEJ password
    :type passwd: str
    :param directory: path to the directory with .secrets file
    :type directory: str, optional
    :return: encrypted CEJ password in uppercase hex format
    :rtype: str
    :raises CEJError: when the stored key is empty or not hexadecimal
    """
    key_file = os.path.join(directory, MCS_SECRETS_FILENAME)
    stored = cls.get_secrets_json(directory=directory)
    key_hex = stored.get('encryption_key')
    if not key_hex:
        raise CEJError(
            f'Empty "encryption key" found in {key_file}'
        )
    try:
        key = bytes.fromhex(key_hex)
    except ValueError as hex_error:
        raise CEJError(
            'Non-hexadecimal number found in encryption key from '
            f'{key_file} file.'
        ) from hex_error

    iv = os.urandom(AES_IV_BIN_SIZE)
    aes_cbc = Cipher(algorithms.AES(key), modes.CBC(iv)).encryptor()
    # Pad the plain password with PKCS7 before feeding it to the cipher.
    pkcs7 = padding.PKCS7(algorithms.AES.block_size).padder()
    padded = pkcs7.update(passwd.encode()) + pkcs7.finalize()
    ciphertext = aes_cbc.update(padded) + aes_cbc.finalize()
    # The IV is stored in front of the encrypted data.
    return (iv + ciphertext).hex().upper()
@classmethod
def generate_secrets_data(cls) -> dict:
    """Build the content for a .secrets file with a new random key.

    :return: dict with 'description', 'encryption_cipher' and
        'encryption_key' (hex string of 32 random bytes) entries
    :rtype: dict
    """
    aes256_key_bytes = 32  # AES256 key_size
    return {
        'description': 'Columnstore CrossEngineSupport password encryption/decryption key',
        'encryption_cipher': 'EVP_aes_256_cbc',
        'encryption_key': os.urandom(aes256_key_bytes).hex(),
    }
@classmethod
def save_secrets(
    cls, secrets: dict, directory: str = MCS_DATA_PATH,
    owner: str = 'mysql'
) -> None:
    """Write secrets to .secrets file.

    If a non-empty .secrets file with different content already exists,
    it is first copied to a '.cmapi.save' backup next to it. If the
    stored content is equal to secrets, nothing is written.

    :param secrets: secrets dict
    :type secrets: dict
    :param directory: path to the directory with .secrets file
    :type directory: str, optional
    :param owner: owner of the file
    :type owner: str, optional
    :raises CEJError: if the existing file cannot be loaded, if writing
        fails, or if permissions/ownership cannot be set
    """
    owner_rw = stat.S_IRUSR | stat.S_IWUSR

    def _owner_only_opener(path, flags):
        # A newly created file gets owner-only permissions right away, so
        # the key is never written to a file readable by other users.
        return os.open(path, flags, owner_rw)

    secrets_file_path = os.path.join(directory, MCS_SECRETS_FILENAME)
    if cls.secretsfile_exists(directory=directory):
        if cls.get_secrets_json(directory=directory) != secrets:
            backup_file_path = os.path.join(
                directory,
                f'{MCS_SECRETS_FILENAME}.cmapi.save'
            )
            copyfile(secrets_file_path, backup_file_path)
            # copyfile() copies the content only, not the permission
            # bits; the backup still holds a key, so restrict it too.
            # Best effort: a failure here must not block the key update.
            try:
                os.chmod(backup_file_path, owner_rw)
            except OSError:
                logging.warning(
                    'Failed to restrict permissions of '
                    f'{backup_file_path} file.',
                    exc_info=True
                )
            logging.warning(
                f'Backup of {secrets_file_path} file created.'
            )
        else:
            logging.debug(
                f'No changes in {secrets_file_path} file detected.'
            )
            return

    try:
        with open(
            secrets_file_path, 'w', encoding='utf-8',
            opener=_owner_only_opener
        ) as secrets_file:
            json.dump(secrets, secrets_file)
    except Exception as exc:
        raise CEJError('Write to .secrets file failed.') from exc

    try:
        # Final state: read-only for the owner, owned by the given user.
        os.chmod(secrets_file_path, stat.S_IRUSR)
        userinfo = pwd.getpwnam(owner)
        os.chown(secrets_file_path, userinfo.pw_uid, userinfo.pw_gid)
        logging.debug(
            f'Permissions of {secrets_file_path} file set to {owner}:read.'
        )
        logging.debug(
            f'Ownership of {secrets_file_path} file given to {owner}.'
        )
    except Exception as exc:
        raise CEJError(
            'Failed to set permissions or ownership for .secrets file.'
        ) from exc