You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-11-03 17:13:17 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			386 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			386 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/env python3
 | 
						|
import configparser
 | 
						|
import fcntl
 | 
						|
import json
 | 
						|
import glob
 | 
						|
import logging
 | 
						|
import os
 | 
						|
import socket
 | 
						|
import ssl
 | 
						|
import struct
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
import time
 | 
						|
from typing import Optional
 | 
						|
import xml.etree.ElementTree as ET
 | 
						|
from urllib.request import Request, urlopen
 | 
						|
from urllib.error import HTTPError, URLError
 | 
						|
 | 
						|
 | 
						|
# Install-time directories. The at-sign placeholder values are filled in at
# build time (presumably by CMake configure_file - verify in the build).
MCS_SYSCONF_DIR = '@ENGINE_SYSCONFDIR@'
MCS_BIN_DIR = '@ENGINE_BINDIR@'

# Configuration files living under <sysconfdir>/columnstore.
MCS_ETC_PATH = os.path.join(MCS_SYSCONF_DIR, 'columnstore')
CMAPI_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'cmapi_server.conf')
MCS_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'Columnstore.xml')
SM_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'storagemanager.cnf')

# Helper executables and the fixed argument string for the lock tool.
SAVEBRM = os.path.join(MCS_BIN_DIR, 'save_brm')
CLEAR_ALL_SHMEM_LOCKS = os.path.join(MCS_BIN_DIR, 'mcs-shmem-locks')
CLEAR_ALL_SHMEM_LOCKS_ARGS = '-a'

# EM file checks and backup retention.
EM_FILE_SUFFIX = '_em'             # appended to a save_brm prefix -> EM file
EM_FILE_SIZE_THRESHOLD = 1000      # bytes; smaller EM files count as empty
NUMBER_OF_FILES_TO_KEEP = 40       # directory entries kept by the cleanup
DEFAULT_EM_LOCAL_PATH_PREFIX = ''  # empty prefix: save_brm gets no argument

# Local CMAPI server access.
FIVE_SECS = 5  # HTTP request timeout in seconds
LOCALHOST = '127.0.0.1'
# according to https://www.ibm.com/docs/en/storage-sentinel/1.1.2?topic=installation-map-your-local-host-loopback-address
LOCALHOSTS = (
    '127.0.0.1',
    'localhost',
    'localhost.localdomain',
    'localhost4',
    'localhost4.localdomain4',
    '::1',
    'localhost6',
    'localhost6.localdomain6',
)
API_VERSION = '0.4.0'
API_PORT = '8640'

# Fallback directory for local BRM backups and the backup name template.
BRM_BACKUP_PATH = '/tmp/columnstore_tmp_files/rdwrscratch/'
BRM_BACKUP_PATH_PART = '{}_BRM_saves'
 | 
						|
 | 
						|
 | 
						|
def get_api_key():
    """Read the CMAPI API key from the CMAPI server config file.

    Looks up option ``x-api-key`` in section ``[Authentication]`` of
    CMAPI_CONFIG_PATH. A missing file, section or option yields ''.

    :return: the configured api key or an empty string
    :rtype: str
    """
    parser = configparser.ConfigParser()
    # ConfigParser.read() silently ignores files that cannot be opened.
    parser.read(CMAPI_CONFIG_PATH)
    # NOTE(review): the value is returned verbatim; if the config stores it
    # in quotes they are kept - confirm whether dequoting is needed.
    api_key = parser.get('Authentication', 'x-api-key', fallback='')
    return api_key
 | 
						|
 | 
						|
 | 
						|
def get_unverified_context():
    """Build an SSL context that performs no certificate or hostname checks.

    Used for the HTTPS requests this script sends to the CMAPI server on
    LOCALHOST.

    :return: client SSL context with verification disabled
    :rtype: ssl.SSLContext
    """
    context = ssl.create_default_context()
    # Order matters: hostname checking must be off before verify_mode can be
    # lowered to CERT_NONE, otherwise the ssl module raises ValueError.
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    return context
 | 
						|
 | 
						|
 | 
						|
