You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-11-03 17:13:17 +03:00 
			
		
		
		
	* Minor fix in timeout for /notfound "endpoint" call. - [fix] timeout to urlopen /notfound "endpoint" in mcs-savebrm and mcs-loadbrm * chore(CMAPI): reduced a second Time Out asking MDB for a async replication status. The latency must be small b/c MDB Server to be requested is local. --------- Co-authored-by: drrtuy <drrtuy@gmail.com>
		
			
				
	
	
		
			456 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			456 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/env python3
 | 
						|
"""
 | 
						|
Works with python 3.6 and higher (f-strings are used).
 | 
						|
"""
 | 
						|
import configparser
 | 
						|
import fcntl
 | 
						|
import glob
 | 
						|
import json
 | 
						|
import logging
 | 
						|
import os
 | 
						|
import shutil
 | 
						|
import socket
 | 
						|
import ssl
 | 
						|
import struct
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
import time
 | 
						|
import xml.etree.ElementTree as ET
 | 
						|
from pathlib import Path
 | 
						|
from urllib.request import Request, urlopen
 | 
						|
from urllib.error import HTTPError, URLError
 | 
						|
 | 
						|
 | 
						|
# NOTE: the '@...@' values look like build-time placeholders that are
# presumably substituted when the script is installed - confirm against the
# build system.

# Configuration files.
MCS_SYSCONF_DIR = '@ENGINE_SYSCONFDIR@'
MCS_ETC_PATH = os.path.join(MCS_SYSCONF_DIR, 'columnstore')
CMAPI_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'cmapi_server.conf')
MCS_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'Columnstore.xml')
SM_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'storagemanager.cnf')

# Scratch locations used when BRM files are downloaded from the primary node.
COLUMNSTORE_TMP_DIR = '/tmp/columnstore_tmp_files'
BYPASS_SM_PATH = os.path.join(COLUMNSTORE_TMP_DIR, 'rdwrscratch/BRM_saves')

# Data directory and binaries.
MCS_DATA_PATH = '@ENGINE_DATADIR@'
MCS_MODULE_FILE_PATH = os.path.join(MCS_DATA_PATH, 'local/module')
MCS_BIN_DIR = '@ENGINE_BINDIR@'
SMCAT = os.path.join(MCS_BIN_DIR, 'smcat')
LOADBRM = os.path.join(MCS_BIN_DIR, 'load_brm')
# Path of the "current BRM save" pointer file as passed to smcat.
S3_DBROOT1_BRM_PATH = 'data1/systemFiles/dbrm/BRM_saves_current'

# Owner applied to files and shared memory segments touched by this script.
USER = '@DEFAULT_USER@'
GROUP = '@DEFAULT_GROUP@'

# Timeouts (seconds) and the retry limit for reading through smcat.
FIVE_SECS = 5
MINUTE = 60
UNREASONABLE_DELAY = 600

# CMAPI endpoint coordinates.
LOCALHOST = '127.0.0.1'
API_VERSION = '0.4.0'
API_PORT = '8640'
 | 
						|
 | 
						|
 | 
						|
def get_api_key():
    """Read the CMAPI API key from the CMAPI server config file.

    :return: value of ``x-api-key`` from the ``Authentication`` section,
        or an empty string when the option cannot be found
    :rtype: str
    """
    parser = configparser.ConfigParser()
    # ConfigParser.read() silently skips files it can not open, so a missing
    # config file simply results in the empty fallback below.
    parser.read(CMAPI_CONFIG_PATH)
    # Open question kept from the original code: should the value be
    # dequoted? It is returned exactly as stored.
    api_key = parser.get('Authentication', 'x-api-key', fallback='')
    return api_key
 | 
						|
 | 
						|
 | 
						|
def get_unverified_context():
    """Build an SSL context that does not verify the peer.

    :return: default SSL context with hostname checking switched off and
        certificate verification disabled
    :rtype: ssl.SSLContext
    """
    context = ssl.create_default_context()
    # Hostname checking has to be disabled first: the ssl module rejects
    # CERT_NONE while check_hostname is still enabled.
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    return context
 | 
						|
 | 
						|
 | 
						|
