You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-11-03 17:13:17 +03:00 
			
		
		
		
	fix(shmem,brm,scripts): mcs-savebrm.py wrapper now cleans shmem locks before calling save_brm. mcs-shmem-locks now has --reset-all flag (#3784)
This commit is contained in:
		@@ -11,6 +11,7 @@ import struct
 | 
			
		||||
import subprocess
 | 
			
		||||
import sys
 | 
			
		||||
import time
 | 
			
		||||
from typing import Optional
 | 
			
		||||
import xml.etree.ElementTree as ET
 | 
			
		||||
from urllib.request import Request, urlopen
 | 
			
		||||
from urllib.error import HTTPError, URLError
 | 
			
		||||
@@ -23,9 +24,11 @@ MCS_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'Columnstore.xml')
 | 
			
		||||
SM_CONFIG_PATH = os.path.join(MCS_ETC_PATH, 'storagemanager.cnf')
 | 
			
		||||
MCS_BIN_DIR = '@ENGINE_BINDIR@'
 | 
			
		||||
SAVEBRM = os.path.join(MCS_BIN_DIR, 'save_brm')
 | 
			
		||||
CLEAR_ALL_SHMEM_LOCKS = os.path.join(MCS_BIN_DIR, 'mcs-shmem-locks') 
 | 
			
		||||
CLEAR_ALL_SHMEM_LOCKS_ARGS = '-a'
 | 
			
		||||
EM_FILE_SUFFIX = '_em'
 | 
			
		||||
EM_FILE_SIZE_THRESHOLD = 1000
 | 
			
		||||
HALF_A_MINUTE = 30
 | 
			
		||||
FIVE_SECS = 5
 | 
			
		||||
NUMBER_OF_FILES_TO_KEEP = 40
 | 
			
		||||
DEFAULT_EM_LOCAL_PATH_PREFIX = ''
 | 
			
		||||
LOCALHOST = '127.0.0.1'
 | 
			
		||||
