1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-11-02 06:13:16 +03:00

fix(shmem,brm,scripts): mcs-savebrm.py wrapper now cleans shmem locks before calling save_brm. mcs-shmem-locks now has --reset-all flag

This commit is contained in:
drrtuy
2025-09-19 16:38:56 +00:00
committed by Leonid Fedorov
parent b01be6ae2d
commit a8909a27a9
2 changed files with 77 additions and 51 deletions

View File

@@ -11,6 +11,7 @@ import struct
import subprocess import subprocess
import sys import sys
import time import time
from typing import Optional
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
from urllib.request import Request, urlopen from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError from urllib.error import HTTPError, URLError
@@ -23,6 +24,8 @@ MCS_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'Columnstore.xml')
SM_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'storagemanager.cnf') SM_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'storagemanager.cnf')
MCS_BIN_DIR = '@ENGINE_BINDIR@' MCS_BIN_DIR = '@ENGINE_BINDIR@'
SAVEBRM = os.path.join(MCS_BIN_DIR, 'save_brm') SAVEBRM = os.path.join(MCS_BIN_DIR, 'save_brm')
CLEAR_ALL_SHMEM_LOCKS = os.path.join(MCS_BIN_DIR, 'mcs-shmem-locks')
CLEAR_ALL_SHMEM_LOCKS_ARGS = '-a'
EM_FILE_SUFFIX = '_em' EM_FILE_SUFFIX = '_em'
EM_FILE_SIZE_THRESHOLD = 1000 EM_FILE_SIZE_THRESHOLD = 1000
FIVE_SECS = 5 FIVE_SECS = 5
@@ -68,7 +71,7 @@ def cmapi_available():
:return: is CMAPI running or not :return: is CMAPI running or not
:rtype: bool :rtype: bool
""" """
logging.error('Detecting CMAPI is up and running.') logging.info('Detecting CMAPI is up and running.')
url = 'https://{}:{}/notfound'.format(LOCALHOST, API_PORT) url = 'https://{}:{}/notfound'.format(LOCALHOST, API_PORT)
request = Request(method='POST', url=url) request = Request(method='POST', url=url)
ctx = get_unverified_context() ctx = get_unverified_context()
@@ -109,7 +112,7 @@ def get_ip_address_by_nic(ifname):
)[20:24] )[20:24]
) )
except Exception as exc: except Exception as exc:
logging.error( logging.debug(
'Exception while getting IP address of an "{}" interface'.format( 'Exception while getting IP address of an "{}" interface'.format(
ifname ifname
), ),
@@ -128,7 +131,7 @@ def is_primary_fallback(current_hostname):
:return: is node primary :return: is node primary
:rtype: bool :rtype: bool
""" """
logging.error( logging.info(
'Current DBRM_Controller/IPAddr is {}'.format(current_hostname) 'Current DBRM_Controller/IPAddr is {}'.format(current_hostname)
) )
hostnames = set() hostnames = set()
@@ -139,7 +142,7 @@ def is_primary_fallback(current_hostname):
hostnames.update([hostnames_3tuple[0], *hostnames_3tuple[1]]) hostnames.update([hostnames_3tuple[0], *hostnames_3tuple[1]])
except: except:
pass pass
logging.error('Found hostnames {}.'.format(hostnames)) logging.info('Found hostnames {}.'.format(hostnames))
return current_hostname in LOCALHOSTS or current_hostname in hostnames return current_hostname in LOCALHOSTS or current_hostname in hostnames
@@ -233,7 +236,7 @@ def clean_up_backup_brm_files(save_brm_dir_path):
files_to_remove = filenames[NUMBER_OF_FILES_TO_KEEP:] files_to_remove = filenames[NUMBER_OF_FILES_TO_KEEP:]
for filename in files_to_remove: for filename in files_to_remove:
file_path = os.path.join(save_brm_dir_path, filename) file_path = os.path.join(save_brm_dir_path, filename)
logging.error('Clean up {}.'.format(file_path)) logging.info('Clean up {}.'.format(file_path))
try: try:
os.remove(file_path) os.remove(file_path)
except OSError as e: except OSError as e:
@@ -318,72 +321,65 @@ def get_save_brm_path_prefix(a_mcs_config_root):
return get_save_brm_dir_path(a_mcs_config_root) + '/' + BRM_BACKUP_PATH_PART.format(epoch_prefix) return get_save_brm_dir_path(a_mcs_config_root) + '/' + BRM_BACKUP_PATH_PART.format(epoch_prefix)
def call_executable_with_params(executable: str, args: str) -> bool:
    """Run an executable with arguments and report whether it succeeded.

    The argument string is split using shell-like quoting rules and the
    command is executed directly as an argv list, without an intermediate
    shell, so no shell expansion is applied to the arguments.

    :param executable: path to the executable to call
    :type executable: str
    :param args: command line arguments as a single string, may be empty
    :type args: str
    :return: True if the process exited with status 0; False if it exited
        with a non-zero status, could not be started, or the argument
        string could not be parsed
    :rtype: bool
    """
    # Local import keeps this function self-contained.
    import shlex

    try:
        command = [executable, *shlex.split(args)]
    except ValueError:
        # Unbalanced quotes etc.; a shell would also have refused to run it.
        logging.error(
            'Can not parse arguments for {}.'.format(executable),
            exc_info=True
        )
        return False

    try:
        subprocess.check_call(command)
    except subprocess.CalledProcessError as exc:
        logging.error(
            'The call to {} exits with {}.'.format(executable, exc.returncode)
        )
        return False
    except OSError:
        # Raised e.g. when the executable is missing or is not executable.
        logging.error(
            'Os error while calling {}.'.format(executable), exc_info=True
        )
        return False
    return True
def clear_shmem_locks() -> bool:
    """Clear shmem locks; meant to be run before save_brm is called.

    Runs the CLEAR_ALL_SHMEM_LOCKS tool with CLEAR_ALL_SHMEM_LOCKS_ARGS.

    :return: result of call_executable_with_params for the lock tool,
        i.e. True on success and False on failure
    :rtype: bool
    """
    logging.info('Clear shmem read locks.')
    return call_executable_with_params(
        CLEAR_ALL_SHMEM_LOCKS, CLEAR_ALL_SHMEM_LOCKS_ARGS
    )
def call_save_brm(path: str) -> bool:
    """Call save_brm, passing ``path`` as its command line argument.

    :param path: argument string handed to the SAVEBRM executable; callers
        in this script pass a BRM file path prefix
    :type path: str
    :return: result of call_executable_with_params for SAVEBRM,
        i.e. True on success and False on failure
    :rtype: bool
    """
    return call_executable_with_params(SAVEBRM, path)
if __name__ == '__main__':
    # Configure logging to show INFO level messages
    logging.basicConfig(
        level=logging.INFO, format='%(levelname)s:%(name)s:%(message)s'
    )

    mcs_config_root = get_config_root_from_file(MCS_CONFIG_PATH)

    # clear_shmem_locks() returns a bool, so test its truthiness; comparing
    # the result with None would never detect a failure.
    if not clear_shmem_locks():
        logging.error('Exiting with error cleaning locks.')
        sys.exit(1)

    em_local_path_prefix = get_save_brm_path_prefix(mcs_config_root)
    # config_root can be None
    if is_node_primary(mcs_config_root):
        # Save a local copy first and make sure the saved EM is not empty.
        if (
            not call_save_brm(em_local_path_prefix)
            or em_is_empty(em_local_path_prefix)
        ):
            logging.error('Exiting with error trying to safe BRM locally on primary node.')
            sys.exit(1)

        clean_up_backup_brm_files(get_save_brm_dir_path(mcs_config_root))

        # NOTE(review): the result of this call is not checked, so the script
        # exits with 0 even if it fails - confirm this is intended.
        call_save_brm(DEFAULT_EM_LOCAL_PATH_PREFIX)
    else:
        # Node is not primary. Call save_brm locally to save a local copy
        # of BRM.
        logging.info('Node is not primary. Call save_brm locally')
        if (
            not call_save_brm(em_local_path_prefix)
            or em_is_empty(em_local_path_prefix)
        ):
            logging.error('Exiting with error trying to safe BRM locally on non-primary node.')
            sys.exit(1)

    sys.exit(0)

View File

@@ -39,6 +39,16 @@ std::string getShmemLocksList()
return oss.str(); return oss.str();
} }
int resetAllLocks()
{
for (size_t i = 0; i < RWLockNames.size(); ++i)
{
auto rwlock = RWLock(0x10000 * i);
rwlock.reset();
}
return 0;
}
int viewLock(uint8_t lockId) int viewLock(uint8_t lockId)
{ {
size_t minLockId = (lockId > 0) ? lockId : 1; size_t minLockId = (lockId > 0) ? lockId : 1;
@@ -112,6 +122,7 @@ int main(int argc, char** argv)
bool write = false; bool write = false;
bool lock = false; bool lock = false;
bool unlock = false; bool unlock = false;
bool resetAll = false;
po::options_description desc( po::options_description desc(
"A tool to operate or view shmem locks. If neither read nor write operation is specified, the tool " "A tool to operate or view shmem locks. If neither read nor write operation is specified, the tool "
@@ -122,12 +133,14 @@ int main(int argc, char** argv)
// clang-format off // clang-format off
desc.add_options()("help", "produce help message") desc.add_options()("help", "produce help message")
("lock-id,i", po::value<int>(&lockId)->required(), lockid_description.c_str()) ("lock-id,i", po::value<int>(&lockId)->default_value(RWLockNames.size()), lockid_description.c_str())
("read-lock,r", po::bool_switch(&read)->default_value(false), "Use read lock.") ("read-lock,r", po::bool_switch(&read)->default_value(false), "Use read lock.")
("write-lock,w", po::bool_switch(&write)->default_value(false), "Use write lock.") ("write-lock,w", po::bool_switch(&write)->default_value(false), "Use write lock.")
("lock,l", po::bool_switch(&lock)->default_value(false), "Lock the corresponding shmem lock.") ("lock,l", po::bool_switch(&lock)->default_value(false), "Lock the corresponding shmem lock.")
("unlock,u", po::bool_switch(&unlock)->default_value(false), "Unlock the corresponding shmem write lock.") ("unlock,u", po::bool_switch(&unlock)->default_value(false), "Unlock the corresponding shmem write lock.")
("debug,d", po::bool_switch(&debug)->default_value(false), "Print extra output."); ("debug,d", po::bool_switch(&debug)->default_value(false), "Print extra output.")
("reset-all,a", po::bool_switch(&resetAll)->default_value(false), "Reset all shmem locks.");
// clang-format on // clang-format on
po::variables_map vm; po::variables_map vm;
@@ -139,12 +152,29 @@ int main(int argc, char** argv)
return 1; return 1;
} }
conflicting_options(vm, "reset-all", "lock-id");
conflicting_options(vm, "lock", "unlock"); conflicting_options(vm, "lock", "unlock");
conflicting_options(vm, "read-lock", "write-lock"); conflicting_options(vm, "read-lock", "write-lock");
check_value<int>(vm, "lock-id", 0, RWLockNames.size());
// Only require lock-id validation if reset-all is not used
if (!resetAll && (vm.count("lock-id") && !vm["lock-id"].defaulted()))
{
check_value<int>(vm, "lock-id", 0, RWLockNames.size());
}
// Require lock-id for operations other than reset-all
if (!resetAll && !vm.count("lock-id"))
{
throw std::logic_error("lock-id is required when not using reset-all");
}
po::notify(vm); po::notify(vm);
if (resetAll)
{
return resetAllLocks();
}
if (!read && !write) if (!read && !write)
{ {
return viewLock(lockId); return viewLock(lockId);