1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00
Files
mariadb-AlanMologorsky a079a2c944 MCOL-5496: Merge CMAPI code to engine repo.
[add] cmapi code to engine
2023-06-07 10:00:16 +03:00

295 lines
10 KiB
Python

"""
Module contains non-systemd/container process dispatcher class implementation.
"""
import logging
import os.path
import re
from pathlib import Path
from time import sleep
import psutil
from cmapi_server.constants import (
IFLAG, LIBJEMALLOC_DEFAULT_PATH, MCS_INSTALL_BIN, ALL_MCS_PROGS
)
from cmapi_server.exceptions import CMAPIBasicError
from cmapi_server.process_dispatchers.base import BaseDispatcher
class ContainerDispatcher(BaseDispatcher):
"""Manipulates processes in docker container.
It's possible to use in any OS/container environment in cases when
we don't want to use systemd or don't have it.
"""
libjemalloc_path = None
@staticmethod
def _set_iflag():
"""Create IFLAG file.
Means Columnstore container init finished.
"""
Path(IFLAG).touch()
@classmethod
def _get_proc_object(cls, name: str) -> psutil.Process:
"""Getting psutil Process object by service name.
:param name: process name
:type name: str
:raises psutil.NoSuchProcess: if no process with such name presented
:return: Process object with specified name
:rtype: psutil.Process
...TODO: add types-psutil to requirements for mypy checks
"""
for proc in psutil.process_iter(['pid', 'name', 'username']):
if proc.name().lower() == name.lower():
return proc
raise psutil.NoSuchProcess(pid=None, name=name)
@classmethod
def get_libjemalloc_path(cls) -> str:
"""Get libjemalloc.so path.
:raises CMAPIBasicError: raises if ldconfig execution returned non zero
:raises FileNotFoundError: if no libjemalloc.so.2 found
:return: libjemalloc.so.2 path
:rtype: str
"""
logger = logging.getLogger('container_sh')
if cls.libjemalloc_path:
return cls.libjemalloc_path
# pylint: disable=line-too-long
# for reference: https://github.com/pyinstaller/pyinstaller/blob/f29b577df4e1659cf65aacb797034763308fd298/PyInstaller/depend/utils.py#L304
splitlines_count = 1
pattern = re.compile(r'^\s+(\S+)(\s.*)? => (\S+)')
success, result = cls.exec_command('ldconfig -p')
if not success:
raise CMAPIBasicError('Failed executing ldconfig.')
text = result.strip().splitlines()[splitlines_count:]
for line in text:
# this assumes library names do not contain whitespace
p_match = pattern.match(line)
# Sanitize away any abnormal lines of output.
if p_match is None:
continue
lib_path = p_match.groups()[-1]
lib_name = p_match.group(1)
if 'libjemalloc' in lib_name:
# use the first entry
# TODO: do we need path or name here?
# $(ldconfig -p | grep -m1 libjemalloc | awk '{print $1}')
cls.libjemalloc_path = lib_path
break
if not cls.libjemalloc_path:
if not os.path.exists(LIBJEMALLOC_DEFAULT_PATH):
logger.error('No libjemalloc.so.2 found.')
raise FileNotFoundError
cls.libjemalloc_path = LIBJEMALLOC_DEFAULT_PATH
return cls.libjemalloc_path
@classmethod
def is_service_running(cls, service: str, use_sudo: bool = True) -> bool:
"""Check if mcs process is running.
:param service: service name
:type service: str
:param use_sudo: interface requirement, unused here, defaults to True
:type use_sudo: bool, optional
:return: True if service is running, otherwise False
:rtype: bool
"""
try:
cls._get_proc_object(service)
except psutil.NoSuchProcess:
return False
return True
@staticmethod
def _make_cmd(service: str) -> str:
"""Make shell command by service name.
:param service: service name
:type service: str
:return: command with arguments if needed
:rtype: str
"""
service_info = ALL_MCS_PROGS[service]
command = os.path.join(MCS_INSTALL_BIN, service)
if service_info.subcommand:
subcommand = service_info.subcommand
command = f'{command} {subcommand}'
return command
@classmethod
def start(
cls, service: str, is_primary: bool, use_sudo: bool = True
) -> bool:
"""Start process in docker container.
:param service: process name
:type service: str
:param is_primary: is node primary or not
:type is_primary: bool, optional
:param use_sudo: interface required, unused here, defaults to True
:type use_sudo: bool, optional
:return: True if service started successfully
:rtype: bool
"""
logger = logging.getLogger('container_sh')
if cls.is_service_running(service):
return True
logger.debug(f'Starting {service}')
env_vars = {"LD_PRELOAD": cls.get_libjemalloc_path()}
command = cls._make_cmd(service)
if service == 'workernode':
# workernode starts on primary and non primary node with 1 or 2
# added to the end of argument:
# DBRM_Worker1 - on primary, DBRM_Worker2 - non primary
command = command.format(1 if is_primary else 2)
# start mcs-loadbrm.py before workernode
logger.debug('Waiting to load BRM.')
loadbrm_path = os.path.join(MCS_INSTALL_BIN, 'mcs-loadbrm.py')
loadbrm_logpath = cls._create_mcs_process_logfile(
'mcs-loadbrm.log'
)
with open(loadbrm_logpath, 'a', encoding='utf-8') as loadbrm_logfh:
success, _ = cls.exec_command(
f'{loadbrm_path} no', stdout=loadbrm_logfh, env=env_vars
)
if not success:
logger.error('Error while loading BRM.')
else:
logger.debug('Successfully loaded BRM.')
service_log_path = cls._create_mcs_process_logfile(
f'{service.lower()}.log'
)
success, _ = cls.exec_command(
command, daemonize=True,
stdout=open(service_log_path, 'a', encoding='utf-8'),
env=env_vars
)
# TODO: any other way to detect service finished its initialisation?
sleep(ALL_MCS_PROGS[service].delay)
logger.debug(f'Started "{service}".')
if is_primary and service == 'DDLProc':
cls._run_dbbuilder()
return cls.is_service_running(service)
@classmethod
def stop(
cls, service: str, is_primary: bool, use_sudo: bool = True
) -> bool:
"""Stop process in docker container.
:param service: process name
:type service: str
:param is_primary: is node primary or not
:type is_primary: bool, optional
:param use_sudo: interface required, unused here, defaults to True
:type use_sudo: bool, optional
:return: True if service started successfully
:rtype: bool
"""
logger = logging.getLogger('container_sh')
if not cls.is_service_running(service):
return True
logger.debug(f'Stopping {service}')
service_proc = cls._get_proc_object(service)
if service == 'workernode':
# start mcs-savebrm.py before stoping workernode
logger.debug('Waiting to save BRM.')
savebrm_path = os.path.join(MCS_INSTALL_BIN, 'mcs-savebrm.py')
savebrm_logpath = cls._create_mcs_process_logfile(
'mcs-savebrm.log'
)
with open(savebrm_logpath, 'a', encoding='utf-8') as savebrm_logfh:
success, _ = cls.exec_command(
savebrm_path, stdout=savebrm_logfh
)
if not success:
logger.error('Error while saving BRM.')
else:
logger.debug('Successfully saved BRM.')
logger.debug('Start clearing SHM.')
clearshm_path = os.path.join(MCS_INSTALL_BIN, 'clearShm')
success, _ = cls.exec_command(clearshm_path)
if not success:
logger.error('Error while clearing SHM.')
else:
logger.debug('Successfully cleared SHM.')
service_proc.terminate()
# timeout got from old container.sh
# TODO: this is still not enough for controllernode process
# it should be always stop by SIGKILL, need to investigate.
timeout = 3
if service == 'StorageManager':
timeout = 300 # 5 minutes
logger.debug(f'Waiting to gracefully stop "{service}".')
# This function will return as soon as all processes terminate
# or when timeout (seconds) occurs.
gone, alive = psutil.wait_procs([service_proc], timeout=timeout)
if alive:
logger.debug(
f'{service} not terminated with SIGTERM, sending SIGKILL.'
)
# only one process could be in a list
alive[0].kill()
gone, alive = psutil.wait_procs([service_proc], timeout=timeout)
if gone:
logger.debug(f'Successfully killed "{service}".')
else:
logger.warning(
f'Service "{service}" still alive after sending "kill -9" '
f'and waiting {timeout} seconds.'
)
else:
logger.debug(f'Gracefully stopped "{service}".')
return not cls.is_service_running(service)
@classmethod
def restart(
cls, service: str, is_primary: bool, use_sudo: bool = True
) -> bool:
"""Restart process in docker container.
:param service: process name
:type service: str
:param is_primary: is node primary or not
:type is_primary: bool, optional
:param use_sudo: interface required, unused here, defaults to True
:type use_sudo: bool, optional
:return: True if service started successfully
:rtype: bool
...TODO: for next releases. Additional error handling.
"""
if cls.is_service_running(service):
# TODO: retry?
stop_success = cls.stop(service, is_primary, use_sudo)
start_success = cls.start(service, is_primary, use_sudo)
return stop_success and start_success