def cmapi_available(host=LOCALHOST):
    """Probe a host to find out whether a CMAPI server answers there.

    A POST is sent to the ``/notfound`` path; only an HTTP 404 reply is
    treated as "CMAPI is running". Any other outcome (a successful reply,
    another HTTP error, a connection problem, an unexpected exception)
    yields ``False``.

    :param host: host address to check
    :type host: str
    :return: is CMAPI running or not
    :rtype: bool
    """
    logging.debug('Detecting CMAPI is up and running on {}.'.format(host))
    probe = Request(
        method='POST', url='https://{}:{}/notfound'.format(host, API_PORT)
    )
    ssl_context = get_unverified_context()
    is_running = False
    try:
        with urlopen(probe, context=ssl_context, timeout=FIVE_SECS) as reply:
            # The body is read and decoded but its content is not used.
            reply.read().decode('utf-8')
    except HTTPError as http_exc:
        # HTTPError is handled before URLError because it is a subclass.
        is_running = http_exc.code == 404
    except URLError:
        logging.info('CMAPI seems to be unavailable.')
    except Exception:
        logging.error(
            'Undefined error while trying to check CMAPI availabale.',
            exc_info=True
        )
    return is_running
 | 
						|
 | 
						|
 | 
						|
def get_ip_address_by_nic(ifname):
    """Get ip address for nic.

    Any failure (unknown interface, interface without an IPv4 address,
    socket creation error, ...) is logged at debug level and results in an
    empty string.

    :param ifname: network interface name
    :type ifname: str
    :return: ip address or an empty string if it can not be detected
    :rtype: str
    """
    # doesn't work on windows,
    # OpenBSD and probably doesn't on FreeBSD/pfSense either
    ip_addr = ''
    try:
        # The socket is created inside "with" so it is always closed, and a
        # failure of socket.socket() itself is handled by the except clause
        # below instead of blowing up on an unbound name during cleanup.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            # Only the first 15 characters of the interface name are used.
            request_buf = struct.pack('256s', bytes(ifname[:15], 'utf-8'))
            reply_buf = fcntl.ioctl(
                sock.fileno(),
                0x8915,  # SIOCGIFADDR "socket ioctl get interface address"
                request_buf
            )
            # Bytes 20..23 of the reply are converted to dotted-quad form.
            ip_addr = socket.inet_ntoa(reply_buf[20:24])
    except Exception:
        logging.debug(
            'Exception while getting IP address of an "{}" interface'.format(
                ifname
            ),
            exc_info=True
        )
    return ip_addr
 | 
						|
 | 
						|
 | 
						|
def is_primary_fallback(current_hostname):
    """Detect is node primary or not without cmapi.

    Collects the hostnames and aliases that reverse-resolve from the IPv4
    addresses of all local network interfaces and checks whether
    ``current_hostname`` is one of them.

    :param current_hostname: hostname or FQDN
    :type current_hostname: str
    :return: is node primary
    :rtype: bool
    """
    logging.debug(
        'Current DBRM_Controller/IPAddr is {}'.format(current_hostname)
    )
    hostnames = set()
    for _, nic_name in socket.if_nameindex():
        ip_addr = get_ip_address_by_nic(nic_name)
        if not ip_addr:
            continue
        try:
            canonical_name, aliases, _addresses = socket.gethostbyaddr(
                ip_addr
            )
        except OSError:
            # socket.herror/socket.gaierror are OSError subclasses. An
            # address without a reverse record must not abort the detection
            # for the remaining interfaces.
            logging.debug(
                'Can not resolve hostname for {} on "{}" interface.'.format(
                    ip_addr, nic_name
                ),
                exc_info=True
            )
            continue
        hostnames.add(canonical_name)
        hostnames.update(aliases)
    logging.debug('Found hostnames {}.'.format(hostnames))
    return current_hostname in hostnames
 | 
						|
 | 
						|
 | 
						|
