diff --git a/oam/install_scripts/mcs-loadbrm.py.in b/oam/install_scripts/mcs-loadbrm.py.in index 9ea06da58..7047d097f 100755 --- a/oam/install_scripts/mcs-loadbrm.py.in +++ b/oam/install_scripts/mcs-loadbrm.py.in @@ -13,6 +13,10 @@ API_CONFIG_PATH = '/etc/columnstore/cmapi_server.conf' BYPASS_SM_PATH = '/tmp/columnstore_tmp_files/rdwrscratch/BRM_saves' USER = '@DEFAULT_USER@' GROUP = '@DEFAULT_GROUP@' +MINUTE = 60 +HALF_A_MINUTE = 30 +UNREASONABLE_DELAY = 600 +SMCAT = '@ENGINE_BINDIR@/smcat' def get_key(): @@ -31,6 +35,36 @@ def get_port(): return '8640' +def read_from_sm_with_retry(brm_path): + func_name = "read_from_sm_with_retry" + result = "" + retry_count = 0 + success = False + args = [SMCAT, brm_path] + # We need to wait until SM at primary gets an ownership for dbroot1. + while not success and retry_count < UNREASONABLE_DELAY: + ret = subprocess.run(args, stdout=subprocess.PIPE) + if ret.returncode == 0: + return ret.stdout + else: + print("{} got error code {} from smcat, \ +retrying".format(func_name, ret.returncode), file=sys.stderr) + time.sleep(1) + retry_count += 1 + continue + if not success: + print('Can not read {} using {}.'.format(brm_path, SMCAT), file=sys.stderr) + return result + + +def read_from_disk(brm_path): + try: + return subprocess.check_output(['cat', brm_path]) + except subprocess.CalledProcessError as e: + # will happen when brm file does not exist + print('{} does not exist.'.format(brm_path), file=sys.stderr) + + if __name__ == '__main__': # To avoid systemd in container environment use_systemd = True @@ -49,13 +83,17 @@ if __name__ == '__main__': if bucket is None: bucket = 'some_bucket' + s3_enabled = storage.lower() == 's3' and not bucket.lower() == 'some_bucket' + dbrmroot = config_root.find('./SystemConfig/DBRMRoot').text pmCount = int(config_root.find('./SystemModuleConfig/ModuleCount3').text) + is_multinode = pmCount > 1 loadbrm = '@ENGINE_BINDIR@/load_brm' + s3_dbroot1_brm_path = 'data1/systemFiles/dbrm/BRM_saves_current' brm_saves_current = '' - if storage.lower() == 's3' and not bucket.lower() == 'some_bucket': + if s3_enabled: # start SM using systemd if use_systemd is True: cmd = 'systemctl start mcs-storagemanager' @@ -66,7 +104,7 @@ if __name__ == '__main__': sys.exit(1) time.sleep(1) # allow SM time to init - brm = 'data1/systemFiles/dbrm/BRM_saves_current' + # Adding S3 related configuration into Columnstore.xml config_root.find('./Installation/DBRootStorageType').text = "StorageManager" config_root.find('./StorageManager/Enabled').text = "Y" @@ -80,21 +118,21 @@ if __name__ == '__main__': '/etc/columnstore/Columnstore.xml') # atomic replacement # Single-node on S3 - if storage.lower() == 's3' and not bucket.lower() == 'some_bucket' and pmCount == 1: + if s3_enabled and not is_multinode: try: if use_systemd: - args = ['su', '-s', '/bin/sh', '-c', 'smcat {}'.format(brm), USER] + args = ['su', '-s', '/bin/sh', '-c', '{} {}'.format(SMCAT, s3_dbroot1_brm_path), USER] else: - args = ['smcat', brm] + args = [smcat, s3_dbroot1_brm_path] brm_saves_current = subprocess.check_output(args) except subprocess.CalledProcessError as e: # will happen when brm file does not exist - print('{} does not exist.'.format(brm), file=sys.stderr) + print('{} does not exist.'.format(s3_dbroot1_brm_path), file=sys.stderr) else: brm = '{}_current'.format(dbrmroot) - # Multi-node - if pmCount > 1: + if is_multinode: + is_primary = False try: import requests requests.packages.urllib3.disable_warnings() @@ -112,44 +150,65 @@ file=sys.stderr) headers = {'x-api-key': api_key} api_version = get_version() api_port = get_port() - elems = ['em', 'journal', 'vbbm', 'vss'] - for e in elems: - print("Pulling {} from the primary node.".format(e)) - url = "https://{}:{}/cmapi/{}/node/meta/{}".format(primary_address, \ -api_port, api_version, e) - r = requests.get(url, verify=False, headers=headers, timeout=30) - if (r.status_code != 200): - raise RuntimeError("Error requesting {} from the primary \ -node.".format(e)) - # Store BRM files locally to load them up - dbrmroot = BYPASS_SM_PATH + # Check using CMAPI if this node is primary. + is_primary = False + url = "https://127.0.0.1:{}/cmapi/{}/node/primary".format(api_port, \ +api_version) + r = requests.get(url, verify=False, headers=headers, timeout=HALF_A_MINUTE) + if r.status_code == 200: + is_primary = r.json().get('is_primary', False) + if is_primary and (is_primary == 'True' or is_primary == 'true'): + is_primary = True + else: + is_primary = False + else: + raise RuntimeError("Error requesting primary status from the local node.") - if not os.path.exists(dbrmroot): - os.makedirs(dbrmroot) - if use_systemd: - shutil.chown(dbrmroot, USER, GROUP) + # Download BRM files from the primary node via CMAPI. + if not is_primary: + elems = ['em', 'journal', 'vbbm', 'vss'] + for e in elems: + print("Pulling {} from the primary node.".format(e)) + url = "https://{}:{}/cmapi/{}/node/meta/{}".format(primary_address, \ + api_port, api_version, e) + r = requests.get(url, verify=False, headers=headers, timeout=MINUTE) + if (r.status_code != 200): + raise RuntimeError("Error requesting {} from the primary \ + node.".format(e)) - current_name = '{}_{}'.format(dbrmroot, e) + # Store BRM files locally to load them up + dbrmroot = BYPASS_SM_PATH + + if not os.path.exists(dbrmroot): + os.makedirs(dbrmroot) + if use_systemd: + shutil.chown(dbrmroot, USER, GROUP) + + current_name = '{}_{}'.format(dbrmroot, e) + + print ("Saving {} to {}".format(e, current_name)) + path = Path(current_name) + path.write_bytes(r.content) + shutil.chown(current_name, USER, GROUP) - print ("Saving {} to {}".format(e, current_name)) - path = Path(current_name) - path.write_bytes(r.content) - shutil.chown(current_name, USER, GROUP) except Exception as e: print(str(e)) - print('Failed to load BRM data from the primary \ + print('Failed to detect primary or load BRM data from the primary \ node {}.'.format(primary_address), file=sys.stderr) sys.exit(1) - - brm_saves_current = b"BRM_saves\n" + # Primary reads BRM files directly from S3 via SM or from disk. + if is_primary: + if s3_enabled: + # S3 path are relative to the current top level dir, + # e.g. /var/lib/columnstore/data1 becomes /data1 + brm_saves_current = read_from_sm_with_retry(s3_dbroot1_brm_path) + else: + brm_saves_current = read_from_disk(brm) + else: + brm_saves_current = b"BRM_saves\n" else: - # load local dbrm - try: - brm_saves_current = subprocess.check_output(['cat', brm]) - except subprocess.CalledProcessError as e: - # will happen when brm file does not exist - print('{} does not exist.'.format(brm), file=sys.stderr) + brm_saves_current = read_from_disk(brm) if brm_saves_current: if use_systemd: @@ -163,8 +222,11 @@ brm_saves_current.decode("utf-8").replace("BRM_saves", "")) if retcode < 0: print('{} exits with {}.'.format(cmd, retcode)) sys.exit(1) + # systemd services by default works using mysql privileges. + # There were cases when shmem segments belongs to root so + # explicitly chowns segments. if use_systemd: - for shm_file in glob.glob('/dev/shm/*'): + for shm_file in glob.glob('/dev/shm/*'): shutil.chown(shm_file, USER, GROUP) except OSError as e: sys.exit(1)