1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
Alan Mologorsky 932546f47c
fix(CMAPI): reduce timeout value for couple CMAPI endpoints (#3153)
* Minor fix in timeout for /notfound "endpoint" call.

- [fix] timeout to urlopen /notfound "endpoint" in mcs-savebrm and mcs-loadbrm

* chore(CMAPI): reduced a second timeout, the one used when asking MDB for an async replication status. The latency must be small because the MDB Server being requested is local.

---------

Co-authored-by: drrtuy <drrtuy@gmail.com>
2024-11-12 10:56:16 +00:00

456 lines
15 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Works with python 3.6 and higher.
"""
import configparser
import fcntl
import glob
import json
import logging
import os
import shutil
import socket
import ssl
import struct
import subprocess
import sys
import time
import xml.etree.ElementTree as ET
from pathlib import Path
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError
# NOTE(review): values of the form '@NAME@' look like build-time template
# placeholders that are substituted before the script is installed - confirm.

# Configuration locations.
MCS_SYSCONF_DIR = '@ENGINE_SYSCONFDIR@'
MCS_ETC_PATH = os.path.join(MCS_SYSCONF_DIR, 'columnstore')
# INI-style file the CMAPI API key is read from.
CMAPI_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'cmapi_server.conf')
# XML config parsed (and, with S3 enabled, rewritten) by the main script.
MCS_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'Columnstore.xml')
# INI-style file; its [ObjectStorage] and [S3] sections decide if S3 is used.
SM_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'storagemanager.cnf')
# Local path prefix used for BRM meta files downloaded from the primary node.
BYPASS_SM_PATH = '/tmp/columnstore_tmp_files/rdwrscratch/BRM_saves'
COLUMNSTORE_TMP_DIR = '/tmp/columnstore_tmp_files'
MCS_DATA_PATH = '@ENGINE_DATADIR@'
MCS_MODULE_FILE_PATH = os.path.join(MCS_DATA_PATH, 'local/module')
# Helper binaries invoked through subprocess.
MCS_BIN_DIR = '@ENGINE_BINDIR@'
SMCAT = os.path.join(MCS_BIN_DIR, 'smcat')
LOADBRM = os.path.join(MCS_BIN_DIR, 'load_brm')
# Path handed to smcat to read the "current BRM save" pointer.
S3_DBROOT1_BRM_PATH = 'data1/systemFiles/dbrm/BRM_saves_current'
# Account used for `su` calls and as owner of created files.
USER = '@DEFAULT_USER@'
GROUP = '@DEFAULT_GROUP@'
# urlopen() timeouts, in seconds.
MINUTE = 60
FIVE_SECS = 5
# Upper bound on smcat attempts in read_from_sm_with_retry (1 s sleep each).
UNREASONABLE_DELAY = 600
# CMAPI endpoint coordinates used to build request URLs.
LOCALHOST = '127.0.0.1'
API_VERSION = '0.4.0'
API_PORT = '8640'
def get_api_key():
    """Read the CMAPI API key from the CMAPI config file.

    :return: value of ``x-api-key`` from the ``Authentication`` section, or
        an empty string when it cannot be found
    :rtype: str
    """
    parser = configparser.ConfigParser()
    parser.read(CMAPI_CONFIG_PATH)
    # NOTE(review): the value is returned exactly as stored; should
    # surrounding quotes be stripped?
    api_key = parser.get('Authentication', 'x-api-key', fallback='')
    return api_key
def get_unverified_context():
    """Build an SSL context that performs no certificate verification.

    :return: default SSL context with hostname checking and certificate
        validation switched off
    :rtype: ssl.SSLContext
    """
    unverified = ssl.create_default_context()
    # check_hostname has to be disabled before verify_mode may be relaxed.
    unverified.check_hostname = False
    unverified.verify_mode = ssl.CERT_NONE
    return unverified
def cmapi_available(host=LOCALHOST):
    """Probe whether a CMAPI server answers on the given host.

    A POST is sent to the ``/notfound`` path; only an HTTP 404 answer is
    treated as proof that CMAPI is up.

    :param host: host address to check
    :type host: str
    :return: True when the probe was answered with 404, False otherwise
    :rtype: bool
    """
    logging.debug('Detecting CMAPI is up and running on {}.'.format(host))
    probe = Request(
        url='https://{}:{}/notfound'.format(host, API_PORT), method='POST'
    )
    ssl_ctx = get_unverified_context()
    is_up = False
    try:
        with urlopen(probe, context=ssl_ctx, timeout=FIVE_SECS) as reply:
            # Body content is irrelevant; it is only read and decoded.
            reply.read().decode('utf-8')
    except HTTPError as http_err:
        # Any other HTTP status is not accepted as "available".
        is_up = http_err.code == 404
    except URLError:
        logging.info('CMAPI seems to be unavailable.')
    except Exception:
        logging.error(
            'Undefined error while trying to check CMAPI availabale.',
            exc_info=True
        )
    return is_up
def get_ip_address_by_nic(ifname):
    """Get the IPv4 address assigned to a network interface.

    Uses the SIOCGIFADDR ioctl, so it doesn't work on Windows, OpenBSD and
    probably doesn't on FreeBSD/pfSense either.

    :param ifname: network interface name (only the first 15 characters
        are passed to the ioctl)
    :type ifname: str
    :return: ip address, or an empty string if it could not be obtained
    :rtype: str
    """
    ip_addr = ''
    try:
        # The socket is created inside the ``try`` and closed by the context
        # manager, so a failure to create it is logged like any other error
        # instead of hitting cleanup code that refers to an unbound name.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            ifreq = fcntl.ioctl(
                sock.fileno(),
                0x8915,  # SIOCGIFADDR "socket ioctl get interface address"
                struct.pack('256s', bytes(ifname[:15], 'utf-8'))
            )
        # Bytes 20..23 of the returned buffer carry the packed address.
        ip_addr = socket.inet_ntoa(ifreq[20:24])
    except Exception:
        logging.debug(
            'Exception while getting IP address of an "{}" interface'.format(
                ifname
            ),
            exc_info=True
        )
    return ip_addr
def is_primary_fallback(current_hostname):
    """Decide whether this node is the primary one without using CMAPI.

    Collects the host names that the addresses of all local network
    interfaces resolve to and checks the given name against them.

    :param current_hostname: hostname or FQDN
    :type current_hostname: str
    :return: True if the name belongs to this node
    :rtype: bool
    """
    logging.debug(
        'Current DBRM_Controller/IPAddr is {}'.format(current_hostname)
    )
    hostnames = set()
    for _, nic_name in socket.if_nameindex():
        nic_address = get_ip_address_by_nic(nic_name)
        if not nic_address:
            # Interface without a usable address.
            continue
        # NOTE(review): a failed reverse lookup is not handled here and
        # would propagate to the caller - confirm this is intended.
        canonical_name, aliases, _addresses = socket.gethostbyaddr(
            nic_address
        )
        hostnames.add(canonical_name)
        hostnames.update(aliases)
    logging.debug('Found hostnames {}.'.format(hostnames))
    return current_hostname in hostnames
def is_node_primary(conf_root):
    """Detect whether the current node is the primary one.

    The local CMAPI server is asked first. If CMAPI is not running, or the
    request fails, the decision falls back to comparing the
    ``DBRM_Controller/IPAddr`` config value with the local host names.

    :param conf_root: xml config root element
    :type conf_root: xml.etree.ElementTree.Element
    :return: primary or not
    :rtype: bool
    """
    if cmapi_available():
        url = 'https://{}:{}/cmapi/{}/node/primary'.format(
            LOCALHOST, API_PORT, API_VERSION
        )
        ctx = get_unverified_context()
        # Actually for this endpoint no need to provide api key cause it's
        # not required
        request = Request(
            method='GET', url=url, headers={'x-api-key': get_api_key()}
        )
        # Stays None unless the request succeeded.
        response = None
        try:
            with urlopen(request, context=ctx, timeout=FIVE_SECS) as req:
                response = req.read()
        except HTTPError as exc:
            logging.warning(
                'Something goes wrong while requesting primary status '
                'through api. '
                f'Got response code "{exc.code}" with reason "{exc.reason}".'
            )
        except URLError:
            logging.warning(
                'CMAPI became unavailable while trying '
                'to request primary status.'
            )
        except Exception:
            logging.error(
                'Undefined exception while trying to request primary status.',
                exc_info=True
            )
        if response is not None:
            dict_response = json.loads(response.decode('utf-8'))
            is_primary = dict_response.get('is_primary', False)
            # Accept a real JSON boolean as well as its string forms;
            # a plain membership test would reject boolean True.
            return is_primary is True or is_primary in ('True', 'true')
    logging.info('Trying to detect primary without cmapi running.')
    # text in tag have to be hostname or FQDN
    return is_primary_fallback(conf_root.find('./DBRM_Controller/IPAddr').text)
def get_meta(conf_root, meta_type):
    """Download one piece of BRM meta data from the primary node via CMAPI.

    Terminates the process with exit code 1 when no API key is configured.
    Request errors are logged and re-raised to the caller.

    :param conf_root: xml config root element
    :type conf_root: xml.etree.ElementTree.ElementTree
    :param meta_type: meta name for endpoint call
    :type meta_type: str
    :return: meta data
    :rtype: bytes
    """
    logging.info('Pulling {} from the primary node.'.format(meta_type))
    primary_host = conf_root.find('./DBRM_Controller/IPAddr').text
    endpoint = 'https://{}:{}/cmapi/{}/node/meta/{}'.format(
        primary_host, API_PORT, API_VERSION, meta_type
    )
    key = get_api_key()
    if not key:
        logging.error(
            'Failed to find API key in {}.'.format(CMAPI_CONFIG_PATH)
        )
        sys.exit(1)
    ssl_ctx = get_unverified_context()
    meta_request = Request(
        url=endpoint, headers={'x-api-key': key}, method='GET'
    )
    try:
        with urlopen(meta_request, context=ssl_ctx, timeout=MINUTE) as reply:
            payload = reply.read()
    except HTTPError:
        logging.error(
            'Error requesting {} from the primary node.'.format(meta_type)
        )
        raise
    except URLError:
        logging.warning(
            'CMAPI on primary became unavailable while trying '
            'to request {} from it.'.format(meta_type)
        )
        raise
    except Exception:
        logging.error(
            'Undefined exception while trying to request {}.'.format(
                meta_type
            ),
            exc_info=True
        )
        raise
    # Raw response body, left undecoded.
    return payload
def read_from_sm_with_retry(brm_path):
    """Read a file through smcat, retrying until it succeeds.

    smcat is run up to UNREASONABLE_DELAY times with a one second pause
    after every failed attempt.

    :param brm_path: path passed to smcat
    :type brm_path: str
    :return: smcat standard output on success, empty string if every
        attempt failed
    :rtype: bytes or str
    """
    func_name = 'read_from_sm_with_retry'
    smcat_cmd = [SMCAT, brm_path]
    # We need to wait until SM at primary gets an ownership for dbroot1.
    for _attempt in range(UNREASONABLE_DELAY):
        completed = subprocess.run(smcat_cmd, stdout=subprocess.PIPE)
        if completed.returncode == 0:
            return completed.stdout
        logging.error(
            '{} got error code {} from smcat, retrying'.format(
                func_name, completed.returncode
            )
        )
        time.sleep(1)
    logging.error('Can not read {} using {}.'.format(brm_path, SMCAT))
    return ''
def read_from_disk(brm_path):
    """Read the whole content of a local file.

    The file is read directly instead of spawning ``cat``, which avoids a
    process launch and a dependency on the external binary.

    :param brm_path: path to the file to read
    :type brm_path: str
    :return: file content, or None when the file can not be read
    :rtype: bytes or None
    """
    try:
        with open(brm_path, 'rb') as brm_file:
            return brm_file.read()
    except OSError:
        # will happen when brm file does not exist
        logging.error('{} does not exist.'.format(brm_path))
        return None
if __name__ == '__main__':
    # Script entry point: find the current BRM save (from disk, through
    # StorageManager, or by downloading meta files from the primary node)
    # and hand it to load_brm.
    logging.basicConfig(
        format='%(levelname)s: %(message)s', level=logging.DEBUG
    )
    # To avoid systemd in container environment: passing 'no' as the first
    # argument turns off the systemctl/su/chown steps guarded below.
    use_systemd = True
    if len(sys.argv) > 1:
        use_systemd = sys.argv[1] != 'no'

    sm_config = configparser.ConfigParser()
    sm_config.read(SM_CONFIG_PATH)
    storage = sm_config.get(
        'ObjectStorage', 'service', fallback='LocalStorage'
    )
    bucket = sm_config.get('S3', 'bucket', fallback='some_bucket')
    # The fallback bucket name counts as "S3 not configured".
    s3_enabled = storage.lower() == 's3' and bucket.lower() != 'some_bucket'

    cs_config = ET.parse(MCS_CONFIG_PATH)
    config_root = cs_config.getroot()
    dbrmroot = config_root.find('./SystemConfig/DBRMRoot').text
    pmCount = int(config_root.find('./SystemModuleConfig/ModuleCount3').text)
    is_multinode = pmCount > 1
    brm_saves_current = ''

    if s3_enabled:
        # start SM using systemd
        if use_systemd:
            CMD = 'systemctl start mcs-storagemanager'
            try:
                subprocess.check_call(CMD, shell=True)
            except subprocess.CalledProcessError as exc:
                # The message has to reference CMD; an undefined name here
                # would raise NameError instead of reporting the failure.
                logging.error(
                    'Failed to start storagemanager. '
                    f'Command "{CMD}" exits with {exc.returncode}.'
                )
                sys.exit(1)
            time.sleep(1)  # allow SM time to init

        # Adding S3 related configuration into Columnstore.xml
        config_root.find(
            './Installation/DBRootStorageType'
        ).text = 'StorageManager'
        config_root.find('./StorageManager/Enabled').text = 'Y'
        if config_root.find('./SystemConfig/DataFilePlugin') is None:
            config_root.find('./SystemConfig').append(
                ET.Element('DataFilePlugin')
            )
        config_root.find('./SystemConfig/DataFilePlugin').text = 'libcloudio.so'

        # Write to a temporary file first, then swap it in.
        temp_mcs_conf_path = '{}.loadbrm'.format(MCS_CONFIG_PATH)
        cs_config.write(temp_mcs_conf_path)
        os.replace(temp_mcs_conf_path, MCS_CONFIG_PATH)  # atomic replacement

    # Single-node on S3
    if s3_enabled and not is_multinode:
        try:
            if use_systemd:
                args = [
                    'su', '-s',
                    '/bin/sh', '-c',
                    '{} {}'.format(SMCAT, S3_DBROOT1_BRM_PATH), USER
                ]
            else:
                args = [SMCAT, S3_DBROOT1_BRM_PATH]
            brm_saves_current = subprocess.check_output(args)
        except subprocess.CalledProcessError:
            # will happen when brm file does not exist
            logging.error(
                '{} does not exist.'.format(S3_DBROOT1_BRM_PATH)
            )
    else:
        brm = '{}_current'.format(dbrmroot)
        if is_multinode:
            is_primary = False
            try:
                is_primary = is_node_primary(config_root)
                primary_address = config_root.find(
                    './DBRM_Controller/IPAddr'
                ).text
                # Download BRM files from the primary node via CMAPI.
                if not is_primary:
                    if cmapi_available(primary_address):
                        meta_elems = ['em', 'journal', 'vbbm', 'vss']
                        for meta_type in meta_elems:
                            meta_data = get_meta(config_root, meta_type)
                            # Store BRM files locally to load them up
                            dbrmroot = BYPASS_SM_PATH
                            if not os.path.exists(dbrmroot):
                                os.makedirs(dbrmroot)
                                if use_systemd:
                                    os.system('chown -R {}:{} {}'.format(USER, GROUP, COLUMNSTORE_TMP_DIR))
                                    shutil.chown(dbrmroot, USER, GROUP)
                            current_name = '{}_{}'.format(dbrmroot, meta_type)
                            logging.info(
                                'Saving {} to {}'.format(
                                    meta_type, current_name
                                )
                            )
                            path = Path(current_name)
                            path.write_bytes(meta_data)
                            shutil.chown(current_name, USER, GROUP)
                    else:
                        logging.info(
                            'Cmapi is not running on primary node. Skip loading metafiles.'
                        )
            except Exception:
                # Trailing space added: the two literals are concatenated.
                logging.error(
                    'Failed to detect primary or load BRM data from '
                    'the primary node.',
                    exc_info=True
                )
                sys.exit(1)

            # Primary reads BRM files directly from S3 via SM or from disk.
            if is_primary:
                if s3_enabled:
                    # S3 path are relative to the current top level dir,
                    # e.g. /var/lib/columnstore/data1 becomes /data1
                    brm_saves_current = read_from_sm_with_retry(
                        S3_DBROOT1_BRM_PATH
                    )
                else:
                    brm_saves_current = read_from_disk(brm)
            else:
                # Non-primary node: with the 'BRM_saves' part stripped below,
                # load_brm is pointed at the dbrmroot prefix itself.
                brm_saves_current = b'BRM_saves\n'
        else:
            brm_saves_current = read_from_disk(brm)

    if brm_saves_current:
        # What remains after dropping 'BRM_saves' is appended to dbrmroot.
        brm_suffix = brm_saves_current.decode('utf-8').replace('BRM_saves', '')
        if use_systemd:
            cmd = 'su -s /bin/sh -c "{} {}{}" {}'.format(
                LOADBRM, dbrmroot, brm_suffix, USER
            )
        else:
            cmd = '{} {}{}'.format(LOADBRM, dbrmroot, brm_suffix)
        try:
            subprocess.check_call(cmd, shell=True)
            # systemd services by default works using mysql privileges.
            # There were cases when shmem segments belongs to root so
            # explicitly chowns segments.
            if use_systemd:
                for shm_file in glob.glob('/dev/shm/@SHMEM_FILE_GLOB@*'):
                    shutil.chown(shm_file, USER, GROUP)
        except subprocess.CalledProcessError as exc:
            logging.error('{} exits with {}.'.format(cmd, exc.returncode))
            sys.exit(1)
        except OSError:
            # Log the reason before exiting instead of failing silently.
            logging.error(
                'OS error while running "{}" or changing ownership of '
                'shared memory segments.'.format(cmd),
                exc_info=True
            )
            sys.exit(1)
    else:
        if s3_enabled:
            logging.info(
                'brm_saves_currenty returned empty string from '
                'read_from_sm_with_retry'
            )
        else:
            logging.info(
                'brm_saves_currenty returned empty string from read_from_disk'
            )
        # NOTE(review): placed in the "no BRM data" branch; the original
        # nesting of this exit could not be determined - confirm.
        sys.exit(1)