def cmapi_available():
    """Check if CMAPI server is running.

    Sends a POST to a path that does not exist on the local CMAPI server
    (LOCALHOST:API_PORT). Only a 404 answer counts as "CMAPI is up". Any
    other outcome is logged and reported as unavailable.

    :return: is CMAPI running or not
    :rtype: bool
    """
    logging.info('Detecting CMAPI is up and running.')
    url = 'https://{}:{}/notfound'.format(LOCALHOST, API_PORT)
    request = Request(method='POST', url=url)
    ctx = get_unverified_context()
    try:
        with urlopen(request, context=ctx, timeout=FIVE_SECS) as req:
            _ = req.read().decode('utf-8')
    except HTTPError as exc:
        # HTTPError is a subclass of URLError, so it must be handled first.
        if exc.code == 404:
            return True
        # Previously this case fell through silently; make it visible.
        logging.warning(
            'Unexpected HTTP response code "{}" while checking CMAPI '
            'availability.'.format(exc.code)
        )
    except URLError:
        logging.info('CMAPI seems to be unavailable.')
    except Exception:
        logging.error(
            'Undefined error while trying to check CMAPI availability.',
            exc_info=True
        )
    else:
        # A success status for a non-existent path is not what CMAPI answers.
        logging.warning(
            'Unexpected successful response while checking CMAPI '
            'availability.'
        )
    return False
 | 
						|
 | 
						|
 | 
						|
def get_ip_address_by_nic(ifname):
    """Get the IPv4 address of a network interface.

    Uses the SIOCGIFADDR ioctl on a throwaway AF_INET datagram socket. Any
    failure (unknown interface, no IPv4 address, unsupported platform) is
    logged at DEBUG level and an empty string is returned.

    :param ifname: network interface name (only the first 15 characters are
                   passed to the kernel)
    :type ifname: str
    :return: ip address in dotted notation, or '' on failure
    :rtype: str
    """
    # doesn't work on windows,
    # OpenBSD and probably doesn't on FreeBSD/pfSense either
    ip_addr = ''
    try:
        # "with" closes the socket on every path. The old "finally: s.close()"
        # raised UnboundLocalError when socket creation itself failed.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            ifreq = fcntl.ioctl(
                s.fileno(),
                0x8915,  # SIOCGIFADDR "socket ioctl get interface address"
                struct.pack('256s', bytes(ifname[:15], 'utf-8'))
            )
        # Bytes 20..24 of the returned request buffer hold the packed IPv4
        # address that inet_ntoa converts to dotted notation.
        ip_addr = socket.inet_ntoa(ifreq[20:24])
    except Exception:
        logging.debug(
            'Exception while getting IP address of an "{}" interface'.format(
                ifname
            ),
            exc_info=True
        )
    return ip_addr
 | 
						|
 | 
						|
 | 
						|
def is_primary_fallback(current_hostname):
    """Detect is node primary or not without cmapi.

    The node is considered primary when *current_hostname* is a loopback
    name/address from LOCALHOSTS, or one of the names obtained by reverse
    DNS lookup of the IPv4 address of every local network interface.

    :param current_hostname: hostname or FQDN
    :type current_hostname: str
    :return: is node primary
    :rtype: bool
    """
    logging.info(
        'Current DBRM_Controller/IPAddr is {}'.format(current_hostname)
    )
    hostnames = set()
    for _, nic_name in socket.if_nameindex():
        ip_addr = get_ip_address_by_nic(nic_name)
        if not ip_addr:
            # get_ip_address_by_nic() returns '' when the NIC has no
            # readable IPv4 address; there is nothing to resolve.
            continue
        try:
            name, aliases, _ = socket.gethostbyaddr(ip_addr)
        except OSError:
            # socket.herror / socket.gaierror are OSError subclasses: this
            # address has no resolvable name. Best-effort, skip it. Unlike the
            # former bare "except:", this no longer swallows
            # KeyboardInterrupt/SystemExit.
            logging.debug(
                'Cannot resolve hostname for {} ({}).'.format(
                    ip_addr, nic_name
                ),
                exc_info=True
            )
            continue
        hostnames.add(name)
        hostnames.update(aliases)
    logging.info('Found hostnames {}.'.format(hostnames))
    return current_hostname in LOCALHOSTS or current_hostname in hostnames
 | 
						|
 | 
						|
 | 
						|
