#!/usr/bin/env python3
"""Locate the current BRM save set and feed it to ``load_brm``.

The script reads the StorageManager and ColumnStore configs, optionally starts
StorageManager through systemd, finds the current BRM save set (on disk, on S3
through ``smcat`` or by downloading it from the primary node through CMAPI)
and finally runs ``load_brm`` on it.

Pass ``no`` as the first command line argument to avoid ``systemctl`` and
``su`` usage (e.g. in a container environment).

Works with python 3.5 and higher (``subprocess.run`` and
``pathlib.Path.write_bytes`` are used).
"""
import configparser
import fcntl
import glob
import json
import logging
import os
import shutil
import socket
import ssl
import struct
import subprocess
import sys
import time
import xml.etree.ElementTree as ET
from pathlib import Path
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError


MCS_SYSCONF_DIR = '@ENGINE_SYSCONFDIR@'
MCS_ETC_PATH = os.path.join(MCS_SYSCONF_DIR, 'columnstore')
CMAPI_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'cmapi_server.conf')
MCS_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'Columnstore.xml')
SM_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'storagemanager.cnf')
BYPASS_SM_PATH = '/tmp/columnstore_tmp_files/rdwrscratch/BRM_saves'
COLUMNSTORE_TMP_DIR = '/tmp/columnstore_tmp_files'
MCS_DATA_PATH = '@ENGINE_DATADIR@'
MCS_MODULE_FILE_PATH = os.path.join(MCS_DATA_PATH, 'local/module')
MCS_BIN_DIR = '@ENGINE_BINDIR@'
SMCAT = os.path.join(MCS_BIN_DIR, 'smcat')
LOADBRM = os.path.join(MCS_BIN_DIR, 'load_brm')
S3_DBROOT1_BRM_PATH = 'data1/systemFiles/dbrm/BRM_saves_current'
USER = '@DEFAULT_USER@'
GROUP = '@DEFAULT_GROUP@'
MINUTE = 60
FIVE_SECS = 5
UNREASONABLE_DELAY = 600
LOCALHOST = '127.0.0.1'
API_VERSION = '0.4.0'
API_PORT = '8640'


def get_api_key():
    """Get API key from cmapi config file.

    A missing file, section or option yields an empty string.

    :return: return api key
    :rtype: str
    """
    cmapi_config = configparser.ConfigParser()
    cmapi_config.read(CMAPI_CONFIG_PATH)
    # dequote?
    return cmapi_config.get('Authentication', 'x-api-key', fallback='')


def get_unverified_context():
    """Build an SSL context with certificate verification switched off.

    Neither the hostname nor the certificate chain is checked, so the
    context must only be used for the CMAPI calls made by this script.

    :return: ssl context that accepts any server certificate
    :rtype: ssl.SSLContext
    """
    ctx = ssl.create_default_context()
    # check_hostname has to be disabled before verify_mode can be CERT_NONE
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    return ctx


def cmapi_available(host=LOCALHOST):
    """Check if CMAPI server is running.

    A POST is sent to a non-existing endpoint; only a 404 answer is treated
    as a proof that CMAPI is up.

    :param host: host address to check
    :type host: str
    :return: is CMAPI running or not
    :rtype: bool
    """
    logging.debug('Detecting CMAPI is up and running on {}.'.format(host))
    url = 'https://{}:{}/notfound'.format(host, API_PORT)
    request = Request(method='POST', url=url)
    ctx = get_unverified_context()
    try:
        with urlopen(request, context=ctx, timeout=FIVE_SECS) as req:
            _ = req.read().decode('utf-8')
    except HTTPError as exc:
        if exc.code == 404:
            return True
    except URLError:
        logging.info('CMAPI seems to be unavailable.')
    except Exception:
        logging.error(
            'Undefined error while trying to check CMAPI availabale.',
            exc_info=True
        )
    return False


def get_ip_address_by_nic(ifname):
    """Get ip address for nic.

    Any failure (unknown interface, no IPv4 address assigned, socket can not
    be created) is logged on debug level and reported as an empty string.

    :param ifname: network interface name
    :type ifname: str
    :return: ip address or empty string if it can not be detected
    :rtype: str
    """
    # doesn't work on windows,
    # OpenBSD and probably doesn't on FreeBSD/pfSense either
    ip_addr = ''
    try:
        # "with" closes the socket and, unlike a "finally: s.close()", does
        # not fail with UnboundLocalError when the socket was never created.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            ifreq = fcntl.ioctl(
                sock.fileno(),
                0x8915,  # SIOCGIFADDR "socket ioctl get interface address"
                struct.pack('256s', bytes(ifname[:15], 'utf-8'))
            )
            # bytes 20..23 of the answer are handed to inet_ntoa as the
            # packed IPv4 address
            ip_addr = socket.inet_ntoa(ifreq[20:24])
    except Exception:
        logging.debug(
            'Exception while getting IP address of an "{}" interface'.format(
                ifname
            ),
            exc_info=True
        )
    return ip_addr