def is_node_primary(conf_root):
    """Detect is current node primary or not.

    The local CMAPI server is asked first. When CMAPI is not running, or the
    request fails, detection falls back to comparing the
    ``DBRM_Controller/IPAddr`` config value with the local hostnames.

    :param conf_root: xml config root element
    :type conf_root: xml.etree.ElementTree.Element
    :return: primary or not
    :rtype: bool
    """
    if cmapi_available():
        url = 'https://{}:{}/cmapi/{}/node/primary'.format(
            LOCALHOST, API_PORT, API_VERSION
        )
        ctx = get_unverified_context()
        # Actually for this endpoint no need to provide api key cause it's
        # not required
        request = Request(
            method='GET', url=url, headers={'x-api-key': get_api_key()}
        )

        # Stays None unless the request completes successfully.
        response = None
        try:
            with urlopen(request, context=ctx, timeout=FIVE_SECS) as req:
                response = req.read()
        except HTTPError as exc:
            logging.warning(
                'Something goes wrong while requesting primary status '
                'through api. '
                f'Got response code "{exc.code}" with reason "{exc.reason}".'
            )
        except URLError:
            logging.warning(
                'CMAPI became unavailable while trying '
                'to request primary status.'
            )
        except Exception:
            logging.error(
                'Undefined exception while trying to request primary status.',
                exc_info=True
            )

        if response is not None:
            dict_response = json.loads(response.decode('utf-8'))
            is_primary = dict_response.get('is_primary', False)
            # Accept a real JSON boolean as well as its string form; a plain
            # "True in ('True', 'true')" test would reject the boolean.
            return is_primary is True or is_primary in ('True', 'true')

    logging.info('Trying to detect primary without cmapi running.')
    # text in tag have to be hostname or FQDN
    return is_primary_fallback(conf_root.find('./DBRM_Controller/IPAddr').text)
 | 
						|
 | 
						|
 | 
						|
def get_meta(conf_root, meta_type):
    """Download one piece of BRM meta data from the primary node via CMAPI.

    The primary address is taken from ``DBRM_Controller/IPAddr`` of the
    given config. The process exits with code 1 when no API key is
    configured. Request errors are logged and re-raised to the caller.

    :param conf_root: xml config root element
    :type conf_root: xml.etree.ElementTree.Element
    :param meta_type: meta name for endpoint call
    :type meta_type: str
    :return: raw response body
    :rtype: bytes
    """
    logging.info('Pulling {} from the primary node.'.format(meta_type))
    primary_host = conf_root.find('./DBRM_Controller/IPAddr').text
    meta_url = 'https://{}:{}/cmapi/{}/node/meta/{}'.format(
        primary_host, API_PORT, API_VERSION, meta_type
    )

    key = get_api_key()
    if not key:
        logging.error(
            'Failed to find API key in {}.'.format(CMAPI_CONFIG_PATH)
        )
        sys.exit(1)

    ssl_context = get_unverified_context()
    meta_request = Request(
        method='GET', url=meta_url, headers={'x-api-key': key}
    )
    try:
        with urlopen(meta_request, context=ssl_context, timeout=MINUTE) as reply:
            # content is returned as bytes
            return reply.read()
    except HTTPError:
        # Must precede URLError: HTTPError is its subclass.
        logging.error(
            'Error requesting {} from the primary node.'.format(meta_type)
        )
        raise
    except URLError:
        logging.warning(
            'CMAPI on primary became unavailable while trying '
            'to request {} from it.'.format(meta_type)
        )
        raise
    except Exception:
        logging.error(
            'Undefined exception while trying to request {}.'.format(
                meta_type
            ),
            exc_info=True
        )
        raise
 | 
						|
 | 
						|
def read_from_sm_with_retry(brm_path):
    """Read a file through ``smcat``, retrying once a second on failure.

    :param brm_path: path handed over to smcat
    :type brm_path: str
    :return: smcat stdout on success; an empty ``str`` after
        ``UNREASONABLE_DELAY`` failed attempts
    :rtype: bytes or str
    """
    smcat_cmd = [SMCAT, brm_path]
    # We need to wait until SM at primary gets an ownership for dbroot1.
    for _attempt in range(UNREASONABLE_DELAY):
        completed = subprocess.run(smcat_cmd, stdout=subprocess.PIPE)
        if completed.returncode == 0:
            return completed.stdout
        logging.error(
            '{} got error code {} from smcat, retrying'.format(
                'read_from_sm_with_retry', completed.returncode
            )
        )
        time.sleep(1)
    # Every attempt failed.
    logging.error('Can not read {} using {}.'.format(brm_path, SMCAT))
    return ''
 | 
						|
 | 
						|
 | 
						|