def _get_primary_status_from_cmapi():
    """Ask the local CMAPI server whether this node is primary.

    :return: the primary flag reported by CMAPI, or None when the request
             failed or the response could not be interpreted
    :rtype: Optional[bool]
    """
    url = 'https://{}:{}/cmapi/{}/node/primary'.format(
        LOCALHOST, API_PORT, API_VERSION
    )
    ctx = get_unverified_context()
    # Actually for this endpoint no need to provide api key cause it's
    # not required
    request = Request(
        method='GET', url=url, headers={'x-api-key': get_api_key()}
    )
    try:
        with urlopen(request, context=ctx, timeout=FIVE_SECS) as req:
            response = req.read()
    except HTTPError as exc:
        logging.warning(
            'Something goes wrong while requesting primary status through api. Got response code "{}" with reason "{}".'.format(
                exc.code, exc.reason
            )
        )
        return None
    except URLError:
        logging.warning(
            'CMAPI became unavailable while trying to request primary status.'
        )
        return None
    except Exception:
        logging.error(
            'Undefined exception while trying to request primary status.',
            exc_info=True
        )
        return None

    try:
        dict_response = json.loads(response.decode('utf-8'))
    except ValueError:
        # Covers both json.JSONDecodeError and UnicodeDecodeError.
        logging.warning(
            'Cannot parse primary status response from CMAPI.', exc_info=True
        )
        return None
    if not isinstance(dict_response, dict):
        logging.warning(
            'Unexpected primary status response from CMAPI: {!r}.'.format(
                dict_response
            )
        )
        return None
    is_primary = dict_response.get('is_primary', False)
    # The flag is compared against the strings 'True'/'true'. A real JSON
    # boolean is accepted too, so `true` is not misread as "not primary".
    return is_primary is True or is_primary in ('True', 'true')


def is_node_primary(conf_root):
    """Detect is current node primary or not.

    CMAPI is asked first when it is available. If it is unavailable or gives
    no usable answer, DBRM_Controller/IPAddr from the config is compared with
    the local host names (see is_primary_fallback). Without a config root or
    that value, the node is reported as non-primary.

    :param conf_root: xml config root element, may be None
    :type conf_root: xml.etree.ElementTree.Element or None
    :return: primary or not
    :rtype: bool
    """
    if cmapi_available():
        is_primary = _get_primary_status_from_cmapi()
        if is_primary is not None:
            return is_primary

    logging.info('Trying to detect primary without cmapi running.')
    # text in tag have to be hostname or FQDN
    controller_addr = None
    if conf_root is not None:
        controller_addr = conf_root.findtext('./DBRM_Controller/IPAddr')
    if not controller_addr:
        # Previously a None root or a missing tag raised AttributeError here.
        logging.error(
            'Cannot get DBRM_Controller/IPAddr from Columnstore.xml, '
            'treating node as non-primary.'
        )
        return False
    return is_primary_fallback(controller_addr)
 | 
						|
 | 
						|
 | 
						|
def get_file_size(file_path):
    """Return the size of *file_path* in bytes, or None if it cannot be read.

    An OSError (missing file, permission problem, ...) is logged at ERROR
    level instead of being raised.
    """
    try:
        return os.path.getsize(file_path)
    except OSError as err:
        logging.error('OSError in get_file_size(): {}.'.format(err))
    return None
 | 
						|
    
 | 
						|
 | 
						|
def em_is_empty(file_path_prefix):
    """Tell whether the EM file written for a save_brm prefix is unusable.

    The EM file is ``file_path_prefix + EM_FILE_SUFFIX``. It counts as empty
    when the prefix is None, when its size cannot be determined (e.g. the
    file is missing), or when it is smaller than EM_FILE_SIZE_THRESHOLD
    bytes. Every such case is logged at ERROR level.

    :param file_path_prefix: path prefix passed to save_brm, may be None
    :return: True if the EM file is missing, unreadable or too small
    :rtype: bool
    """
    if file_path_prefix is None:
        # The old code raised UnboundLocalError here while formatting the
        # log message, because filesize was never assigned.
        logging.error('EM file path prefix is None, treating EM as empty.')
        return True

    em_file_path = file_path_prefix + EM_FILE_SUFFIX
    filesize = get_file_size(em_file_path)
    if filesize is None:
        # get_file_size() already logged the OSError. Comparing None with an
        # int would raise TypeError.
        logging.error(
            'Cannot get size of EM file {}, treating EM as empty.'.format(
                em_file_path
            )
        )
        return True

    if filesize < EM_FILE_SIZE_THRESHOLD:
        logging.error(
            'EM file is none or its size {} is less than {} bytes.'.format(
                filesize, EM_FILE_SIZE_THRESHOLD
            )
        )
        return True
    return False
 | 
						|
 | 
						|
 | 
						|