def is_primary_fallback(current_hostname):
    """Detect is node primary or not without cmapi.

    Collects hostnames and aliases resolved from the IPv4 addresses of all
    local network interfaces and looks for ``current_hostname`` among them.

    :param current_hostname: hostname or FQDN
    :type current_hostname: str
    :return: is node primary
    :rtype: bool
    """
    logging.debug(
        'Current DBRM_Controller/IPAddr is {}'.format(current_hostname)
    )
    hostnames = set()
    for _, nic_name in socket.if_nameindex():
        ip_addr = get_ip_address_by_nic(nic_name)
        if not ip_addr:
            continue
        try:
            primary_name, aliases, _addresses = socket.gethostbyaddr(ip_addr)
        except OSError:
            # socket.herror/gaierror: the address has no reverse record.
            # One unresolvable NIC must not abort the whole detection.
            logging.debug(
                'Can not resolve hostname for {} ({}).'.format(
                    ip_addr, nic_name
                ),
                exc_info=True
            )
            continue
        hostnames.add(primary_name)
        hostnames.update(aliases)
    logging.debug('Found hostnames {}.'.format(hostnames))
    return current_hostname in hostnames


def _parse_is_primary(response):
    """Extract the primary flag from a raw CMAPI ``node/primary`` answer.

    :param response: response body
    :type response: bytes
    :return: True for JSON ``true`` or for the strings 'True'/'true'
    :rtype: bool
    """
    value = json.loads(response.decode('utf-8')).get('is_primary', False)
    if isinstance(value, str):
        return value in ('True', 'true')
    # A real JSON boolean; anything else (None, numbers, ...) is not primary.
    return value is True


def is_node_primary(conf_root):
    """Detect is current node primary or not.

    Local CMAPI is asked first; if it is not running or the request fails,
    the detection falls back to comparing ``DBRM_Controller/IPAddr`` with
    the hostnames of the local network interfaces.

    :param conf_root: xml config root element
    :type conf_root: xml.etree.ElementTree.Element
    :return: primary or not
    :rtype: bool
    """
    if cmapi_available():
        url = 'https://{}:{}/cmapi/{}/node/primary'.format(
            LOCALHOST, API_PORT, API_VERSION
        )
        ctx = get_unverified_context()
        # Actually for this endpoint no need to provide api key cause it's
        # not required
        request = Request(
            method='GET', url=url, headers={'x-api-key': get_api_key()}
        )
        response = None
        try:
            with urlopen(request, context=ctx, timeout=FIVE_SECS) as req:
                response = req.read()
        except HTTPError as exc:
            logging.warning(
                'Something goes wrong while requesting primary status '
                'through api. '
                'Got response code "{}" with reason "{}".'.format(
                    exc.code, exc.reason
                )
            )
        except URLError:
            logging.warning(
                'CMAPI became unavailable while trying '
                'to request primary status.'
            )
        except Exception:
            logging.error(
                'Undefined exception while trying to request primary status.',
                exc_info=True
            )
        if response is not None:
            return _parse_is_primary(response)

    logging.info('Trying to detect primary without cmapi running.')
    # text in tag have to be hostname or FQDN
    return is_primary_fallback(conf_root.find('./DBRM_Controller/IPAddr').text)


