You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-01 06:46:55 +03:00
MCOL-4440 Primary on S3 doesn't download BRM files anymore but waits till SM
starts up
This commit is contained in:
@ -13,6 +13,10 @@ API_CONFIG_PATH = '/etc/columnstore/cmapi_server.conf'
|
|||||||
BYPASS_SM_PATH = '/tmp/columnstore_tmp_files/rdwrscratch/BRM_saves'
|
BYPASS_SM_PATH = '/tmp/columnstore_tmp_files/rdwrscratch/BRM_saves'
|
||||||
USER = '@DEFAULT_USER@'
|
USER = '@DEFAULT_USER@'
|
||||||
GROUP = '@DEFAULT_GROUP@'
|
GROUP = '@DEFAULT_GROUP@'
|
||||||
|
MINUTE = 60
|
||||||
|
HALF_A_MINUTE = 30
|
||||||
|
UNREASONABLE_DELAY = 600
|
||||||
|
SMCAT = '@ENGINE_BINDIR@/smcat'
|
||||||
|
|
||||||
|
|
||||||
def get_key():
|
def get_key():
|
||||||
@ -31,6 +35,36 @@ def get_port():
|
|||||||
return '8640'
|
return '8640'
|
||||||
|
|
||||||
|
|
||||||
|
def read_from_sm_with_retry(brm_path):
    """Read a file through StorageManager's smcat, retrying on failure.

    Runs ``SMCAT brm_path`` up to ``UNREASONABLE_DELAY`` times, sleeping
    one second after every failed attempt.

    :param brm_path: path handed to smcat as its only argument.
    :return: the bytes smcat wrote to stdout on the first attempt that
        exits with code 0, or ``b''`` when every attempt failed. Both
        outcomes are bytes, so callers can test truthiness and decode
        the value uniformly.
    """
    args = [SMCAT, brm_path]
    # We need to wait until SM at primary gets ownership of dbroot1.
    for _ in range(UNREASONABLE_DELAY):
        # Only stdout is captured; smcat's own stderr passes through.
        ret = subprocess.run(args, stdout=subprocess.PIPE)
        if ret.returncode == 0:
            return ret.stdout
        # Adjacent literals instead of a backslash continuation inside the
        # string, which would embed the source indentation in the message.
        print(
            'read_from_sm_with_retry got error code {} from smcat, '
            'retrying'.format(ret.returncode),
            file=sys.stderr,
        )
        time.sleep(1)

    # Reached only when every attempt returned a non-zero exit code.
    print('Can not read {} using {}.'.format(brm_path, SMCAT), file=sys.stderr)
    return b''