@@ -68,12 +71,12 @@ def cmapi_available():
 | 
			
		||||
    :return: is CMAPI running or not
 | 
			
		||||
    :rtype: bool
 | 
			
		||||
    """
 | 
			
		||||
    logging.debug('Detecting CMAPI is up and running.')
 | 
			
		||||
    logging.info('Detecting CMAPI is up and running.')
 | 
			
		||||
    url = 'https://{}:{}/notfound'.format(LOCALHOST, API_PORT)
 | 
			
		||||
    request = Request(method='POST', url=url)
 | 
			
		||||
    ctx = get_unverified_context()
 | 
			
		||||
    try:
 | 
			
		||||
        with urlopen(request, context=ctx, timeout=HALF_A_MINUTE) as req:
 | 
			
		||||
        with urlopen(request, context=ctx, timeout=FIVE_SECS) as req:
 | 
			
		||||
            _ = req.read().decode('utf-8')
 | 
			
		||||
    except HTTPError as exc:
 | 
			
		||||
        if exc.code == 404:
 | 
			
		||||
@@ -128,7 +131,7 @@ def is_primary_fallback(current_hostname):
 | 
			
		||||
    :return: is node primary
 | 
			
		||||
    :rtype: bool
 | 
			
		||||
    """
 | 
			
		||||
    logging.debug(
 | 
			
		||||
    logging.info(
 | 
			
		||||
        'Current DBRM_Controller/IPAddr is {}'.format(current_hostname)
 | 
			
		||||
    )
 | 
			
		||||
    hostnames = set()
 | 
			
		||||
@@ -139,7 +142,7 @@ def is_primary_fallback(current_hostname):
 | 
			
		||||
            hostnames.update([hostnames_3tuple[0], *hostnames_3tuple[1]])
 | 
			
		||||
        except:
 | 
			
		||||
            pass
 | 
			
		||||
    logging.debug('Found hostnames {}.'.format(hostnames))
 | 
			
		||||
    logging.info('Found hostnames {}.'.format(hostnames))
 | 
			
		||||
    return current_hostname in LOCALHOSTS or current_hostname in hostnames
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@@ -164,7 +167,7 @@ def is_node_primary(conf_root):
 | 
			
		||||
 | 
			
		||||
        success = False
 | 
			
		||||
        try:
 | 
			
		||||
            with urlopen(request, context=ctx, timeout=HALF_A_MINUTE) as req:
 | 
			
		||||
            with urlopen(request, context=ctx, timeout=FIVE_SECS) as req:
 | 
			
		||||
                response = req.read()
 | 
			
		||||
            success = True
 | 
			
		||||
        except HTTPError as exc:
 | 
			
		||||
@@ -214,9 +217,11 @@ def em_is_empty(file_path_prefix):
 | 
			
		||||
    :rtype: Bool
 | 
			
		||||
    """
 | 
			
		||||
    # Add error message if EM is empty
 | 
			
		||||
    is_none = file_path_prefix is None
 | 
			
		||||
    filesize = get_file_size(file_path_prefix + EM_FILE_SUFFIX)
 | 
			
		||||
    is_em_empty = is_none or filesize < EM_FILE_SIZE_THRESHOLD
 | 
			
		||||
    if file_path_prefix is None:
 | 
			
		||||
        is_em_empty = True
 | 
			
		||||
    else:
 | 
			
		||||
        filesize = get_file_size(file_path_prefix + EM_FILE_SUFFIX)
 | 
			
		||||
        is_em_empty = filesize < EM_FILE_SIZE_THRESHOLD
 | 
			
		||||
    if is_em_empty:
 | 
			
		||||
        logging.error('EM file is none or its size {} is less than {} bytes.'.format(filesize, EM_FILE_SIZE_THRESHOLD))
 | 
			
		||||
    return is_em_empty
 | 
			
		||||
@@ -231,7 +236,7 @@ def clean_up_backup_brm_files(save_brm_dir_path):
 | 
			
		||||
    files_to_remove = filenames[NUMBER_OF_FILES_TO_KEEP:]
 | 
			
		||||
    for filename in files_to_remove:
 | 
			
		||||
        file_path = os.path.join(save_brm_dir_path, filename)
 | 
			
		||||
        logging.debug('Clean up {}.'.format(file_path))
 | 
			
		||||
        logging.info('Clean up {}.'.format(file_path))
 | 
			
		||||
        try:
 | 
			
		||||
            os.remove(file_path)
 | 
			
		||||
        except OSError as e:
 | 
			
		||||
@@ -316,72 +321,65 @@ def get_save_brm_path_prefix(a_mcs_config_root):
 | 
			
		||||
    return get_save_brm_dir_path(a_mcs_config_root) + '/' + BRM_BACKUP_PATH_PART.format(epoch_prefix)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def call_save_brm(path):
 | 
			
		||||
    """Calls save_brm first and then tries to call it with local path. 
 | 
			
		||||
def call_executable_with_params(executable: str, args: str) -> bool:
    """Run an executable with the given arguments and report success.

    The argument string is tokenised with ``shlex.split`` and the command is
    started without a shell, so the executable path is passed through
    verbatim and nothing in ``args`` is interpreted by a shell.

    :param executable: path of the executable to call
    :type executable: str
    :param args: command-line arguments as a single string; may be empty
    :type args: str
    :return: True if the process exited with status 0, False if it exited
        with a non-zero status, could not be started, or ``args`` could not
        be parsed
    :rtype: bool
    """
    # Local import keeps the function self-contained.
    import shlex

    try:
        command = [executable, *shlex.split(args)]
    except ValueError:
        # shlex raises ValueError on unbalanced quotes in the argument string.
        logging.error(
            'Cannot parse arguments for {}.'.format(executable), exc_info=True
        )
        return False

    try:
        subprocess.check_call(command)
    except subprocess.CalledProcessError as exc:
        logging.error('The call to {} exits with {}.'.format(executable, exc.returncode))
        return False
    except OSError:
        # Raised when the executable is missing or cannot be executed.
        logging.error('Os error while calling {}.'.format(executable), exc_info=True)
        return False
    return True
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def call_save_brm_locally(a_mcs_config_root):
 | 
			
		||||
    """Calls save_brm first and then tries to call it with local path. 
 | 
			
		||||
def clear_shmem_locks() -> bool:
    """Reset the shmem locks before save_brm is called.

    Runs the ``CLEAR_ALL_SHMEM_LOCKS`` executable with
    ``CLEAR_ALL_SHMEM_LOCKS_ARGS`` through ``call_executable_with_params``.

    :return: the result of ``call_executable_with_params``: True on
        success, False otherwise (never None)
    :rtype: bool
    """
    logging.info('Clear shmem read locks.')
    return call_executable_with_params(CLEAR_ALL_SHMEM_LOCKS, CLEAR_ALL_SHMEM_LOCKS_ARGS)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def call_save_brm_with_local_fallback(a_mcs_config_root):
 | 
			
		||||
    """Calls save_brm first and then tries to call it with local path. 
 | 
			
		||||
def call_save_brm(path) -> bool:
    """Call save_brm with a path argument.

    :param path: argument string handed to the ``SAVEBRM`` executable; the
        callers visible in this file pass a BRM file path prefix or an
        empty string
    :return: the result of ``call_executable_with_params``: True on
        success, False otherwise
    :rtype: bool
    """
    return call_executable_with_params(SAVEBRM, path)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == '__main__':
    # Configure logging to show INFO level messages
    logging.basicConfig(level=logging.INFO, format='%(levelname)s:%(name)s:%(message)s')

    mcs_config_root = get_config_root_from_file(MCS_CONFIG_PATH)
    # clear_shmem_locks() returns a bool, so test its truthiness: comparing
    # the result with None could never detect a failed lock reset.
    if not clear_shmem_locks():
        logging.error('Exiting with error cleaning locks.')
        sys.exit(1)
    em_local_path_prefix = get_save_brm_path_prefix(mcs_config_root)
    # config_root can be None
    if is_node_primary(mcs_config_root):
        # Save a local copy first and make sure the EM file is not empty.
        if not call_save_brm(em_local_path_prefix) or em_is_empty(em_local_path_prefix):
            logging.error('Exiting with error trying to safe BRM locally on primary node.')
            sys.exit(1)

        clean_up_backup_brm_files(get_save_brm_dir_path(mcs_config_root))

        # NOTE(review): the result of this final save is ignored, so the
        # script exits with 0 even if it fails - confirm this is intended.
        call_save_brm(DEFAULT_EM_LOCAL_PATH_PREFIX)
    else:
        # Node is not primary. Call save_brm locally to save a copy of BRM locally
        logging.info('Node is not primary. Call save_brm locally')
        if not call_save_brm(em_local_path_prefix) or em_is_empty(em_local_path_prefix):
            logging.error('Exiting with error trying to safe BRM locally on non-primary node.')
            sys.exit(1)

    sys.exit(0)
 | 
			
		||||
 
 | 
			
		||||
@@ -39,6 +39,16 @@ std::string getShmemLocksList()
 | 
			
		||||
  return oss.str();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int resetAllLocks()
 | 
			
		||||
{
 | 
			
		||||
  for (size_t i = 0; i < RWLockNames.size(); ++i)
 | 
			
		||||
  {
 | 
			
		||||
    auto rwlock = RWLock(0x10000 * i);
 | 
			
		||||
    rwlock.reset();
 | 
			
		||||
  }
 | 
			
		||||
  return 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int viewLock(uint8_t lockId)
 | 
			
		||||
{
 | 
			
		||||
  size_t minLockId = (lockId > 0) ? lockId : 1;
 | 
			
		||||
@@ -112,6 +122,7 @@ int main(int argc, char** argv)
 | 
			
		||||
  bool write = false;
 | 
			
		||||
  bool lock = false;
 | 
			
		||||
  bool unlock = false;
 | 
			
		||||
  bool resetAll = false;
 | 
			
		||||
 | 
			
		||||
  po::options_description desc(
 | 
			
		||||
      "A tool to operate or view shmem locks. If neither read nor write operation is specified, the tool "
 | 
			
		||||
@@ -122,12 +133,14 @@ int main(int argc, char** argv)
 | 
			
		||||
 | 
			
		||||
  // clang-format off
 | 
			
		||||
  desc.add_options()("help", "produce help message")
 | 
			
		||||
      ("lock-id,i", po::value<int>(&lockId)->required(), lockid_description.c_str())
 | 
			
		||||
      ("lock-id,i", po::value<int>(&lockId)->default_value(RWLockNames.size()), lockid_description.c_str())
 | 
			
		||||
      ("read-lock,r", po::bool_switch(&read)->default_value(false), "Use read lock.")
 | 
			
		||||
      ("write-lock,w", po::bool_switch(&write)->default_value(false), "Use write lock.")
 | 
			
		||||
      ("lock,l", po::bool_switch(&lock)->default_value(false), "Lock the corresponding shmem lock.")
 | 
			
		||||
      ("unlock,u", po::bool_switch(&unlock)->default_value(false), "Unlock the corresponding shmem write lock.")
 | 
			
		||||
      ("debug,d", po::bool_switch(&debug)->default_value(false), "Print extra output.");
 | 
			
		||||
      ("debug,d", po::bool_switch(&debug)->default_value(false), "Print extra output.")
 | 
			
		||||
      ("reset-all,a", po::bool_switch(&resetAll)->default_value(false), "Reset all shmem locks.");
 | 
			
		||||
 | 
			
		||||
  // clang-format on
 | 
			
		||||
 | 
			
		||||
  po::variables_map vm;
 | 
			
		||||
@@ -139,12 +152,29 @@ int main(int argc, char** argv)
 | 
			
		||||
    return 1;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  conflicting_options(vm, "reset-all", "lock-id");
 | 
			
		||||
  conflicting_options(vm, "lock", "unlock");
 | 
			
		||||
  conflicting_options(vm, "read-lock", "write-lock");
 | 
			
		||||
  check_value<int>(vm, "lock-id", 0, RWLockNames.size());
 | 
			
		||||
  
 | 
			
		||||
  // Only require lock-id validation if reset-all is not used
 | 
			
		||||
  if (!resetAll && (vm.count("lock-id") && !vm["lock-id"].defaulted()))
 | 
			
		||||
  {
 | 
			
		||||
    check_value<int>(vm, "lock-id", 0, RWLockNames.size());
 | 
			
		||||
  }
 | 
			
		||||
  
 | 
			
		||||
  // Require lock-id for operations other than reset-all
 | 
			
		||||
  if (!resetAll && !vm.count("lock-id"))
 | 
			
		||||
  {
 | 
			
		||||
    throw std::logic_error("lock-id is required when not using reset-all");
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  po::notify(vm);
 | 
			
		||||
 | 
			
		||||
  if (resetAll)
 | 
			
		||||
  {
 | 
			
		||||
    return resetAllLocks();
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  if (!read && !write)
 | 
			
		||||
  {
 | 
			
		||||
    return viewLock(lockId);
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user