def get_meta(conf_root, meta_type):
    """Get meta from primary.

    Exits the script with code 1 if no API key is configured. Request errors
    are logged and re-raised to the caller.

    :param conf_root: xml config root element
    :type conf_root: xml.etree.ElementTree.Element
    :param meta_type: meta name for endpoint call
    :type meta_type: str
    :raises HTTPError: primary answered with an error status
    :raises URLError: primary can not be reached
    :return: meta data
    :rtype: bytes
    """
    logging.info('Pulling {} from the primary node.'.format(meta_type))
    primary_address = conf_root.find('./DBRM_Controller/IPAddr').text
    url = 'https://{}:{}/cmapi/{}/node/meta/{}'.format(
        primary_address, API_PORT, API_VERSION, meta_type
    )
    api_key = get_api_key()
    if not api_key:
        logging.error(
            'Failed to find API key in {}.'.format(CMAPI_CONFIG_PATH)
        )
        sys.exit(1)
    headers = {'x-api-key': api_key}
    ctx = get_unverified_context()
    request = Request(method='GET', url=url, headers=headers)

    try:
        with urlopen(request, context=ctx, timeout=MINUTE) as req:
            response = req.read()
    except HTTPError:
        logging.error(
            'Error requesting {} from the primary node.'.format(meta_type)
        )
        raise
    except URLError:
        logging.warning(
            'CMAPI on primary became unavailable while trying '
            'to request {} from it.'.format(meta_type)
        )
        raise
    except Exception:
        logging.error(
            'Undefined exception while trying to request {}.'.format(
                meta_type
            ),
            exc_info=True
        )
        raise
    # return content in bytes
    return response


def read_from_sm_with_retry(brm_path):
    """Read a file through ``smcat``, retrying once a second on failure.

    :param brm_path: path handed to smcat
    :type brm_path: str
    :return: smcat output, or empty bytes if all UNREASONABLE_DELAY attempts
        failed
    :rtype: bytes
    """
    func_name = 'read_from_sm_with_retry'
    args = [SMCAT, brm_path]
    # We need to wait until SM at primary gets an ownership for dbroot1.
    for _ in range(UNREASONABLE_DELAY):
        ret = subprocess.run(args, stdout=subprocess.PIPE)
        if ret.returncode == 0:
            return ret.stdout
        logging.error(
            '{} got error code {} from smcat, retrying'.format(
                func_name, ret.returncode
            )
        )
        time.sleep(1)
    logging.error('Can not read {} using {}.'.format(brm_path, SMCAT))
    # bytes, like the successful path, so callers deal with a single type
    return b''


def read_from_disk(brm_path):
    """Read a file from the local filesystem.

    :param brm_path: path to the file
    :type brm_path: str
    :return: file content, or empty bytes if the file can not be read
    :rtype: bytes
    """
    try:
        return Path(brm_path).read_bytes()
    except OSError:
        # will happen when brm file does not exist
        logging.error('{} does not exist.'.format(brm_path))
        return b''