|
||||||
|
|
||||||
|
|
||||||
|
def read_from_disk(brm_path):
    """Return the raw contents of a local file.

    :param brm_path: filesystem path of the file to read.
    :return: the file contents as bytes, or ``b''`` when the file is
        missing or cannot be read. A message describing the failure is
        printed to stderr in that case.
    """
    try:
        # Read directly instead of spawning `cat`: no extra process, and
        # the precise failure reason is available for the message below.
        with open(brm_path, 'rb') as brm_file:
            return brm_file.read()
    except FileNotFoundError:
        # will happen when brm file does not exist
        print('{} does not exist.'.format(brm_path), file=sys.stderr)
    except OSError as err:
        # e.g. permission denied or the path is a directory
        print('Can not read {}: {}'.format(brm_path, err), file=sys.stderr)
    # Explicit falsy bytes value rather than an implicit None.
    return b''
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
# To avoid systemd in container environment
|
# To avoid systemd in container environment
|
||||||
use_systemd = True
|
use_systemd = True
|
||||||
@ -49,13 +83,17 @@ if __name__ == '__main__':
|
|||||||
if bucket is None:
|
if bucket is None:
|
||||||
bucket = 'some_bucket'
|
bucket = 'some_bucket'
|
||||||
|
|
||||||
|
s3_enabled = storage.lower() == 's3' and not bucket.lower() == 'some_bucket'
|
||||||
|
|
||||||
dbrmroot = config_root.find('./SystemConfig/DBRMRoot').text
|
dbrmroot = config_root.find('./SystemConfig/DBRMRoot').text
|
||||||
pmCount = int(config_root.find('./SystemModuleConfig/ModuleCount3').text)
|
pmCount = int(config_root.find('./SystemModuleConfig/ModuleCount3').text)
|
||||||
|
is_multinode = pmCount > 1
|
||||||
loadbrm = '@ENGINE_BINDIR@/load_brm'
|
loadbrm = '@ENGINE_BINDIR@/load_brm'
|
||||||
|
s3_dbroot1_brm_path = 'data1/systemFiles/dbrm/BRM_saves_current'
|
||||||
|
|
||||||
brm_saves_current = ''
|
brm_saves_current = ''
|
||||||
|
|
||||||
if storage.lower() == 's3' and not bucket.lower() == 'some_bucket':
|
if s3_enabled:
|
||||||
# start SM using systemd
|
# start SM using systemd
|
||||||
if use_systemd is True:
|
if use_systemd is True:
|
||||||
cmd = 'systemctl start mcs-storagemanager'
|
cmd = 'systemctl start mcs-storagemanager'
|
||||||
@ -66,7 +104,7 @@ if __name__ == '__main__':
|
|||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
time.sleep(1) # allow SM time to init
|
time.sleep(1) # allow SM time to init
|
||||||
|
|
||||||
brm = 'data1/systemFiles/dbrm/BRM_saves_current'
|
# Adding S3 related configuration into Columnstore.xml
|
||||||
config_root.find('./Installation/DBRootStorageType').text = "StorageManager"
|
config_root.find('./Installation/DBRootStorageType').text = "StorageManager"
|
||||||
config_root.find('./StorageManager/Enabled').text = "Y"
|
config_root.find('./StorageManager/Enabled').text = "Y"
|
||||||
|
|
||||||
@ -80,21 +118,21 @@ if __name__ == '__main__':
|
|||||||
'/etc/columnstore/Columnstore.xml') # atomic replacement
|
'/etc/columnstore/Columnstore.xml') # atomic replacement
|
||||||
|
|
||||||
# Single-node on S3
|
# Single-node on S3
|
||||||
if storage.lower() == 's3' and not bucket.lower() == 'some_bucket' and pmCount == 1:
|
if s3_enabled and not is_multinode:
|
||||||
try:
|
try:
|
||||||
if use_systemd:
|
if use_systemd:
|
||||||
args = ['su', '-s', '/bin/sh', '-c', 'smcat {}'.format(brm), USER]
|
args = ['su', '-s', '/bin/sh', '-c', '{} {}'.format(SMCAT, s3_dbroot1_brm_path), USER]
|
||||||
else:
|
else:
|
||||||
args = ['smcat', brm]
|
args = [smcat, s3_dbroot1_brm_path]
|
||||||
|
|
||||||
brm_saves_current = subprocess.check_output(args)
|
brm_saves_current = subprocess.check_output(args)
|
||||||
except subprocess.CalledProcessError as e:
|
except subprocess.CalledProcessError as e:
|
||||||
# will happen when brm file does not exist
|
# will happen when brm file does not exist
|
||||||
print('{} does not exist.'.format(brm), file=sys.stderr)
|
print('{} does not exist.'.format(s3_dbroot1_brm_path), file=sys.stderr)
|
||||||
else:
|
else:
|
||||||
brm = '{}_current'.format(dbrmroot)
|
brm = '{}_current'.format(dbrmroot)
|
||||||
# Multi-node
|
if is_multinode:
|
||||||
if pmCount > 1:
|
is_primary = False
|
||||||
try:
|
try:
|
||||||
import requests
|
import requests
|
||||||
requests.packages.urllib3.disable_warnings()
|
requests.packages.urllib3.disable_warnings()
|
||||||
@ -112,44 +150,65 @@ file=sys.stderr)
|
|||||||
headers = {'x-api-key': api_key}
|
headers = {'x-api-key': api_key}
|
||||||
api_version = get_version()
|
api_version = get_version()
|
||||||
api_port = get_port()
|
api_port = get_port()
|
||||||
elems = ['em', 'journal', 'vbbm', 'vss']
|
|
||||||
for e in elems:
|
|
||||||
print("Pulling {} from the primary node.".format(e))
|
|
||||||
url = "https://{}:{}/cmapi/{}/node/meta/{}".format(primary_address, \
|
|
||||||
api_port, api_version, e)
|
|
||||||
r = requests.get(url, verify=False, headers=headers, timeout=30)
|
|
||||||
if (r.status_code != 200):
|
|
||||||
raise RuntimeError("Error requesting {} from the primary \
|
|
||||||
node.".format(e))
|
|
||||||
|
|
||||||
# Store BRM files locally to load them up
|
# Check using CMAPI if this node is primary.
|
||||||
dbrmroot = BYPASS_SM_PATH
|
is_primary = False
|
||||||
|
url = "https://127.0.0.1:{}/cmapi/{}/node/primary".format(api_port, \
|
||||||
|
api_version)
|
||||||
|
r = requests.get(url, verify=False, headers=headers, timeout=HALF_A_MINUTE)
|
||||||
|
if r.status_code == 200:
|
||||||
|
is_primary = r.json().get('is_primary', False)
|
||||||
|
if is_primary and (is_primary == 'True' or is_primary == 'true'):
|
||||||
|
is_primary = True
|
||||||
|
else:
|
||||||
|
is_primary = False
|
||||||
|
else:
|
||||||
|
raise RuntimeError("Error requesting primary status from the local node.")
|
||||||
|
|
||||||
if not os.path.exists(dbrmroot):
|
# Download BRM files from the primary node via CMAPI.
|
||||||
os.makedirs(dbrmroot)
|
if not is_primary:
|
||||||
if use_systemd:
|
elems = ['em', 'journal', 'vbbm', 'vss']
|
||||||
shutil.chown(dbrmroot, USER, GROUP)
|
for e in elems:
|
||||||
|
print("Pulling {} from the primary node.".format(e))
|
||||||
|
url = "https://{}:{}/cmapi/{}/node/meta/{}".format(primary_address, \
|
||||||
|
api_port, api_version, e)
|
||||||
|
r = requests.get(url, verify=False, headers=headers, timeout=MINUTE)
|
||||||
|
if (r.status_code != 200):
|
||||||
|
raise RuntimeError("Error requesting {} from the primary \
|
||||||
|
node.".format(e))
|
||||||
|
|
||||||
current_name = '{}_{}'.format(dbrmroot, e)
|
# Store BRM files locally to load them up
|
||||||
|
dbrmroot = BYPASS_SM_PATH
|
||||||
|
|
||||||
|
if not os.path.exists(dbrmroot):
|
||||||
|
os.makedirs(dbrmroot)
|
||||||
|
if use_systemd:
|
||||||
|
shutil.chown(dbrmroot, USER, GROUP)
|
||||||
|
|
||||||
|
current_name = '{}_{}'.format(dbrmroot, e)
|
||||||
|
|
||||||
|
print ("Saving {} to {}".format(e, current_name))
|
||||||
|
path = Path(current_name)
|
||||||
|
path.write_bytes(r.content)
|
||||||
|
shutil.chown(current_name, USER, GROUP)
|
||||||
|
|
||||||
print ("Saving {} to {}".format(e, current_name))
|
|
||||||
path = Path(current_name)
|
|
||||||
path.write_bytes(r.content)
|
|
||||||
shutil.chown(current_name, USER, GROUP)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(str(e))
|
print(str(e))
|
||||||
print('Failed to load BRM data from the primary \
|
print('Failed to detect primary or load BRM data from the primary \
|
||||||
node {}.'.format(primary_address), file=sys.stderr)
|
node {}.'.format(primary_address), file=sys.stderr)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
# Primary reads BRM files directly from S3 via SM or from disk.
|
||||||
brm_saves_current = b"BRM_saves\n"
|
if is_primary:
|
||||||
|
if s3_enabled:
|
||||||
|
# S3 paths are relative to the current top level dir,
|
||||||
|
# e.g. /var/lib/columnstore/data1 becomes /data1
|
||||||
|
brm_saves_current = read_from_sm_with_retry(s3_dbroot1_brm_path)
|
||||||
|
else:
|
||||||
|
brm_saves_current = read_from_disk(brm)
|
||||||
|
else:
|
||||||
|
brm_saves_current = b"BRM_saves\n"
|
||||||
else:
|
else:
|
||||||
# load local dbrm
|
brm_saves_current = read_from_disk(brm)
|
||||||
try:
|
|
||||||
brm_saves_current = subprocess.check_output(['cat', brm])
|
|
||||||
except subprocess.CalledProcessError as e:
|
|
||||||
# will happen when brm file does not exist
|
|
||||||
print('{} does not exist.'.format(brm), file=sys.stderr)
|
|
||||||
|
|
||||||
if brm_saves_current:
|
if brm_saves_current:
|
||||||
if use_systemd:
|
if use_systemd:
|
||||||
@ -163,8 +222,11 @@ brm_saves_current.decode("utf-8").replace("BRM_saves", ""))
|
|||||||
if retcode < 0:
|
if retcode < 0:
|
||||||
print('{} exits with {}.'.format(cmd, retcode))
|
print('{} exits with {}.'.format(cmd, retcode))
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
# systemd services by default work using mysql privileges.
|
||||||
|
# There were cases when shmem segments belonged to root, so
|
||||||
|
# explicitly chown the segments.
|
||||||
if use_systemd:
|
if use_systemd:
|
||||||
for shm_file in glob.glob('/dev/shm/*'):
|
for shm_file in glob.glob('/dev/shm/*'):
|
||||||
shutil.chown(shm_file, USER, GROUP)
|
shutil.chown(shm_file, USER, GROUP)
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
Reference in New Issue
Block a user