def clean_up_backup_brm_files(save_brm_dir_path, files_to_keep=None):
    """Remove old entries from the local BRM backup directory.

    All entries of *save_brm_dir_path* are sorted by name in descending order
    and everything after the first *files_to_keep* is removed. Backup names
    start with 'backup_<epoch seconds>', so this keeps the newest ones. The
    limit counts directory entries, not backup sets, and no EM usability
    check is done.

    Cleanup is best-effort: listing and removal errors are logged, never
    raised, so a caller can carry on (e.g. with the next save_brm call).

    :param save_brm_dir_path: directory holding local BRM backups
    :param files_to_keep: number of newest entries to keep; defaults to
                          NUMBER_OF_FILES_TO_KEEP
    """
    if files_to_keep is None:
        files_to_keep = NUMBER_OF_FILES_TO_KEEP
    files_to_keep = max(0, files_to_keep)

    try:
        filenames = sorted(os.listdir(save_brm_dir_path), reverse=True)
    except OSError as e:
        logging.error(
            'OSError exception happens listing {}: {}.'.format(
                save_brm_dir_path, e
            )
        )
        return

    for filename in filenames[files_to_keep:]:
        file_path = os.path.join(save_brm_dir_path, filename)
        logging.info('Clean up {}.'.format(file_path))
        try:
            os.remove(file_path)
        except OSError as e:
            logging.error(
                'OSError exception happens removing {}: {}.'.format(
                    file_path, e
                )
            )
 | 
						|
 | 
						|
 | 
						|
def remove_files_by_prefix_if_exist(file_path_prefix):
    """Remove every file whose path starts with *file_path_prefix*.

    Best-effort: a None prefix is logged and ignored. A file that cannot be
    removed is logged and the remaining files are still processed.

    :param file_path_prefix: path prefix of the files to delete, may be None
    """
    if file_path_prefix is None:
        # No exception is active here, so exc_info would only log
        # "NoneType: None".
        logging.error('file_path_prefix is None. Cannot remove files.')
        return

    # Escape the prefix so glob metacharacters ('*', '?', '[') in it match
    # literally; only the appended '*' acts as a wildcard.
    for file_path in glob.glob(glob.escape(file_path_prefix) + '*'):
        try:
            os.remove(file_path)
        except OSError as e:
            logging.error(
                'Error removing file: {} - {}'.format(file_path, e.strerror),
                exc_info=True
            )
 | 
						|
 | 
						|
 | 
						|
def get_config_root_from_file(file_path):
    """Returns XML root element from file.

    Any failure to open or parse the file is logged and None is returned, so
    the caller can continue without a config.

    :param file_path: xml config path
    :return: XML root element or None
    :rtype: Element or None
    """
    try:
        cs_config = ET.parse(file_path)
    except (OSError, ET.ParseError, AttributeError, ValueError):
        # OSError covers FileNotFoundError, PermissionError and
        # IsADirectoryError. ET.ParseError (malformed XML) subclasses
        # SyntaxError and was previously not caught at all.
        logging.error(
            'Exception while loading Columnstore.xml. Continue anyway.',
            exc_info=True
        )
        return None
    return cs_config.getroot()
 | 
						|
 | 
						|
def get_epoch_prefix():
    """Build the 'backup_<epoch seconds>' prefix used to name BRM backups.

    :return: 'backup_' followed by the current Unix time in whole seconds
    :rtype: str
    """
    return 'backup_{}'.format(int(time.time()))
 | 
						|
 | 
						|
 | 
						|
def get_save_brm_dir_path(a_mcs_config_root):
    """Returns a path that SM treats as local.

    The path is SystemConfig/SystemTempFileDir concatenated with
    SystemConfig/hdfsRdwrScratch from Columnstore.xml. BRM_BACKUP_PATH is
    returned when the config root is None or either value is missing or
    empty.

    :param a_mcs_config_root: Columnstore.xml root element, may be None
    :type a_mcs_config_root: xml.etree.ElementTree.Element or None
    :return: directory for local BRM backups
    :rtype: str
    """
    if a_mcs_config_root is not None:
        # findtext() gives None for a missing element and '' for an empty
        # one. Both must fall back: an empty element has .text None, and
        # concatenating that raised TypeError.
        system_temp_file_dir = a_mcs_config_root.findtext(
            './SystemConfig/SystemTempFileDir'
        )
        hdfs_rdwr_scratch = a_mcs_config_root.findtext(
            './SystemConfig/hdfsRdwrScratch'
        )
        if system_temp_file_dir and hdfs_rdwr_scratch:
            # There is a danger to have no '/' in the end of
            # system_temp_file_dir or have two of them there. In both cases
            # save_brm will fail to store files locally.
            return system_temp_file_dir + hdfs_rdwr_scratch
        logging.error(
            'Cannot get SystemTempFileDir and hdfsRdwrScratch from '
            'Columnstore.xml, using {}.'.format(BRM_BACKUP_PATH)
        )
    return BRM_BACKUP_PATH
 | 
						|
 | 
						|
 | 
						|
