1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-02 17:22:27 +03:00
Files
mariadb-AlanMologorsky a079a2c944 MCOL-5496: Merge CMAPI code to engine repo.
[add] cmapi code to engine
2023-06-07 10:00:16 +03:00

120 lines
4.0 KiB
Python

"""Module contains all things related to working with .secrets file."""
import json
import logging
import os
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import padding
from cmapi_server.constants import MCS_SECRETS_FILE_PATH
from cmapi_server.exceptions import CEJError
# AES block size in bits, taken from the cryptography library.
AES_BLOCK_SIZE_BITS = algorithms.AES.block_size
# Size of the initialization vector in bytes, derived from the block size.
AES_IV_BIN_SIZE = AES_BLOCK_SIZE_BITS // 8
# In the hex representation every byte takes two characters.
AES_IV_HEX_SIZE = 2 * AES_IV_BIN_SIZE
class CEJPasswordHandler():
    """Handler for CrossEngineSupport password decryption.

    All methods are classmethods and operate on the file located at
    MCS_SECRETS_FILE_PATH; the class itself keeps no state.
    """

    @classmethod
    def secretsfile_exists(cls):
        """Check the .secrets file in MCS_SECRETS_FILE_PATH.

        :return: True if file exists and not empty.
        :rtype: bool
        """
        try:
            if (
                os.path.isfile(MCS_SECRETS_FILE_PATH) and
                os.path.getsize(MCS_SECRETS_FILE_PATH) > 0
            ):
                return True
        except Exception:
            # TODO: remove after checking that python 3.8 always exists
            #       in the package, because isfile and getsize are not
            #       expected to raise exceptions after 3.8.
            # NOTE: os.path.getsize can still raise OSError (e.g. the file
            #       disappears between the two checks), so any failure here
            #       is logged and treated as "no usable file".
            logging.warning(
                'Something went wrong while detecting the .secrets file.',
                exc_info=True
            )
        return False

    @classmethod
    def get_secrets_json(cls):
        """Get json from .secrets file.

        :raises CEJError: on missing/empty/corrupted/wrong format
                          .secrets file
        :return: json from .secrets file
        :rtype: dict
        """
        if not cls.secretsfile_exists():
            raise CEJError(f'{MCS_SECRETS_FILE_PATH} file does not exist.')
        with open(MCS_SECRETS_FILE_PATH) as secrets_file:
            try:
                secrets_json = json.load(secrets_file)
            except Exception:
                # Full traceback goes to the log; the caller gets a short
                # CEJError without the chained low-level exception.
                logging.error(
                    'Something went wrong while loading json from '
                    f'{MCS_SECRETS_FILE_PATH}',
                    exc_info=True
                )
                raise CEJError(
                    f'Looks like file {MCS_SECRETS_FILE_PATH} is corrupted '
                    'or has wrong format.'
                ) from None
        return secrets_json

    @classmethod
    def decrypt_password(cls, enc_data: str):
        """Decrypt CEJ password if needed.

        When there is no non-empty .secrets file the given value is treated
        as an unencrypted password and returned unchanged. Otherwise the
        value is split into the initialization vector and the encrypted
        password (both hex encoded) and decrypted with AES in CBC mode using
        the "encryption_key" stored in the .secrets file.

        :param enc_data: encrypted initialization vector + password in hex str
        :type enc_data: str
        :raises CEJError: on non-hexadecimal input, missing or invalid
                          encryption key, or data that can not be decrypted
        :return: decrypted CEJ password
        :rtype: str
        """
        if not cls.secretsfile_exists():
            logging.warning('Unencrypted CrossEngineSupport password used.')
            return enc_data

        logging.info('Encrypted CrossEngineSupport password found.')

        # The hex string starts with the IV followed by the encrypted data.
        try:
            iv = bytes.fromhex(enc_data[:AES_IV_HEX_SIZE])
            encrypted_passwd = bytes.fromhex(enc_data[AES_IV_HEX_SIZE:])
        except ValueError as value_error:
            raise CEJError(
                'Non-hexadecimal number found in encrypted CEJ password.'
            ) from value_error

        secrets_json = cls.get_secrets_json()
        encryption_key_hex = secrets_json.get('encryption_key')
        if not encryption_key_hex:
            raise CEJError(
                f'Empty "encryption key" found in {MCS_SECRETS_FILE_PATH}'
            )
        try:
            encryption_key = bytes.fromhex(encryption_key_hex)
        except ValueError as value_error:
            raise CEJError(
                'Non-hexadecimal number found in encryption key from '
                f'{MCS_SECRETS_FILE_PATH} file.'
            ) from value_error

        # Wrong key/IV size, ciphertext that is not block aligned, invalid
        # padding (typically a wrong key) and undecodable plaintext are all
        # reported as ValueError; convert them to CEJError like the
        # hex-parsing failures above.
        try:
            cipher = Cipher(
                algorithms.AES(encryption_key),
                modes.CBC(iv)
            )
            decryptor = cipher.decryptor()
            unpadder = padding.PKCS7(AES_BLOCK_SIZE_BITS).unpadder()
            padded_passwd_bytes = (
                decryptor.update(encrypted_passwd)
                + decryptor.finalize()
            )
            passwd_bytes = (
                unpadder.update(padded_passwd_bytes) + unpadder.finalize()
            )
            return passwd_bytes.decode()
        except ValueError as value_error:
            raise CEJError(
                'Could not decrypt CEJ password using encryption key from '
                f'{MCS_SECRETS_FILE_PATH} file.'
            ) from value_error