You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
feat(save_brm)!: MCOL-5709: protect from S3/NFS IO errors (#3206)
* feat(save_brm)!: protect from S3/NFS IO errors * feat(save_brm)!: future refactoring * cleanup * feat(save_brm)!: forgotten template * feat(save-brm,ci)!: python3 package for rocky8 --------- Co-authored-by: Roman Nozdrin <roman.nozdrin@mariadb.com>
This commit is contained in:
@ -40,7 +40,7 @@ local clang_update_alternatives = 'update-alternatives --install /usr/bin/clang
|
||||
local rpm_build_deps = 'install -y lz4 systemd-devel git make libaio-devel openssl-devel boost-devel bison ' +
|
||||
'snappy-devel flex libcurl-devel libxml2-devel ncurses-devel automake libtool ' +
|
||||
'policycoreutils-devel rpm-build lsof iproute pam-devel perl-DBI cracklib-devel ' +
|
||||
'expect createrepo ';
|
||||
'expect createrepo python3 ';
|
||||
|
||||
local centos7_build_deps = 'yum install -y epel-release centos-release-scl ' +
|
||||
'&& yum install -y pcre2-devel devtoolset-' + gcc_version + ' devtoolset-' + gcc_version + '-gcc cmake3 lz4-devel ' +
|
||||
|
@ -89,6 +89,7 @@ configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-storagemanager.service.in" "${CM
|
||||
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-stop-controllernode.sh.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-stop-controllernode.sh" @ONLY)
|
||||
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-loadbrm.py.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-loadbrm.py" @ONLY)
|
||||
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-savebrm.py.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-savebrm.py" @ONLY)
|
||||
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-savebrm.py.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcssavebrm.py" @ONLY)
|
||||
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/columnstoreSyslog.in" "${CMAKE_CURRENT_SOURCE_DIR}/columnstoreSyslog" @ONLY)
|
||||
|
||||
install(PROGRAMS columnstore-post-install
|
||||
@ -124,3 +125,6 @@ install(FILES mariadb-columnstore.service
|
||||
DESTINATION ${ENGINE_SUPPORTDIR} COMPONENT columnstore-engine)
|
||||
|
||||
install(FILES module DESTINATION ${ENGINE_DATADIR}/local COMPONENT columnstore-engine)
|
||||
|
||||
find_package (Python3 COMPONENTS Interpreter REQUIRED)
|
||||
add_test(NAME PythonUnitTests COMMAND ${Python3_EXECUTABLE} -m unittest test_mcs-savebrm.py)
|
@ -309,9 +309,9 @@ if __name__ == '__main__':
|
||||
if s3_enabled:
|
||||
# start SM using systemd
|
||||
if use_systemd:
|
||||
cmd = 'systemctl start mcs-storagemanager'
|
||||
CMD = 'systemctl start mcs-storagemanager'
|
||||
try:
|
||||
subprocess.check_call(cmd, shell=True)
|
||||
subprocess.check_call(CMD, shell=True)
|
||||
except subprocess.CalledProcessError as exc:
|
||||
logging.error(
|
||||
'Failed to start storagemanager. '
|
||||
@ -393,8 +393,7 @@ if __name__ == '__main__':
|
||||
shutil.chown(current_name, USER, GROUP)
|
||||
else:
|
||||
logging.info(
|
||||
'Cmapi is not running on primary node. '
|
||||
'Skip loading metafiles.'
|
||||
'Cmapi is not running on primary node. Skip loading metafiles.'
|
||||
)
|
||||
except Exception as exc:
|
||||
logging.error(
|
||||
|
@ -2,6 +2,7 @@
|
||||
import configparser
|
||||
import fcntl
|
||||
import json
|
||||
import glob
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
@ -9,6 +10,7 @@ import ssl
|
||||
import struct
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import xml.etree.ElementTree as ET
|
||||
from urllib.request import Request, urlopen
|
||||
from urllib.error import HTTPError, URLError
|
||||
@ -21,7 +23,11 @@ MCS_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'Columnstore.xml')
|
||||
SM_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'storagemanager.cnf')
|
||||
MCS_BIN_DIR = '@ENGINE_BINDIR@'
|
||||
SAVEBRM = os.path.join(MCS_BIN_DIR, 'save_brm')
|
||||
EM_FILE_SUFFIX = '_em'
|
||||
EM_FILE_SIZE_THRESHOLD = 1000
|
||||
HALF_A_MINUTE = 30
|
||||
NUMBER_OF_FILES_TO_KEEP = 40
|
||||
DEFAULT_EM_LOCAL_PATH_PREFIX = ''
|
||||
LOCALHOST = '127.0.0.1'
|
||||
# according to https://www.ibm.com/docs/en/storage-sentinel/1.1.2?topic=installation-map-your-local-host-loopback-address
|
||||
LOCALHOSTS = (
|
||||
@ -33,6 +39,8 @@ LOCALHOSTS = (
|
||||
)
|
||||
API_VERSION = '0.4.0'
|
||||
API_PORT = '8640'
|
||||
BRM_BACKUP_PATH = '/tmp/columnstore_tmp_files/rdwrscratch/'
|
||||
BRM_BACKUP_PATH_PART = '{}_BRM_saves'
|
||||
|
||||
|
||||
def get_api_key():
|
||||
@ -161,16 +169,13 @@ def is_node_primary(conf_root):
|
||||
success = True
|
||||
except HTTPError as exc:
|
||||
logging.warning(
|
||||
'Something goes wrong while requesting primary status ',
|
||||
'through api.',
|
||||
'Got response code "{}" with reason "{}".'.format(
|
||||
'Something goes wrong while requesting primary status through api. Got response code "{}" with reason "{}".'.format(
|
||||
exc.code, exc.reason
|
||||
)
|
||||
)
|
||||
except URLError:
|
||||
logging.warning(
|
||||
'CMAPI became unavailable while trying',
|
||||
'to request primary status.'
|
||||
'CMAPI became unavailable while trying to request primary status.'
|
||||
)
|
||||
except Exception:
|
||||
logging.error(
|
||||
@ -192,42 +197,191 @@ def is_node_primary(conf_root):
|
||||
return is_primary_fallback(conf_root.find('./DBRM_Controller/IPAddr').text)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
master_addr = ''
|
||||
pm_count = 0
|
||||
logging.basicConfig(
|
||||
format='%(levelname)s: %(message)s', level=logging.DEBUG
|
||||
)
|
||||
logging.debug('Loading Columnstore.xml.')
|
||||
def get_file_size(file_path):
|
||||
""" Returns the size of the file in bytes. """
|
||||
try:
|
||||
cs_config = ET.parse(MCS_CONFIG_PATH)
|
||||
config_root = cs_config.getroot()
|
||||
master_addr = config_root.find('./DBRM_Controller/IPAddr').text
|
||||
pm_count = int(
|
||||
config_root.find('./SystemModuleConfig/ModuleCount3').text
|
||||
size = os.path.getsize(file_path)
|
||||
return size
|
||||
except OSError as e:
|
||||
logging.error('OSError in get_file_size(): {}.'.format(e))
|
||||
return None
|
||||
|
||||
|
||||
def em_is_empty(file_path_prefix):
|
||||
"""Returns True if EM file size is less than EM_FILE_SIZE_THRESHOLD
|
||||
or its argument is None.
|
||||
|
||||
:rtype: Bool
|
||||
"""
|
||||
# Add error message if EM is empty
|
||||
is_none = file_path_prefix is None
|
||||
filesize = get_file_size(file_path_prefix + EM_FILE_SUFFIX)
|
||||
is_em_empty = is_none or filesize < EM_FILE_SIZE_THRESHOLD
|
||||
if is_em_empty:
|
||||
logging.error('EM file is none or its size {} is less than {} bytes.'.format(filesize, EM_FILE_SIZE_THRESHOLD))
|
||||
return is_em_empty
|
||||
|
||||
|
||||
def clean_up_backup_brm_files(save_brm_dir_path):
|
||||
""" Removes all but 5 last usable sets of BRM files in the specified directory.
|
||||
Usable in the context means having non-empty EM.
|
||||
"""
|
||||
filenames = os.listdir(save_brm_dir_path)
|
||||
filenames.sort(reverse=True)
|
||||
files_to_remove = filenames[NUMBER_OF_FILES_TO_KEEP:]
|
||||
for filename in files_to_remove:
|
||||
file_path = os.path.join(save_brm_dir_path, filename)
|
||||
logging.debug('Clean up {}.'.format(file_path))
|
||||
try:
|
||||
os.remove(file_path)
|
||||
except OSError as e:
|
||||
logging.error('OSError exception happens removing {}: {}.'.format(file_path, e))
|
||||
|
||||
|
||||
def remove_files_by_prefix_if_exist(file_path_prefix):
|
||||
""" Removes files with the given prefix if they exist. """
|
||||
if file_path_prefix is None:
|
||||
logging.error(
|
||||
'file_path_prefix is None. Cannot remove files.',
|
||||
exc_info=True
|
||||
)
|
||||
logging.debug('Succesfully loaded Columnstore.xml.')
|
||||
except (FileNotFoundError, AttributeError, ValueError) as e:
|
||||
# is it correct case?
|
||||
return
|
||||
try:
|
||||
|
||||
files_paths = glob.glob(file_path_prefix + '*')
|
||||
for file_path in files_paths:
|
||||
os.remove(file_path)
|
||||
except OSError as e:
|
||||
logging.error(
|
||||
'Error removing file: {} - {}'.format(file_path, e.strerror),
|
||||
exc_info=True
|
||||
)
|
||||
|
||||
|
||||
def get_config_root_from_file(file_path):
|
||||
"""Returns XML root element from file.
|
||||
|
||||
:param file_path: xml config path
|
||||
:return: XML root element or None
|
||||
:rtype: Element or None
|
||||
"""
|
||||
try:
|
||||
cs_config = ET.parse(file_path)
|
||||
return cs_config.getroot()
|
||||
except (FileNotFoundError, AttributeError, ValueError):
|
||||
logging.error(
|
||||
'Exception while loading Columnstore.xml. Continue anyway.',
|
||||
exc_info=True
|
||||
)
|
||||
return None
|
||||
|
||||
logging.debug('Reading SM config.')
|
||||
sm_config = configparser.ConfigParser()
|
||||
files_read = len(sm_config.read(SM_CONFIG_PATH))
|
||||
storage = sm_config.get(
|
||||
'ObjectStorage', 'service', fallback='LocalStorage'
|
||||
)
|
||||
def get_epoch_prefix():
|
||||
"""Returns a prefix with epoch time
|
||||
|
||||
if is_node_primary(config_root):
|
||||
:rtype: String
|
||||
"""
|
||||
epoch_time = int(time.time())
|
||||
|
||||
return 'backup_{}'.format(epoch_time)
|
||||
|
||||
|
||||
def get_save_brm_dir_path(a_mcs_config_root):
|
||||
"""Returns a path that SM treats as local
|
||||
|
||||
:param file_path: xml config XML root
|
||||
:rtype
|
||||
"""
|
||||
save_brm_dir_path = BRM_BACKUP_PATH
|
||||
if a_mcs_config_root is not None:
|
||||
try:
|
||||
retcode = subprocess.check_call(SAVEBRM, shell=True)
|
||||
system_temp_file_dir = a_mcs_config_root.find('./SystemConfig/SystemTempFileDir').text
|
||||
hdfs_rdwr_scratch = a_mcs_config_root.find('./SystemConfig/hdfsRdwrScratch').text
|
||||
# There is a danger to have no '/' in the end of system_temp_file_dir
|
||||
# or have two of them there. In both cases save_brm will fail to store
|
||||
# files locally.
|
||||
save_brm_dir_path = system_temp_file_dir + hdfs_rdwr_scratch
|
||||
except AttributeError:
|
||||
logging.error('Exception while getting SystemTempFileDir and hdfsRdwrScratch from Columnstore.xml', exc_info=True)
|
||||
|
||||
return save_brm_dir_path
|
||||
|
||||
|
||||
def get_save_brm_path_prefix(a_mcs_config_root):
|
||||
"""Returns a path that SM treats as local
|
||||
|
||||
:param file_path: xml config XML root
|
||||
:rtype: String
|
||||
"""
|
||||
epoch_prefix = get_epoch_prefix()
|
||||
return get_save_brm_dir_path(a_mcs_config_root) + '/' + BRM_BACKUP_PATH_PART.format(epoch_prefix)
|
||||
|
||||
|
||||
def call_save_brm(path):
|
||||
"""Calls save_brm first and then tries to call it with local path.
|
||||
|
||||
:param file_path: xml config XML root
|
||||
:rtype: None
|
||||
"""
|
||||
savebrm_cmd = SAVEBRM + ' ' + path
|
||||
try:
|
||||
subprocess.check_call(savebrm_cmd, shell=True)
|
||||
except subprocess.CalledProcessError as exc:
|
||||
logging.error('{} exits with {}.'.format(exc.cmd, exc.returncode))
|
||||
logging.error('The call to {} exits with {}.'.format(savebrm_cmd, exc.returncode))
|
||||
return None
|
||||
except OSError:
|
||||
logging.error('Os error while calling savebrm', exc_info=True)
|
||||
return None
|
||||
return path
|
||||
|
||||
|
||||
def call_save_brm_locally(a_mcs_config_root):
|
||||
"""Calls save_brm first and then tries to call it with local path.
|
||||
|
||||
:param file_path: xml config XML root
|
||||
:rtype: None
|
||||
"""
|
||||
local_path = get_save_brm_path_prefix(a_mcs_config_root)
|
||||
return call_save_brm(local_path)
|
||||
|
||||
|
||||
def call_save_brm_with_local_fallback(a_mcs_config_root):
|
||||
"""Calls save_brm first and then tries to call it with local path.
|
||||
|
||||
:param file_path: xml config XML root
|
||||
:rtype: None
|
||||
"""
|
||||
try:
|
||||
subprocess.check_call(SAVEBRM, shell=True)
|
||||
except subprocess.CalledProcessError as exc:
|
||||
logging.error('The primary call to {} exits with {}.'.format(exc.cmd, exc.returncode))
|
||||
backup_path = get_save_brm_path_prefix(a_mcs_config_root)
|
||||
logging.debug('Back up BRM files locally to {}.'.format(backup_path))
|
||||
backup_cmd = SAVEBRM + ' ' + backup_path
|
||||
try:
|
||||
subprocess.check_call(backup_cmd, shell=True)
|
||||
except subprocess.CalledProcessError:
|
||||
logging.error('The backup call to {} exits with {}.'.format(exc.cmd, exc.returncode))
|
||||
except OSError:
|
||||
logging.error('Os error while calling savebrm during the backup', exc_info=True)
|
||||
|
||||
sys.exit(1)
|
||||
except OSError:
|
||||
logging.error('Os error while calling savebrm', exc_info=True)
|
||||
sys.exit(0)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
mcs_config_root = get_config_root_from_file(MCS_CONFIG_PATH)
|
||||
# config_root can be None
|
||||
if is_node_primary(mcs_config_root):
|
||||
em_local_path_prefix = call_save_brm_locally(mcs_config_root)
|
||||
if not em_local_path_prefix or em_is_empty(em_local_path_prefix):
|
||||
# remove_files_by_prefix_if_exist(em_local_path_prefix)
|
||||
logging.error('Exiting with error.')
|
||||
sys.exit(1)
|
||||
|
||||
clean_up_backup_brm_files(get_save_brm_dir_path(mcs_config_root))
|
||||
|
||||
call_save_brm(DEFAULT_EM_LOCAL_PATH_PREFIX)
|
||||
|
||||
sys.exit(0)
|
||||
|
117
oam/install_scripts/test_mcs-savebrm.py
Normal file
117
oam/install_scripts/test_mcs-savebrm.py
Normal file
@ -0,0 +1,117 @@
|
||||
import unittest
|
||||
from unittest.mock import patch, mock_open, MagicMock
|
||||
import socket
|
||||
import ssl
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
from urllib.error import HTTPError, URLError
|
||||
from urllib.request import Request
|
||||
from xml.etree.ElementTree import Element
|
||||
import importlib.util
|
||||
|
||||
import mcssavebrm
|
||||
|
||||
class TestMcsSavebrmFunctions(unittest.TestCase):
|
||||
@patch('mcssavebrm.configparser.ConfigParser.get', return_value='test_api_key')
|
||||
@patch('mcssavebrm.configparser.ConfigParser.read')
|
||||
def test_get_api_key(self, mock_read, mock_get):
|
||||
self.assertEqual(mcssavebrm.get_api_key(), 'test_api_key')
|
||||
mock_read.assert_called_once_with(mcssavebrm.CMAPI_CONFIG_PATH)
|
||||
|
||||
def test_get_unverified_context(self):
|
||||
ctx = mcssavebrm.get_unverified_context()
|
||||
self.assertFalse(ctx.check_hostname)
|
||||
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
|
||||
|
||||
@patch('mcssavebrm.urlopen')
|
||||
@patch('mcssavebrm.get_unverified_context')
|
||||
def test_cmapi_available(self, mock_get_unverified_context, mock_urlopen):
|
||||
mock_get_unverified_context.return_value = ssl._create_unverified_context()
|
||||
mock_urlopen.side_effect = HTTPError(None, 404, 'Not Found', None, None)
|
||||
self.assertTrue(mcssavebrm.cmapi_available())
|
||||
|
||||
@patch('mcssavebrm.fcntl.ioctl')
|
||||
@patch('mcssavebrm.socket.socket')
|
||||
def test_get_ip_address_by_nic(self, mock_socket, mock_ioctl):
|
||||
mock_socket_inst = MagicMock()
|
||||
mock_socket.return_value = mock_socket_inst
|
||||
mock_ioctl.return_value = b'\x00' * 20 + b'\x7f\x00\x00\x01'
|
||||
self.assertEqual(mcssavebrm.get_ip_address_by_nic('lo'), '127.0.0.1')
|
||||
|
||||
@patch('mcssavebrm.get_ip_address_by_nic', return_value='127.0.0.1')
|
||||
@patch('mcssavebrm.socket.gethostbyaddr', return_value=('localhost', [], []))
|
||||
@patch('mcssavebrm.socket.if_nameindex', return_value=[(1, 'lo')])
|
||||
def test_is_primary_fallback(self, mock_if_nameindex, mock_gethostbyaddr, mock_get_ip_address_by_nic):
|
||||
self.assertTrue(mcssavebrm.is_primary_fallback('localhost'))
|
||||
|
||||
@patch('mcssavebrm.cmapi_available', return_value=True)
|
||||
@patch('mcssavebrm.urlopen')
|
||||
@patch('mcssavebrm.get_unverified_context')
|
||||
@patch('mcssavebrm.get_api_key', return_value='test_api_key')
|
||||
def test_is_node_primary(self, mock_get_api_key, mock_get_unverified_context, mock_urlopen, mock_cmapi_available):
|
||||
mock_get_unverified_context.return_value = ssl._create_unverified_context()
|
||||
mock_response = MagicMock()
|
||||
mock_response.read.return_value = json.dumps({'is_primary': 'True'}).encode('utf-8')
|
||||
mock_urlopen.return_value.__enter__.return_value = mock_response
|
||||
root = MagicMock()
|
||||
self.assertTrue(mcssavebrm.is_node_primary(root))
|
||||
|
||||
@patch('os.path.getsize', return_value=1024)
|
||||
def test_get_file_size(self, mock_getsize):
|
||||
self.assertEqual(mcssavebrm.get_file_size('test_file'), 1024)
|
||||
|
||||
@patch('mcssavebrm.get_file_size', return_value=500)
|
||||
def test_em_is_empty(self, mock_get_file_size):
|
||||
self.assertTrue(mcssavebrm.em_is_empty('test_prefix'))
|
||||
|
||||
@patch('os.remove')
|
||||
@patch('os.listdir', return_value=[f'test_file{i}' for i in range(50)])
|
||||
def test_clean_up_backup_brm_files(self, mock_listdir, mock_remove):
|
||||
mcssavebrm.clean_up_backup_brm_files('/dummy/path')
|
||||
self.assertEqual(mock_remove.call_count, 10)
|
||||
|
||||
@patch('os.remove')
|
||||
@patch('mcssavebrm.glob.glob', return_value=['test_file1', 'test_file2'])
|
||||
def test_remove_files_by_prefix_if_exist(self, mock_glob, mock_remove):
|
||||
mcssavebrm.remove_files_by_prefix_if_exist('test_prefix')
|
||||
mock_remove.assert_any_call('test_file1')
|
||||
mock_remove.assert_any_call('test_file2')
|
||||
|
||||
@patch('xml.etree.ElementTree.parse')
|
||||
def test_get_config_root_from_file(self, mock_parse):
|
||||
mock_tree = MagicMock()
|
||||
mock_parse.return_value = mock_tree
|
||||
self.assertEqual(mcssavebrm.get_config_root_from_file('test_file'), mock_tree.getroot())
|
||||
|
||||
@patch('time.time', return_value=1624478400)
|
||||
def test_get_epoch_prefix(self, mock_time):
|
||||
self.assertEqual(mcssavebrm.get_epoch_prefix(), 'backup_1624478400')
|
||||
|
||||
@patch('mcssavebrm.get_epoch_prefix', return_value='backup_1624478400')
|
||||
@patch('mcssavebrm.get_save_brm_dir_path', return_value='/tmp/columnstore_tmp_files/rdwrscratch/')
|
||||
def test_get_save_brm_path_prefix(self, mock_get_epoch_prefix, mock_get_save_brm_dir_path):
|
||||
root = MagicMock()
|
||||
self.assertIn('backup_1624478400_BRM_saves', mcssavebrm.get_save_brm_path_prefix(root))
|
||||
|
||||
@patch('subprocess.check_call')
|
||||
def test_call_save_brm(self, mock_check_call):
|
||||
self.assertEqual(mcssavebrm.call_save_brm('test_path'), 'test_path')
|
||||
|
||||
@patch('mcssavebrm.call_save_brm', return_value='test_path')
|
||||
@patch('mcssavebrm.get_save_brm_path_prefix', return_value='test_path')
|
||||
def test_call_save_brm_locally(self, mock_get_save_brm_path_prefix, mock_call_save_brm):
|
||||
root = MagicMock()
|
||||
self.assertEqual(mcssavebrm.call_save_brm_locally(root), 'test_path')
|
||||
|
||||
@patch('subprocess.check_call')
|
||||
@patch('mcssavebrm.get_save_brm_path_prefix', return_value='test_path')
|
||||
def test_call_save_brm_with_local_fallback(self, mock_get_save_brm_path_prefix, mock_check_call):
|
||||
root = MagicMock()
|
||||
mock_check_call.side_effect = [subprocess.CalledProcessError(1, 'test'), None]
|
||||
with self.assertRaises(SystemExit):
|
||||
mcssavebrm.call_save_brm_with_local_fallback(root)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Reference in New Issue
Block a user