def get_save_brm_path_prefix(a_mcs_config_root):
    """Build the path prefix passed to save_brm for a timestamped local backup.

    The result is '<save_brm dir>/backup_<epoch>_BRM_saves'.
    NOTE(review): BRM_BACKUP_PATH (the fallback dir) already ends with '/',
    so in that case the result contains '//'.

    :param a_mcs_config_root: Columnstore.xml root element, may be None
    :return: path prefix for save_brm
    :rtype: str
    """
    backup_name = BRM_BACKUP_PATH_PART.format(get_epoch_prefix())
    save_dir = get_save_brm_dir_path(a_mcs_config_root)
    return '/'.join((save_dir, backup_name))
 | 
						|
 | 
						|
 | 
						|
def call_executable_with_params(executable: str, args: str) -> bool:
    """Run '<executable> <args>' through the shell and report success.

    :param executable: path of the program to run
    :param args: argument string appended after one space; the shell
                 interprets it as-is (no quoting is applied)
    :return: True if the command exited with status 0, False if it failed
             or could not be started (the failure is logged)
    :rtype: bool
    """
    command = ' '.join((executable, args))
    try:
        subprocess.check_call(command, shell=True)
    except subprocess.CalledProcessError as err:
        logging.error('The call to {} exits with {}.'.format(executable, err.returncode))
    except OSError:
        logging.error('Os error while calling {}.'.format(executable), exc_info=True)
    else:
        return True
    return False
 | 
						|
 | 
						|
 | 
						|
def clear_shmem_locks() -> bool:
    """Clear shmem locks before the save_brm call.

    Runs CLEAR_ALL_SHMEM_LOCKS with CLEAR_ALL_SHMEM_LOCKS_ARGS.

    :return: True if the lock tool exited successfully
    :rtype: bool
    """
    logging.info('Clear shmem read locks.')
    succeeded = call_executable_with_params(
        executable=CLEAR_ALL_SHMEM_LOCKS,
        args=CLEAR_ALL_SHMEM_LOCKS_ARGS,
    )
    return succeeded
 | 
						|
 | 
						|
 | 
						|
def call_save_brm(path) -> bool:
    """Run the save_brm executable with *path* as its argument string.

    An empty *path* produces the shell command 'save_brm ' (trailing space),
    i.e. save_brm is run without arguments.

    :param path: path prefix for the saved BRM files, or ''
    :return: True if save_brm exited successfully
    :rtype: bool
    """
    succeeded = call_executable_with_params(executable=SAVEBRM, args=path)
    return succeeded
 | 
						|
 | 
						|
 | 
						|
if __name__ == '__main__':
    # Configure logging to show INFO level messages
    logging.basicConfig(level=logging.INFO, format='%(levelname)s:%(name)s:%(message)s')

    # config root can be None if Columnstore.xml is missing or unparsable
    mcs_config_root = get_config_root_from_file(MCS_CONFIG_PATH)
    # clear_shmem_locks() returns a bool. The former "is None" test could
    # never be true, so lock-clearing failures were silently ignored.
    if not clear_shmem_locks():
        logging.error('Exiting with error cleaning locks.')
        sys.exit(1)

    em_local_path_prefix = get_save_brm_path_prefix(mcs_config_root)
    # config_root can be None
    if is_node_primary(mcs_config_root):
        if not call_save_brm(em_local_path_prefix) or em_is_empty(em_local_path_prefix):
            logging.error('Exiting with error trying to save BRM locally on primary node.')
            sys.exit(1)

        clean_up_backup_brm_files(get_save_brm_dir_path(mcs_config_root))

        # Empty prefix: save_brm runs without arguments (presumably saving to
        # its configured default location - confirm). Its failure used to be
        # ignored, so the script exited 0 anyway.
        if not call_save_brm(DEFAULT_EM_LOCAL_PATH_PREFIX):
            logging.error('Exiting with error trying to save BRM on primary node.')
            sys.exit(1)
    else:
        # Node is not primary. Call save_brm locally to save a copy of BRM locally
        logging.info('Node is not primary. Call save_brm locally')
        if not call_save_brm(em_local_path_prefix) or em_is_empty(em_local_path_prefix):
            logging.error('Exiting with error trying to save BRM locally on non-primary node.')
            sys.exit(1)

    sys.exit(0)
 |