You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
390 lines
12 KiB
Python
Executable File
390 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import configparser
|
|
import fcntl
|
|
import json
|
|
import glob
|
|
import logging
|
|
import os
|
|
import socket
|
|
import ssl
|
|
import struct
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import xml.etree.ElementTree as ET
|
|
from urllib.request import Request, urlopen
|
|
from urllib.error import HTTPError, URLError
|
|
|
|
|
|
MCS_SYSCONF_DIR = '@ENGINE_SYSCONFDIR@'
|
|
MCS_ETC_PATH = os.path.join(MCS_SYSCONF_DIR, 'columnstore')
|
|
CMAPI_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'cmapi_server.conf')
|
|
MCS_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'Columnstore.xml')
|
|
SM_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'storagemanager.cnf')
|
|
MCS_BIN_DIR = '@ENGINE_BINDIR@'
|
|
SAVEBRM = os.path.join(MCS_BIN_DIR, 'save_brm')
|
|
EM_FILE_SUFFIX = '_em'
|
|
EM_FILE_SIZE_THRESHOLD = 1000
|
|
HALF_A_MINUTE = 30
|
|
NUMBER_OF_FILES_TO_KEEP = 40
|
|
DEFAULT_EM_LOCAL_PATH_PREFIX = ''
|
|
LOCALHOST = '127.0.0.1'
|
|
# according to https://www.ibm.com/docs/en/storage-sentinel/1.1.2?topic=installation-map-your-local-host-loopback-address
|
|
LOCALHOSTS = (
|
|
'127.0.0.1',
|
|
'localhost', 'localhost.localdomain',
|
|
'localhost4', 'localhost4.localdomain4',
|
|
'::1',
|
|
'localhost6', 'localhost6.localdomain6',
|
|
)
|
|
API_VERSION = '0.4.0'
|
|
API_PORT = '8640'
|
|
BRM_BACKUP_PATH = '/tmp/columnstore_tmp_files/rdwrscratch/'
|
|
BRM_BACKUP_PATH_PART = '{}_BRM_saves'
|
|
|
|
|
|
def get_api_key():
|
|
"""Get API key from cmapi config file.
|
|
|
|
:return: return api key
|
|
:rtype: str
|
|
"""
|
|
cmapi_config = configparser.ConfigParser()
|
|
cmapi_config.read(CMAPI_CONFIG_PATH)
|
|
# dequote?
|
|
return cmapi_config.get('Authentication', 'x-api-key', fallback='')
|
|
|
|
|
|
def get_unverified_context():
|
|
ctx = ssl.create_default_context()
|
|
ctx.check_hostname = False
|
|
ctx.verify_mode = ssl.CERT_NONE
|
|
return ctx
|
|
|
|
|
|
def cmapi_available():
|
|
"""Check if CMAPI server is running.
|
|
|
|
:return: is CMAPI running or not
|
|
:rtype: bool
|
|
"""
|
|
logging.error('Detecting CMAPI is up and running.')
|
|
url = 'https://{}:{}/notfound'.format(LOCALHOST, API_PORT)
|
|
request = Request(method='POST', url=url)
|
|
ctx = get_unverified_context()
|
|
try:
|
|
with urlopen(request, context=ctx, timeout=HALF_A_MINUTE) as req:
|
|
_ = req.read().decode('utf-8')
|
|
except HTTPError as exc:
|
|
if exc.code == 404:
|
|
return True
|
|
except URLError:
|
|
logging.info('CMAPI seems to be unavailable.')
|
|
except Exception:
|
|
logging.error(
|
|
'Undefined error while trying to check CMAPI availabale.',
|
|
exc_info=True
|
|
)
|
|
return False
|
|
|
|
|
|
def get_ip_address_by_nic(ifname):
|
|
"""Get ip address for nic.
|
|
|
|
:param ifname: network intarface name
|
|
:type ifname: str
|
|
:return: ip address
|
|
:rtype: str
|
|
"""
|
|
# doesn't work on windows,
|
|
# OpenBSD and probably doesn't on FreeBSD/pfSense either
|
|
ip_addr = ''
|
|
try:
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
ip_addr = socket.inet_ntoa(
|
|
fcntl.ioctl(
|
|
s.fileno(),
|
|
0x8915, # SIOCGIFADDR "socket ioctl get interface address"
|
|
struct.pack('256s', bytes(ifname[:15], 'utf-8'))
|
|
)[20:24]
|
|
)
|
|
except Exception as exc:
|
|
logging.error(
|
|
'Exception while getting IP address of an "{}" interface'.format(
|
|
ifname
|
|
),
|
|
exc_info=True
|
|
)
|
|
finally:
|
|
s.close()
|
|
return ip_addr
|
|
|
|
|
|
def is_primary_fallback(current_hostname):
|
|
"""Detect is node primary or not without cmapi.
|
|
|
|
:param current_hostname: hostname or FQDN
|
|
:type current_hostname: str
|
|
:return: is node primary
|
|
:rtype: bool
|
|
"""
|
|
logging.error(
|
|
'Current DBRM_Controller/IPAddr is {}'.format(current_hostname)
|
|
)
|
|
hostnames = set()
|
|
for _, nic_name in socket.if_nameindex():
|
|
ip_addr = get_ip_address_by_nic(nic_name)
|
|
try:
|
|
hostnames_3tuple = socket.gethostbyaddr(ip_addr)
|
|
hostnames.update([hostnames_3tuple[0], *hostnames_3tuple[1]])
|
|
except:
|
|
pass
|
|
logging.error('Found hostnames {}.'.format(hostnames))
|
|
return current_hostname in LOCALHOSTS or current_hostname in hostnames
|
|
|
|
|
|
def is_node_primary(conf_root):
|
|
"""Detect is current node primary or not.
|
|
|
|
:param conf_root: xml config root element
|
|
:type conf_root: xml.etree.ElementTree.ElementTree
|
|
:return: primary or not
|
|
:rtype: bool
|
|
"""
|
|
if cmapi_available():
|
|
url = 'https://{}:{}/cmapi/{}/node/primary'.format(
|
|
LOCALHOST, API_PORT, API_VERSION
|
|
)
|
|
ctx = get_unverified_context()
|
|
# Actually for this endpoint no need to provide api key cause it's
|
|
# not required
|
|
request = Request(
|
|
method='GET', url=url, headers={'x-api-key': get_api_key()}
|
|
)
|
|
|
|
success = False
|
|
try:
|
|
with urlopen(request, context=ctx, timeout=HALF_A_MINUTE) as req:
|
|
response = req.read()
|
|
success = True
|
|
except HTTPError as exc:
|
|
logging.warning(
|
|
'Something goes wrong while requesting primary status through api. Got response code "{}" with reason "{}".'.format(
|
|
exc.code, exc.reason
|
|
)
|
|
)
|
|
except URLError:
|
|
logging.warning(
|
|
'CMAPI became unavailable while trying to request primary status.'
|
|
)
|
|
except Exception:
|
|
logging.error(
|
|
'Undefined exception while trying to request primary status.',
|
|
exc_info=True
|
|
)
|
|
|
|
if success:
|
|
dict_response = json.loads(response.decode('utf-8'))
|
|
is_primary = dict_response.get('is_primary', False)
|
|
if is_primary and is_primary in ('True', 'true'):
|
|
is_primary = True
|
|
else:
|
|
is_primary = False
|
|
return is_primary
|
|
|
|
logging.info('Trying to detect primary without cmapi running.')
|
|
# text in tag have to be hostname or FQDN
|
|
return is_primary_fallback(conf_root.find('./DBRM_Controller/IPAddr').text)
|
|
|
|
|
|
def get_file_size(file_path):
|
|
""" Returns the size of the file in bytes. """
|
|
try:
|
|
size = os.path.getsize(file_path)
|
|
return size
|
|
except OSError as e:
|
|
logging.error('OSError in get_file_size(): {}.'.format(e))
|
|
return None
|
|
|
|
|
|
def em_is_empty(file_path_prefix):
|
|
"""Returns True if EM file size is less than EM_FILE_SIZE_THRESHOLD
|
|
or its argument is None.
|
|
|
|
:rtype: Bool
|
|
"""
|
|
# Add error message if EM is empty
|
|
if file_path_prefix is None:
|
|
is_em_empty = True
|
|
else:
|
|
filesize = get_file_size(file_path_prefix + EM_FILE_SUFFIX)
|
|
is_em_empty = filesize < EM_FILE_SIZE_THRESHOLD
|
|
if is_em_empty:
|
|
logging.error('EM file is none or its size {} is less than {} bytes.'.format(filesize, EM_FILE_SIZE_THRESHOLD))
|
|
return is_em_empty
|
|
|
|
|
|
def clean_up_backup_brm_files(save_brm_dir_path):
|
|
""" Removes all but 5 last usable sets of BRM files in the specified directory.
|
|
Usable in the context means having non-empty EM.
|
|
"""
|
|
filenames = os.listdir(save_brm_dir_path)
|
|
filenames.sort(reverse=True)
|
|
files_to_remove = filenames[NUMBER_OF_FILES_TO_KEEP:]
|
|
for filename in files_to_remove:
|
|
file_path = os.path.join(save_brm_dir_path, filename)
|
|
logging.error('Clean up {}.'.format(file_path))
|
|
try:
|
|
os.remove(file_path)
|
|
except OSError as e:
|
|
logging.error('OSError exception happens removing {}: {}.'.format(file_path, e))
|
|
|
|
|
|
def remove_files_by_prefix_if_exist(file_path_prefix):
|
|
""" Removes files with the given prefix if they exist. """
|
|
if file_path_prefix is None:
|
|
logging.error(
|
|
'file_path_prefix is None. Cannot remove files.',
|
|
exc_info=True
|
|
)
|
|
return
|
|
try:
|
|
|
|
files_paths = glob.glob(file_path_prefix + '*')
|
|
for file_path in files_paths:
|
|
os.remove(file_path)
|
|
except OSError as e:
|
|
logging.error(
|
|
'Error removing file: {} - {}'.format(file_path, e.strerror),
|
|
exc_info=True
|
|
)
|
|
|
|
|
|
def get_config_root_from_file(file_path):
|
|
"""Returns XML root element from file.
|
|
|
|
:param file_path: xml config path
|
|
:return: XML root element or None
|
|
:rtype: Element or None
|
|
"""
|
|
try:
|
|
cs_config = ET.parse(file_path)
|
|
return cs_config.getroot()
|
|
except (FileNotFoundError, AttributeError, ValueError):
|
|
logging.error(
|
|
'Exception while loading Columnstore.xml. Continue anyway.',
|
|
exc_info=True
|
|
)
|
|
return None
|
|
|
|
def get_epoch_prefix():
|
|
"""Returns a prefix with epoch time
|
|
|
|
:rtype: String
|
|
"""
|
|
epoch_time = int(time.time())
|
|
|
|
return 'backup_{}'.format(epoch_time)
|
|
|
|
|
|
def get_save_brm_dir_path(a_mcs_config_root):
|
|
"""Returns a path that SM treats as local
|
|
|
|
:param file_path: xml config XML root
|
|
:rtype
|
|
"""
|
|
save_brm_dir_path = BRM_BACKUP_PATH
|
|
if a_mcs_config_root is not None:
|
|
try:
|
|
system_temp_file_dir = a_mcs_config_root.find('./SystemConfig/SystemTempFileDir').text
|
|
hdfs_rdwr_scratch = a_mcs_config_root.find('./SystemConfig/hdfsRdwrScratch').text
|
|
# There is a danger to have no '/' in the end of system_temp_file_dir
|
|
# or have two of them there. In both cases save_brm will fail to store
|
|
# files locally.
|
|
save_brm_dir_path = system_temp_file_dir + hdfs_rdwr_scratch
|
|
except AttributeError:
|
|
logging.error('Exception while getting SystemTempFileDir and hdfsRdwrScratch from Columnstore.xml', exc_info=True)
|
|
|
|
return save_brm_dir_path
|
|
|
|
|
|
def get_save_brm_path_prefix(a_mcs_config_root):
|
|
"""Returns a path that SM treats as local
|
|
|
|
:param file_path: xml config XML root
|
|
:rtype: String
|
|
"""
|
|
epoch_prefix = get_epoch_prefix()
|
|
return get_save_brm_dir_path(a_mcs_config_root) + '/' + BRM_BACKUP_PATH_PART.format(epoch_prefix)
|
|
|
|
|
|
def call_save_brm(path):
|
|
"""Calls save_brm first and then tries to call it with local path.
|
|
|
|
:param file_path: xml config XML root
|
|
:rtype: None
|
|
"""
|
|
savebrm_cmd = SAVEBRM + ' ' + path
|
|
try:
|
|
subprocess.check_call(savebrm_cmd, shell=True)
|
|
except subprocess.CalledProcessError as exc:
|
|
logging.error('The call to {} exits with {}.'.format(savebrm_cmd, exc.returncode))
|
|
return None
|
|
except OSError:
|
|
logging.error('Os error while calling savebrm', exc_info=True)
|
|
return None
|
|
return path
|
|
|
|
|
|
def call_save_brm_locally(a_mcs_config_root):
|
|
"""Calls save_brm first and then tries to call it with local path.
|
|
|
|
:param file_path: xml config XML root
|
|
:rtype: None
|
|
"""
|
|
local_path = get_save_brm_path_prefix(a_mcs_config_root)
|
|
return call_save_brm(local_path)
|
|
|
|
|
|
def call_save_brm_with_local_fallback(a_mcs_config_root):
|
|
"""Calls save_brm first and then tries to call it with local path.
|
|
|
|
:param file_path: xml config XML root
|
|
:rtype: None
|
|
"""
|
|
try:
|
|
subprocess.check_call(SAVEBRM, shell=True)
|
|
except subprocess.CalledProcessError as exc:
|
|
logging.error('The primary call to {} exits with {}.'.format(exc.cmd, exc.returncode))
|
|
backup_path = get_save_brm_path_prefix(a_mcs_config_root)
|
|
logging.error('Back up BRM files locally to {}.'.format(backup_path))
|
|
backup_cmd = SAVEBRM + ' ' + backup_path
|
|
try:
|
|
subprocess.check_call(backup_cmd, shell=True)
|
|
except subprocess.CalledProcessError:
|
|
logging.error('The backup call to {} exits with {}.'.format(exc.cmd, exc.returncode))
|
|
except OSError:
|
|
logging.error('Os error while calling savebrm during the backup', exc_info=True)
|
|
|
|
sys.exit(1)
|
|
except OSError:
|
|
logging.error('Os error while calling savebrm', exc_info=True)
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
mcs_config_root = get_config_root_from_file(MCS_CONFIG_PATH)
|
|
# config_root can be None
|
|
if is_node_primary(mcs_config_root):
|
|
em_local_path_prefix = call_save_brm_locally(mcs_config_root)
|
|
if not em_local_path_prefix or em_is_empty(em_local_path_prefix):
|
|
# remove_files_by_prefix_if_exist(em_local_path_prefix)
|
|
logging.error('Exiting with error.')
|
|
sys.exit(1)
|
|
|
|
clean_up_backup_brm_files(get_save_brm_dir_path(mcs_config_root))
|
|
|
|
call_save_brm(DEFAULT_EM_LOCAL_PATH_PREFIX)
|
|
|
|
sys.exit(0)
|