def main():
    """Script entry point: find the current BRM save set and load it.

    Exits with code 1 when StorageManager can not be started, the primary
    node can not be detected, no BRM save set is found or ``load_brm`` fails.
    """
    logging.basicConfig(
        format='%(levelname)s: %(message)s',
        level=logging.DEBUG
    )
    # To avoid systemd in container environment
    use_systemd = len(sys.argv) < 2 or sys.argv[1] != 'no'

    sm_config = configparser.ConfigParser()
    sm_config.read(SM_CONFIG_PATH)
    storage = sm_config.get(
        'ObjectStorage', 'service', fallback='LocalStorage'
    )
    bucket = sm_config.get('S3', 'bucket', fallback='some_bucket')
    # 'some_bucket' is the fallback used above, i.e. no bucket configured
    s3_enabled = storage.lower() == 's3' and bucket.lower() != 'some_bucket'

    cs_config = ET.parse(MCS_CONFIG_PATH)
    config_root = cs_config.getroot()
    dbrmroot = config_root.find('./SystemConfig/DBRMRoot').text
    pm_count = int(
        config_root.find('./SystemModuleConfig/ModuleCount3').text
    )
    is_multinode = pm_count > 1
    brm_saves_current = b''

    if s3_enabled:
        # start SM using systemd
        if use_systemd:
            start_sm_cmd = 'systemctl start mcs-storagemanager'
            try:
                subprocess.check_call(start_sm_cmd, shell=True)
            except subprocess.CalledProcessError as exc:
                logging.error(
                    'Failed to start storagemanager. '
                    'Command "{}" exits with {}.'.format(
                        start_sm_cmd, exc.returncode
                    )
                )
                sys.exit(1)
            time.sleep(1)  # allow SM time to init

        # Adding S3 related configuration into Columnstore.xml
        config_root.find(
            './Installation/DBRootStorageType'
        ).text = 'StorageManager'
        config_root.find('./StorageManager/Enabled').text = 'Y'
        if config_root.find('./SystemConfig/DataFilePlugin') is None:
            config_root.find('./SystemConfig').append(
                ET.Element('DataFilePlugin')
            )
        config_root.find(
            './SystemConfig/DataFilePlugin'
        ).text = 'libcloudio.so'
        temp_mcs_conf_path = '{}.loadbrm'.format(MCS_CONFIG_PATH)
        cs_config.write(temp_mcs_conf_path)
        os.replace(temp_mcs_conf_path, MCS_CONFIG_PATH)  # atomic replacement

    # Single-node on S3
    if s3_enabled and not is_multinode:
        if use_systemd:
            args = [
                'su', '-s', '/bin/sh', '-c',
                '{} {}'.format(SMCAT, S3_DBROOT1_BRM_PATH),
                USER
            ]
        else:
            args = [SMCAT, S3_DBROOT1_BRM_PATH]
        try:
            brm_saves_current = subprocess.check_output(args)
        except subprocess.CalledProcessError:
            # will happen when brm file does not exist
            logging.error(
                '{} does not exist.'.format(S3_DBROOT1_BRM_PATH)
            )
    else:
        brm = '{}_current'.format(dbrmroot)
        if is_multinode:
            is_primary = False
            try:
                is_primary = is_node_primary(config_root)
                primary_address = config_root.find(
                    './DBRM_Controller/IPAddr'
                ).text
                # Download BRM files from the primary node via CMAPI.
                if not is_primary:
                    if cmapi_available(primary_address):
                        meta_elems = ['em', 'journal', 'vbbm', 'vss']
                        for meta_type in meta_elems:
                            meta_data = get_meta(config_root, meta_type)
                            # Store BRM files locally to load them up
                            dbrmroot = BYPASS_SM_PATH
                            if not os.path.exists(dbrmroot):
                                os.makedirs(dbrmroot)
                                if use_systemd:
                                    os.system(
                                        'chown -R {}:{} {}'.format(
                                            USER, GROUP, COLUMNSTORE_TMP_DIR
                                        )
                                    )
                                    shutil.chown(dbrmroot, USER, GROUP)
                            current_name = '{}_{}'.format(
                                dbrmroot, meta_type
                            )
                            logging.info(
                                'Saving {} to {}'.format(
                                    meta_type, current_name
                                )
                            )
                            Path(current_name).write_bytes(meta_data)
                            shutil.chown(current_name, USER, GROUP)
                    else:
                        logging.info(
                            'Cmapi is not running on primary node. '
                            'Skip loading metafiles.'
                        )
            except Exception:
                logging.error(
                    'Failed to detect primary or load BRM data from '
                    'the primary node.',
                    exc_info=True
                )
                sys.exit(1)

            # Primary reads BRM files directly from S3 via SM or from disk.
            if is_primary:
                if s3_enabled:
                    # S3 path are relative to the current top level dir,
                    # e.g. /var/lib/columnstore/data1 becomes /data1
                    brm_saves_current = read_from_sm_with_retry(
                        S3_DBROOT1_BRM_PATH
                    )
                else:
                    brm_saves_current = read_from_disk(brm)
            else:
                # Replica: after the 'BRM_saves' part is stripped below,
                # load_brm is pointed at dbrmroot itself.
                brm_saves_current = b'BRM_saves\n'
        else:
            brm_saves_current = read_from_disk(brm)

    if not brm_saves_current:
        if s3_enabled:
            logging.info(
                'brm_saves_currenty returned empty string from '
                'read_from_sm_with_retry'
            )
        else:
            logging.info(
                'brm_saves_currenty returned empty string from read_from_disk'
            )
        sys.exit(1)

    cmd = '{} {}{}'.format(
        LOADBRM, dbrmroot,
        brm_saves_current.decode('utf-8').replace('BRM_saves', '')
    )
    if use_systemd:
        # run load_brm as USER instead of the invoking user
        cmd = 'su -s /bin/sh -c "{}" {}'.format(cmd, USER)
    try:
        subprocess.check_call(cmd, shell=True)
        # systemd services by default works using mysql privileges.
        # There were cases when shmem segments belongs to root so
        # explicitly chowns segments.
        if use_systemd:
            for shm_file in glob.glob('/dev/shm/@SHMEM_FILE_GLOB@*'):
                shutil.chown(shm_file, USER, GROUP)
    except subprocess.CalledProcessError as exc:
        logging.error('{} exits with {}.'.format(cmd, exc.returncode))
        sys.exit(1)
    except OSError:
        logging.error(
            'OS error while running {} or changing ownership of shared '
            'memory segments.'.format(cmd),
            exc_info=True
        )
        sys.exit(1)


if __name__ == '__main__':
    main()