mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
* Minor fix in the timeout for the /notfound "endpoint" call. - [fix] timeout for urlopen on the /notfound "endpoint" in mcs-savebrm and mcs-loadbrm * chore(CMAPI): reduced the timeout to one second when asking MDB for the async replication status. The latency must be small because the MDB Server being queried is local. --------- Co-authored-by: drrtuy <drrtuy@gmail.com>
456 lines
15 KiB
Python
Executable File
456 lines
15 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Requires Python 3.6 or higher (the code uses f-strings and subprocess.run).
|
|
"""
|
|
import configparser
|
|
import fcntl
|
|
import glob
|
|
import json
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import socket
|
|
import ssl
|
|
import struct
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import xml.etree.ElementTree as ET
|
|
from pathlib import Path
|
|
from urllib.request import Request, urlopen
|
|
from urllib.error import HTTPError, URLError
|
|
|
|
|
|
# --- Configuration files ---------------------------------------------------
# NOTE: @NAME@ tokens are presumably substituted at build/configure time.
MCS_SYSCONF_DIR = '@ENGINE_SYSCONFDIR@'
MCS_ETC_PATH = os.path.join(MCS_SYSCONF_DIR, 'columnstore')
CMAPI_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'cmapi_server.conf')
MCS_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'Columnstore.xml')
SM_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'storagemanager.cnf')

# --- Temporary locations ---------------------------------------------------
COLUMNSTORE_TMP_DIR = '/tmp/columnstore_tmp_files'
# Prefix for BRM files downloaded from the primary node (bypasses SM).
BYPASS_SM_PATH = os.path.join(COLUMNSTORE_TMP_DIR, 'rdwrscratch', 'BRM_saves')

# --- Data and binaries -----------------------------------------------------
MCS_DATA_PATH = '@ENGINE_DATADIR@'
MCS_MODULE_FILE_PATH = os.path.join(MCS_DATA_PATH, 'local/module')
MCS_BIN_DIR = '@ENGINE_BINDIR@'
SMCAT = os.path.join(MCS_BIN_DIR, 'smcat')
LOADBRM = os.path.join(MCS_BIN_DIR, 'load_brm')
# Path of BRM_saves_current as seen through StorageManager (relative path).
S3_DBROOT1_BRM_PATH = 'data1/systemFiles/dbrm/BRM_saves_current'

# --- Ownership of produced files -------------------------------------------
USER = '@DEFAULT_USER@'
GROUP = '@DEFAULT_GROUP@'

# --- Timeouts / retry limits (seconds) -------------------------------------
MINUTE = 60               # urlopen timeout when downloading BRM metadata
FIVE_SECS = 5             # urlopen timeout for local CMAPI probes
UNREASONABLE_DELAY = 600  # smcat attempts, one per second

# --- CMAPI endpoint --------------------------------------------------------
LOCALHOST = '127.0.0.1'
API_VERSION = '0.4.0'
API_PORT = '8640'  # kept as str: only ever formatted into URLs
|
|
|
|
|
|
def get_api_key():
    """Return the CMAPI API key stored in the CMAPI server config file.

    ``ConfigParser.read`` silently ignores a missing file, and the lookup
    uses a fallback, so this never raises for a missing file, section or
    option.

    :return: value of ``x-api-key`` in the ``Authentication`` section, or
             an empty string when it is not configured
    :rtype: str
    """
    parser = configparser.ConfigParser()
    parser.read(CMAPI_CONFIG_PATH)
    # NOTE(review): the value is returned verbatim; quotes around it in the
    # config file (if any) are not stripped.
    api_key = parser.get('Authentication', 'x-api-key', fallback='')
    return api_key
|
|
|
|
|
|
def get_unverified_context():
    """Build an SSL context that skips certificate and hostname checks.

    :return: client SSL context with ``check_hostname`` disabled and
             ``verify_mode`` set to ``ssl.CERT_NONE``
    :rtype: ssl.SSLContext
    """
    unverified_ctx = ssl.create_default_context()
    # Hostname checking has to be switched off first: ssl refuses to set
    # CERT_NONE while check_hostname is still enabled.
    unverified_ctx.check_hostname = False
    unverified_ctx.verify_mode = ssl.CERT_NONE
    return unverified_ctx
|
|
|
|
|
|
def cmapi_available(host=LOCALHOST):
    """Check if CMAPI server is running.

    Sends a POST to ``/notfound``, a path CMAPI does not serve; a 404
    answer means the CMAPI HTTPS server on ``host`` is up and responding.

    :param host: host address to check
    :type host: str
    :return: is CMAPI running or not
    :rtype: bool
    """
    logging.debug('Detecting CMAPI is up and running on {}.'.format(host))
    url = 'https://{}:{}/notfound'.format(host, API_PORT)
    request = Request(method='POST', url=url)
    ctx = get_unverified_context()
    try:
        with urlopen(request, context=ctx, timeout=FIVE_SECS) as req:
            _ = req.read().decode('utf-8')
    except HTTPError as exc:
        # HTTPError must be caught before its base class URLError.
        if exc.code == 404:
            return True
        # Any other status is unexpected for this probe; report it instead
        # of silently treating CMAPI as unavailable.
        logging.warning(
            'Unexpected response code "{}" while checking CMAPI '
            'availability on {}.'.format(exc.code, host)
        )
    except URLError:
        logging.info('CMAPI seems to be unavailable.')
    except Exception:
        logging.error(
            'Undefined error while trying to check CMAPI availability.',
            exc_info=True
        )
    return False
|
|
|
|
|
|
def get_ip_address_by_nic(ifname):
    """Get the IPv4 address of a network interface.

    Uses the ``SIOCGIFADDR`` ioctl, so it doesn't work on Windows, OpenBSD
    and probably doesn't on FreeBSD/pfSense either.

    :param ifname: network interface name
    :type ifname: str
    :return: dotted-quad IPv4 address, or an empty string when it can't be
             determined (unknown interface, no address, unsupported OS)
    :rtype: str
    """
    ip_addr = ''
    try:
        # The context manager always closes the socket and never refers to
        # an unbound name if socket creation itself fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            ifreq = fcntl.ioctl(
                sock.fileno(),
                0x8915,  # SIOCGIFADDR "socket ioctl get interface address"
                # Interface names are limited to 15 chars (+ NUL terminator).
                struct.pack('256s', bytes(ifname[:15], 'utf-8'))
            )
        # Bytes 20..24 of the returned ifreq hold the IPv4 address.
        ip_addr = socket.inet_ntoa(ifreq[20:24])
    except Exception:
        logging.debug(
            'Exception while getting IP address of an "{}" interface'.format(
                ifname
            ),
            exc_info=True
        )
    return ip_addr
|
|
|
|
|
|
def is_primary_fallback(current_hostname):
    """Detect is node primary or not without cmapi.

    Collects the IPv4 address of every local network interface together
    with the hostname and aliases reverse-resolved for it, and checks
    whether ``current_hostname`` is one of them.

    :param current_hostname: hostname, FQDN or IP address of the primary
                             (DBRM_Controller/IPAddr)
    :type current_hostname: str
    :return: is node primary
    :rtype: bool
    """
    logging.debug(
        'Current DBRM_Controller/IPAddr is {}'.format(current_hostname)
    )
    hostnames = set()
    for _, nic_name in socket.if_nameindex():
        ip_addr = get_ip_address_by_nic(nic_name)
        if not ip_addr:
            continue
        # The config tag is named IPAddr, so match the raw address as well.
        hostnames.add(ip_addr)
        try:
            hostname, aliases, _ = socket.gethostbyaddr(ip_addr)
        except OSError:
            # socket.herror / gaierror: no reverse DNS entry for this
            # address. One unresolvable NIC must not abort the detection.
            logging.debug(
                'Can not resolve hostname for {} ({}).'.format(
                    ip_addr, nic_name
                ),
                exc_info=True
            )
            continue
        hostnames.update([hostname, *aliases])
    logging.debug('Found hostnames {}.'.format(hostnames))
    return current_hostname in hostnames
|
|
|
|
|
|
def is_node_primary(conf_root):
    """Detect is current node primary or not.

    Asks the local CMAPI server (``/cmapi/<API_VERSION>/node/primary``)
    first. If CMAPI is not running locally, or the request fails, it falls
    back to ``is_primary_fallback`` with the DBRM_Controller/IPAddr value.

    :param conf_root: xml config root element
    :type conf_root: xml.etree.ElementTree.ElementTree
    :return: primary or not
    :rtype: bool
    :raises ValueError: if CMAPI answers with a body that is not valid
                        UTF-8 JSON
    """
    if cmapi_available():
        url = 'https://{}:{}/cmapi/{}/node/primary'.format(
            LOCALHOST, API_PORT, API_VERSION
        )
        ctx = get_unverified_context()
        # This endpoint does not actually require an API key; it is sent
        # anyway.
        request = Request(
            method='GET', url=url, headers={'x-api-key': get_api_key()}
        )

        response = None
        try:
            with urlopen(request, context=ctx, timeout=FIVE_SECS) as req:
                response = req.read()
        except HTTPError as exc:
            logging.warning(
                'Something goes wrong while requesting primary status '
                'through api. '
                f'Got response code "{exc.code}" with reason "{exc.reason}".'
            )
        except URLError:
            logging.warning(
                'CMAPI became unavailable while trying '
                'to request primary status.'
            )
        except Exception:
            logging.error(
                'Undefined exception while trying to request primary status.',
                exc_info=True
            )

        if response is not None:
            dict_response = json.loads(response.decode('utf-8'))
            is_primary = dict_response.get('is_primary', False)
            # Accept both the string form ("True"/"true") and a JSON boolean;
            # comparing only against strings would turn a JSON ``true`` into
            # False.
            if isinstance(is_primary, str):
                return is_primary.lower() == 'true'
            return is_primary is True

    logging.info('Trying to detect primary without cmapi running.')
    # the tag text is expected to be a hostname or FQDN
    return is_primary_fallback(conf_root.find('./DBRM_Controller/IPAddr').text)
|
|
|
|
|
|
def get_meta(conf_root, meta_type):
    """Download one BRM metadata file from the primary node's CMAPI.

    The primary's address is read from DBRM_Controller/IPAddr and the file
    is requested from ``/cmapi/<API_VERSION>/node/meta/<meta_type>``.

    :param conf_root: xml config root element
    :type conf_root: xml.etree.ElementTree.ElementTree
    :param meta_type: meta name for endpoint call (e.g. ``em``, ``vss``)
    :type meta_type: str
    :return: raw response body
    :rtype: bytes
    :raises SystemExit: when no API key is configured
    :raises HTTPError, URLError: re-raised after logging on request failure
    """
    logging.info('Pulling {} from the primary node.'.format(meta_type))
    primary = conf_root.find('./DBRM_Controller/IPAddr').text
    endpoint = 'https://{}:{}/cmapi/{}/node/meta/{}'.format(
        primary, API_PORT, API_VERSION, meta_type
    )

    # This endpoint requires authentication; bail out early without a key.
    key = get_api_key()
    if not key:
        logging.error(
            'Failed to find API key in {}.'.format(CMAPI_CONFIG_PATH)
        )
        sys.exit(1)

    ssl_ctx = get_unverified_context()
    meta_request = Request(
        method='GET', url=endpoint, headers={'x-api-key': key}
    )
    try:
        with urlopen(meta_request, context=ssl_ctx, timeout=MINUTE) as resp:
            payload = resp.read()
    except HTTPError:
        logging.error(
            'Error requesting {} from the primary node.'.format(meta_type)
        )
        raise
    except URLError:
        logging.warning(
            'CMAPI on primary became unavailable while trying '
            'to request {} from it.'.format(meta_type)
        )
        raise
    except Exception:
        logging.error(
            'Undefined exception while trying to request {}.'.format(
                meta_type
            ),
            exc_info=True
        )
        raise
    # content is returned as raw bytes
    return payload
|
|
|
|
def read_from_sm_with_retry(brm_path):
    """Read a file through StorageManager's ``smcat``, retrying on failure.

    ``smcat`` is run up to UNREASONABLE_DELAY times, sleeping one second
    after every failed attempt.

    :param brm_path: path of the file as seen through StorageManager
    :type brm_path: str
    :return: smcat's stdout on success, ``b''`` when every attempt failed
             (bytes in both cases, so callers can always ``.decode()``)
    :rtype: bytes
    """
    func_name = 'read_from_sm_with_retry'
    args = [SMCAT, brm_path]
    # We need to wait until SM at primary gets an ownership for dbroot1.
    for _ in range(UNREASONABLE_DELAY):
        ret = subprocess.run(args, stdout=subprocess.PIPE)
        if ret.returncode == 0:
            return ret.stdout
        logging.error(
            '{} got error code {} from smcat, retrying'.format(
                func_name, ret.returncode
            )
        )
        time.sleep(1)

    logging.error('Can not read {} using {}.'.format(brm_path, SMCAT))
    return b''
|
|
|
|
|
|
def read_from_disk(brm_path):
    """Read a BRM file from the local filesystem.

    :param brm_path: path to the file
    :type brm_path: str
    :return: file content, or None when the file can't be read
    :rtype: bytes or None
    """
    try:
        # Read directly instead of spawning ``cat`` in a subprocess.
        return Path(brm_path).read_bytes()
    except FileNotFoundError:
        # will happen when brm file does not exist
        logging.error('{} does not exist.'.format(brm_path))
    except OSError:
        # e.g. permission denied or the path is a directory
        logging.error('Can not read {}.'.format(brm_path), exc_info=True)
    return None
|
|
|
|
|
|
if __name__ == '__main__':
    logging.basicConfig(
        format='%(levelname)s: %(message)s', level=logging.DEBUG
    )
    # To avoid systemd in container environment: passing "no" as the first
    # argument skips systemctl/su usage and the chown steps tied to it.
    use_systemd = True
    if len(sys.argv) > 1:
        use_systemd = sys.argv[1] != 'no'

    # S3 counts as enabled only when SM is configured for S3 AND the bucket
    # is not the placeholder name.
    sm_config = configparser.ConfigParser()
    sm_config.read(SM_CONFIG_PATH)
    storage = sm_config.get(
        'ObjectStorage', 'service', fallback='LocalStorage'
    )
    bucket = sm_config.get('S3', 'bucket', fallback='some_bucket')
    s3_enabled = storage.lower() == 's3' and bucket.lower() != 'some_bucket'

    cs_config = ET.parse(MCS_CONFIG_PATH)
    config_root = cs_config.getroot()
    dbrmroot = config_root.find('./SystemConfig/DBRMRoot').text
    pm_count = int(config_root.find('./SystemModuleConfig/ModuleCount3').text)
    is_multinode = pm_count > 1
    brm_saves_current = ''

    if s3_enabled:
        # start SM using systemd
        if use_systemd:
            CMD = 'systemctl start mcs-storagemanager'
            try:
                subprocess.check_call(CMD, shell=True)
            except subprocess.CalledProcessError as exc:
                # Must reference CMD: no lowercase ``cmd`` exists here, and
                # using it raised NameError instead of logging the failure.
                logging.error(
                    'Failed to start storagemanager. '
                    f'Command "{CMD}" exits with {exc.returncode}.'
                )
                sys.exit(1)
            time.sleep(1)  # allow SM time to init

        # Adding S3 related configuration into Columnstore.xml
        config_root.find(
            './Installation/DBRootStorageType'
        ).text = 'StorageManager'
        config_root.find('./StorageManager/Enabled').text = 'Y'

        if config_root.find('./SystemConfig/DataFilePlugin') is None:
            config_root.find('./SystemConfig').append(
                ET.Element('DataFilePlugin')
            )

        config_root.find('./SystemConfig/DataFilePlugin').text = 'libcloudio.so'

        # Write a temp copy, then swap it in so Columnstore.xml is never
        # seen half-written.
        temp_mcs_conf_path = '{}.loadbrm'.format(MCS_CONFIG_PATH)
        cs_config.write(temp_mcs_conf_path)
        os.replace(temp_mcs_conf_path, MCS_CONFIG_PATH)  # atomic replacement

    # Single-node on S3
    if s3_enabled and not is_multinode:
        if use_systemd:
            args = [
                'su', '-s',
                '/bin/sh', '-c',
                '{} {}'.format(SMCAT, S3_DBROOT1_BRM_PATH), USER
            ]
        else:
            args = [SMCAT, S3_DBROOT1_BRM_PATH]
        try:
            brm_saves_current = subprocess.check_output(args)
        except subprocess.CalledProcessError:
            # will happen when brm file does not exist
            logging.error(
                '{} does not exist.'.format(S3_DBROOT1_BRM_PATH)
            )
    else:
        # Computed from the configured DBRMRoot before it may be redirected
        # to BYPASS_SM_PATH below.
        brm = '{}_current'.format(dbrmroot)
        if is_multinode:
            is_primary = False
            try:
                is_primary = is_node_primary(config_root)
                primary_address = config_root.find(
                    './DBRM_Controller/IPAddr'
                ).text
                # Download BRM files from the primary node via CMAPI.
                if not is_primary:
                    if cmapi_available(primary_address):
                        # Store BRM files locally to load them up
                        dbrmroot = BYPASS_SM_PATH
                        os.makedirs(dbrmroot, exist_ok=True)
                        if use_systemd:
                            subprocess.run(
                                [
                                    'chown', '-R',
                                    '{}:{}'.format(USER, GROUP),
                                    COLUMNSTORE_TMP_DIR
                                ],
                                check=False
                            )
                            shutil.chown(dbrmroot, USER, GROUP)

                        for meta_type in ('em', 'journal', 'vbbm', 'vss'):
                            meta_data = get_meta(config_root, meta_type)
                            current_name = '{}_{}'.format(dbrmroot, meta_type)
                            logging.info(
                                'Saving {} to {}'.format(
                                    meta_type, current_name
                                )
                            )
                            Path(current_name).write_bytes(meta_data)
                            shutil.chown(current_name, USER, GROUP)
                    else:
                        logging.info(
                            'Cmapi is not running on primary node. Skip loading metafiles.'
                        )
            except Exception:
                logging.error(
                    'Failed to detect primary or load BRM data from '
                    'the primary node.',
                    exc_info=True
                )
                sys.exit(1)
            # Primary reads BRM files directly from S3 via SM or from disk.
            if is_primary:
                if s3_enabled:
                    # S3 path are relative to the current top level dir,
                    # e.g. /var/lib/columnstore/data1 becomes /data1
                    brm_saves_current = read_from_sm_with_retry(
                        S3_DBROOT1_BRM_PATH
                    )
                else:
                    brm_saves_current = read_from_disk(brm)
            else:
                brm_saves_current = b'BRM_saves\n'
        else:
            brm_saves_current = read_from_disk(brm)

    if brm_saves_current:
        # Only the part of BRM_saves_current after "BRM_saves" is appended
        # to dbrmroot to build the load_brm argument.
        brm_suffix = brm_saves_current.decode('utf-8').replace('BRM_saves', '')
        if use_systemd:
            cmd = 'su -s /bin/sh -c "{} {}{}" {}'.format(
                LOADBRM, dbrmroot, brm_suffix, USER
            )
        else:
            cmd = '{} {}{}'.format(LOADBRM, dbrmroot, brm_suffix)
        try:
            subprocess.check_call(cmd, shell=True)
            # systemd services by default works using mysql privileges.
            # There were cases when shmem segments belongs to root so
            # explicitly chowns segments.
            if use_systemd:
                for shm_file in glob.glob('/dev/shm/@SHMEM_FILE_GLOB@*'):
                    shutil.chown(shm_file, USER, GROUP)
        except subprocess.CalledProcessError as exc:
            logging.error('{} exits with {}.'.format(cmd, exc.returncode))
            sys.exit(1)
        except OSError:
            logging.error(
                'Failed to run "{}" or to chown shared memory '
                'segments.'.format(cmd),
                exc_info=True
            )
            sys.exit(1)
    else:
        if s3_enabled:
            logging.info(
                'brm_saves_current returned empty string from '
                'read_from_sm_with_retry'
            )
        else:
            logging.info(
                'brm_saves_current returned empty string from read_from_disk'
            )
        sys.exit(1)
|