def read_from_disk(brm_path):
    """Read the whole content of a local file.

    The file is read directly instead of spawning an external ``cat``
    process, so the result no longer depends on that binary being present
    and the logged reason matches the actual failure.

    :param brm_path: path to the file to read
    :type brm_path: str
    :return: file content, or None if the file can not be read
    :rtype: bytes or None
    """
    try:
        with open(brm_path, 'rb') as brm_file:
            return brm_file.read()
    except FileNotFoundError:
        # will happen when brm file does not exist
        logging.error('{} does not exist.'.format(brm_path))
    except OSError as exc:
        # e.g. permission denied or the path is a directory
        logging.error('Can not read {}: {}.'.format(brm_path, exc))
    # Callers only test the result for truthiness; None is kept as the
    # failure value for backward compatibility.
    return None
 | 
						|
 | 
						|
 | 
						|
if __name__ == '__main__':
    # Script flow:
    #   1. read storagemanager and Columnstore configs;
    #   2. on S3: optionally start StorageManager and switch Columnstore.xml
    #      to the StorageManager data file plugin;
    #   3. obtain the content of the "BRM_saves_current" pointer (from S3,
    #      from disk, or a fixed value after pulling files from the primary);
    #   4. run load_brm and fix ownership of shared memory segments.
    logging.basicConfig(
        format='%(levelname)s: %(message)s', level=logging.DEBUG
    )
    # To avoid systemd in container environment:
    # passing "no" as the first argument disables systemd/su usage.
    use_systemd = True
    if len(sys.argv) > 1:
        use_systemd = sys.argv[1] != 'no'

    sm_config = configparser.ConfigParser()
    sm_config.read(SM_CONFIG_PATH)
    storage = sm_config.get(
        'ObjectStorage', 'service', fallback='LocalStorage'
    )
    bucket = sm_config.get('S3', 'bucket', fallback='some_bucket')
    # The placeholder bucket name is treated as "S3 is not configured".
    s3_enabled = storage.lower() == 's3' and bucket.lower() != 'some_bucket'

    cs_config = ET.parse(MCS_CONFIG_PATH)
    config_root = cs_config.getroot()
    dbrmroot = config_root.find('./SystemConfig/DBRMRoot').text
    pmCount = int(config_root.find('./SystemModuleConfig/ModuleCount3').text)
    is_multinode = pmCount > 1
    brm_saves_current = ''

    if s3_enabled:
        # start SM using systemd
        if use_systemd:
            CMD = 'systemctl start mcs-storagemanager'
            try:
                subprocess.check_call(CMD, shell=True)
            except subprocess.CalledProcessError as exc:
                # The message must reference CMD: no "cmd" name exists yet
                # at this point of the script.
                logging.error(
                    'Failed to start storagemanager. '
                    f'Command "{CMD}" exits with {exc.returncode}.'
                )
                sys.exit(1)
            time.sleep(1)   # allow SM time to init

        # Adding S3 related configuration into Columnstore.xml
        config_root.find(
            './Installation/DBRootStorageType'
        ).text = 'StorageManager'
        config_root.find('./StorageManager/Enabled').text = 'Y'

        if config_root.find('./SystemConfig/DataFilePlugin') is None:
            config_root.find('./SystemConfig').append(
                ET.Element('DataFilePlugin')
            )

        config_root.find('./SystemConfig/DataFilePlugin').text = 'libcloudio.so'

        # Write to a sibling file first, then swap it in.
        temp_mcs_conf_path = '{}.loadbrm'.format(MCS_CONFIG_PATH)
        cs_config.write(temp_mcs_conf_path)
        os.replace(temp_mcs_conf_path, MCS_CONFIG_PATH)    # atomic replacement

    # Single-node on S3
    if s3_enabled and not is_multinode:
        try:
            if use_systemd:
                # Run smcat as USER.
                args = [
                    'su', '-s',
                    '/bin/sh', '-c',
                    '{} {}'.format(SMCAT, S3_DBROOT1_BRM_PATH), USER
                ]
            else:
                args = [SMCAT, S3_DBROOT1_BRM_PATH]

            brm_saves_current = subprocess.check_output(args)
        except subprocess.CalledProcessError:
            # will happen when brm file does not exist
            logging.error(
                '{} does not exist.'.format(S3_DBROOT1_BRM_PATH)
            )
    else:
        brm = '{}_current'.format(dbrmroot)
        if is_multinode:
            is_primary = False
            try:
                is_primary = is_node_primary(config_root)
                primary_address = config_root.find(
                    './DBRM_Controller/IPAddr'
                ).text
                # Download BRM files from the primary node via CMAPI.
                if not is_primary:
                    if cmapi_available(primary_address):
                        meta_elems = ['em', 'journal', 'vbbm', 'vss']
                        for meta_type in meta_elems:
                            meta_data = get_meta(config_root, meta_type)
                            # Store BRM files locally to load them up
                            dbrmroot = BYPASS_SM_PATH
                            if not os.path.exists(dbrmroot):
                                os.makedirs(dbrmroot)
                                if use_systemd:
                                    os.system('chown -R {}:{} {}'.format(USER, GROUP, COLUMNSTORE_TMP_DIR))
                                    shutil.chown(dbrmroot, USER, GROUP)

                            current_name = '{}_{}'.format(dbrmroot, meta_type)

                            logging.info(
                                'Saving {} to {}'.format(
                                    meta_type, current_name
                                )
                            )
                            path = Path(current_name)
                            path.write_bytes(meta_data)
                            shutil.chown(current_name, USER, GROUP)
                    else:
                        # This branch belongs to the availability check, not
                        # to the loop: as a for-else it would run after every
                        # successful download and never when CMAPI is down.
                        logging.info(
                            'Cmapi is not running on primary node. Skip loading metafiles.'
                        )
            except Exception:
                logging.error(
                    'Failed to detect primary or load BRM data from '
                    'the primary node.',
                    exc_info=True
                )
                sys.exit(1)
            # Primary reads BRM files directly from S3 via SM or from disk.
            if is_primary:
                if s3_enabled:
                    # S3 path are relative to the current top level dir,
                    # e.g. /var/lib/columnstore/data1 becomes /data1
                    brm_saves_current = read_from_sm_with_retry(
                        S3_DBROOT1_BRM_PATH
                    )
                else:
                    brm_saves_current = read_from_disk(brm)
            else:
                brm_saves_current = b'BRM_saves\n'
        else:
            brm_saves_current = read_from_disk(brm)

    if brm_saves_current:
        # The pointer content with the 'BRM_saves' marker removed is appended
        # to dbrmroot to form the load_brm argument.
        brm_suffix = brm_saves_current.decode('utf-8').replace('BRM_saves', '')
        if use_systemd:
            cmd = 'su -s /bin/sh -c "{} {}{}" {}'.format(
                LOADBRM, dbrmroot, brm_suffix, USER
            )
        else:
            cmd = '{} {}{}'.format(LOADBRM, dbrmroot, brm_suffix)
        try:
            subprocess.check_call(cmd, shell=True)
            # systemd services by default works using mysql privileges.
            # There were cases when shmem segments belongs to root so
            # explicitly chowns segments.
            if use_systemd:
                for shm_file in glob.glob('/dev/shm/@SHMEM_FILE_GLOB@*'):
                    shutil.chown(shm_file, USER, GROUP)
        except subprocess.CalledProcessError as exc:
            logging.error('{} exits with {}.'.format(cmd, exc.returncode))
            sys.exit(1)
        except OSError:
            # Do not exit silently: record why the load/chown step failed.
            logging.error(
                'OS error while running "{}" or changing owner of shared '
                'memory segments.'.format(cmd),
                exc_info=True
            )
            sys.exit(1)
    else:
        if s3_enabled:
            logging.info(
                'brm_saves_currenty returned empty string from '
                'read_from_sm_with_retry'
            )
        else:
            logging.info(
                'brm_saves_currenty returned empty string from read_from_disk'
            )
        sys.exit(1)
 |