1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-5806: added ability to start node in read-only mode

* feat(cmapi): add read_only param for API add node endpoint
* style(cmapi): fixes for string length and quotes

Add dbroots of other nodes to the read-only node

On every node change, adjust dbroots in the read-only nodes

Fix logging (trace level) in tests
This commit is contained in:
Alexander Presnyakov
2025-03-12 13:21:37 +00:00
committed by Serguey Zefirov
parent a27f1a1f98
commit c59e2aa9ee
18 changed files with 508 additions and 101 deletions

View File

@ -4,6 +4,7 @@ TODO: move main constant paths here and replace in files in next releases.
"""
import os
from typing import NamedTuple
from enum import Enum
# default MARIADB ColumnStore config path
@ -60,6 +61,16 @@ CMAPI_SINGLE_NODE_XML = os.path.join(
CMAPI_INSTALL_PATH, 'cmapi_server/SingleNode.xml'
)
class MCSProgs(Enum):
    """Well-known MariaDB ColumnStore process names.

    Each member's value is the process/executable name as it appears on
    disk and in process lists; the members are used as the keys of
    ``ALL_MCS_PROGS`` (``dict[MCSProgs, ProgInfo]``).
    """
    STORAGE_MANAGER = 'StorageManager'
    WORKER_NODE = 'workernode'
    CONTROLLER_NODE = 'controllernode'
    PRIM_PROC = 'PrimProc'
    EXE_MGR = 'ExeMgr'
    WRITE_ENGINE_SERVER = 'WriteEngineServer'
    DML_PROC = 'DMLProc'
    DDL_PROC = 'DDLProc'
# constants for dispatchers
class ProgInfo(NamedTuple):
"""NamedTuple for some additional info about handling mcs processes."""
@ -73,17 +84,17 @@ class ProgInfo(NamedTuple):
# on top level of process handling
# mcs-storagemanager starts conditionally inside mcs-loadbrm, but should be
# stopped using cmapi
ALL_MCS_PROGS = {
ALL_MCS_PROGS: dict[MCSProgs, ProgInfo] = {
# workernode starts on primary and non primary node with 1 or 2 added
# to subcommand (DBRM_Worker1 - on primary, DBRM_Worker2 - non primary)
'StorageManager': ProgInfo(15, 'mcs-storagemanager', '', False, 1),
'workernode': ProgInfo(13, 'mcs-workernode', 'DBRM_Worker{}', False, 1),
'controllernode': ProgInfo(11, 'mcs-controllernode', 'fg', True),
'PrimProc': ProgInfo(5, 'mcs-primproc', '', False, 1),
'ExeMgr': ProgInfo(9, 'mcs-exemgr', '', False, 1),
'WriteEngineServer': ProgInfo(7, 'mcs-writeengineserver', '', False, 3),
'DMLProc': ProgInfo(3, 'mcs-dmlproc', '', False),
'DDLProc': ProgInfo(1, 'mcs-ddlproc', '', False),
MCSProgs.STORAGE_MANAGER: ProgInfo(15, 'mcs-storagemanager', '', False, 1),
MCSProgs.WORKER_NODE: ProgInfo(13, 'mcs-workernode', 'DBRM_Worker{}', False, 1),
MCSProgs.CONTROLLER_NODE: ProgInfo(11, 'mcs-controllernode', 'fg', True),
MCSProgs.PRIM_PROC: ProgInfo(5, 'mcs-primproc', '', False, 1),
MCSProgs.EXE_MGR: ProgInfo(9, 'mcs-exemgr', '', False, 1),
MCSProgs.WRITE_ENGINE_SERVER: ProgInfo(7, 'mcs-writeengineserver', '', False, 3),
MCSProgs.DML_PROC: ProgInfo(3, 'mcs-dmlproc', '', False),
MCSProgs.DDL_PROC: ProgInfo(1, 'mcs-ddlproc', '', False),
}
# constants for docker container dispatcher

View File

@ -68,6 +68,7 @@ class ClusterControllerClient:
:param node_info: Information about the node to add.
:return: The response from the API.
"""
#TODO: fix interface as in remove_node used or think about universal
return self._request('PUT', 'node', {**node_info, **extra})
def remove_node(

View File

@ -434,7 +434,7 @@ class ConfigController:
MCSProcessManager.stop_node(
is_primary=node_config.is_primary_node(),
use_sudo=use_sudo,
timeout=request_timeout
timeout=request_timeout,
)
except CMAPIBasicError as err:
raise_422_error(
@ -463,6 +463,7 @@ class ConfigController:
MCSProcessManager.start_node(
is_primary=node_config.is_primary_node(),
use_sudo=use_sudo,
is_read_only=node_config.is_read_only(),
)
except CMAPIBasicError as err:
raise_422_error(
@ -666,7 +667,8 @@ class StartController:
try:
MCSProcessManager.start_node(
is_primary=node_config.is_primary_node(),
use_sudo=use_sudo
use_sudo=use_sudo,
is_read_only=node_config.is_read_only(),
)
except CMAPIBasicError as err:
raise_422_error(
@ -701,7 +703,7 @@ class ShutdownController:
MCSProcessManager.stop_node(
is_primary=node_config.is_primary_node(),
use_sudo=use_sudo,
timeout=timeout
timeout=timeout,
)
except CMAPIBasicError as err:
raise_422_error(
@ -910,6 +912,7 @@ class ClusterController:
node = request_body.get('node', None)
config = request_body.get('config', DEFAULT_MCS_CONF_PATH)
in_transaction = request_body.get('in_transaction', False)
read_only = request_body.get('read_only', False)
if node is None:
raise_422_error(module_logger, func_name, 'missing node argument')
@ -917,9 +920,9 @@ class ClusterController:
try:
if not in_transaction:
with TransactionManager(extra_nodes=[node]):
response = ClusterHandler.add_node(node, config)
response = ClusterHandler.add_node(node, config, read_only)
else:
response = ClusterHandler.add_node(node, config)
response = ClusterHandler.add_node(node, config, read_only)
except CMAPIBasicError as err:
raise_422_error(module_logger, func_name, err.message)

View File

@ -15,7 +15,7 @@ from cmapi_server.helpers import (
get_current_key, get_version, update_revision_and_manager,
)
from cmapi_server.node_manipulation import (
add_node, add_dbroot, remove_node, switch_node_maintenance,
add_node, add_dbroot, remove_node, switch_node_maintenance, update_dbroots_of_readonly_nodes,
)
from mcs_node_control.models.misc import get_dbrm_master
from mcs_node_control.models.node_config import NodeConfig
@ -140,7 +140,10 @@ class ClusterHandler():
return {'timestamp': operation_start_time}
@staticmethod
def add_node(node: str, config: str = DEFAULT_MCS_CONF_PATH) -> dict:
def add_node(
node: str, config: str = DEFAULT_MCS_CONF_PATH,
read_only: bool = False,
) -> dict:
"""Method to add node to MCS CLuster.
:param node: node IP or name or FQDN
@ -148,6 +151,8 @@ class ClusterHandler():
:param config: columnstore xml config file path,
defaults to DEFAULT_MCS_CONF_PATH
:type config: str, optional
:param read_only: add node in read-only mode, defaults to False
:type read_only: bool, optional
:raises CMAPIBasicError: on exception while starting transaction
:raises CMAPIBasicError: if transaction start isn't successful
:raises CMAPIBasicError: on exception while adding node
@ -158,16 +163,21 @@ class ClusterHandler():
:rtype: dict
"""
logger: logging.Logger = logging.getLogger('cmapi_server')
logger.debug(f'Cluster add node command called. Adding node {node}.')
logger.debug(
f'Cluster add node command called. Adding node {node} in '
f'{"read-only" if read_only else "read-write"} mode.'
)
response = {'timestamp': str(datetime.now())}
try:
add_node(
node, input_config_filename=config,
output_config_filename=config
output_config_filename=config,
read_only=read_only,
)
if not get_dbroots(node, config):
if not read_only: # Read-only nodes don't own dbroots
add_dbroot(
host=node, input_config_filename=config,
output_config_filename=config
@ -214,6 +224,8 @@ class ClusterHandler():
node, input_config_filename=config,
output_config_filename=config
)
with NodeConfig().modify_config(config) as root:
update_dbroots_of_readonly_nodes(root)
except Exception as err:
raise CMAPIBasicError('Error while removing node.') from err

View File

@ -11,7 +11,6 @@ import os
import socket
import time
from collections import namedtuple
from functools import partial
from random import random
from shutil import copyfile
from typing import Tuple, Optional
@ -379,7 +378,7 @@ def broadcast_new_config(
) as response:
resp_json = await response.json(encoding='utf-8')
response.raise_for_status()
logging.info(f'Node {node} config put successfull.')
logging.info(f'Node {node} config put successful.')
except aiohttp.ClientResponseError as err:
# TODO: may be better to check if resp status is 422 cause
# it's like a signal that cmapi server raised it in
@ -577,6 +576,7 @@ def get_dbroots(node, config=DEFAULT_MCS_CONF_PATH):
dbroots = []
smc_node = root.find('./SystemModuleConfig')
mod_count = int(smc_node.find('./ModuleCount3').text)
for i in range(1, mod_count+1):
ip_addr = smc_node.find(f'./ModuleIPAddr{i}-1-3').text
hostname = smc_node.find(f'./ModuleHostName{i}-1-3').text
@ -596,6 +596,7 @@ def get_dbroots(node, config=DEFAULT_MCS_CONF_PATH):
dbroots.append(
smc_node.find(f"./ModuleDBRootID{i}-{j}-3").text
)
return dbroots

View File

@ -104,6 +104,7 @@ def enable_console_logging(logger: logging.Logger) -> None:
def config_cmapi_server_logging():
# add custom level TRACE only for develop purposes
# could be activated using API endpoints or cli tool without relaunching
if not hasattr(logging, 'TRACE'):
add_logging_level('TRACE', 5)
cherrypy._cplogging.LogManager.error = custom_cherrypy_error
# reconfigure cherrypy.access log message format

View File

@ -7,7 +7,8 @@ from time import sleep
import psutil
from cmapi_server.exceptions import CMAPIBasicError
from cmapi_server.constants import MCS_INSTALL_BIN, ALL_MCS_PROGS
from cmapi_server.constants import MCS_INSTALL_BIN, ALL_MCS_PROGS, MCSProgs, ProgInfo
from cmapi_server.process_dispatchers.base import BaseDispatcher
from cmapi_server.process_dispatchers.systemd import SystemdDispatcher
from cmapi_server.process_dispatchers.container import (
ContainerDispatcher
@ -18,7 +19,7 @@ from mcs_node_control.models.misc import get_workernodes
from mcs_node_control.models.process import Process
PROCESS_DISPATCHERS = {
PROCESS_DISPATCHERS: dict[str, type[BaseDispatcher]] = {
'systemd': SystemdDispatcher,
# could be used in docker containers and OSes w/o systemd
'container': ContainerDispatcher,
@ -32,10 +33,10 @@ class MCSProcessManager:
e.g. re/-start or stop systemd services, run executable.
"""
CONTROLLER_MAX_RETRY = 30
mcs_progs = {}
mcs_progs: dict[str, ProgInfo] = {}
mcs_version_info = None
dispatcher_name = None
process_dispatcher = None
process_dispatcher: BaseDispatcher = None
@classmethod
def _get_prog_name(cls, name: str) -> str:
@ -47,12 +48,13 @@ class MCSProcessManager:
:rtype: str
"""
if cls.dispatcher_name == 'systemd':
return ALL_MCS_PROGS[name].service_name
prog = MCSProgs(name)
return ALL_MCS_PROGS[prog].service_name
return name
@classmethod
def _get_sorted_progs(
cls, is_primary: bool, reverse: bool = False
cls, is_primary: bool, reverse: bool = False, is_read_only: bool = False
) -> dict:
"""Get sorted services dict.
@ -72,6 +74,13 @@ class MCSProcessManager:
for prog_name, prog_info in cls.mcs_progs.items()
if prog_name not in PRIMARY_PROGS
}
if is_read_only:
logging.debug('Node is in read-only mode, skipping WriteEngine')
unsorted_progs.pop(
MCSProgs.WRITE_ENGINE_SERVER.value, None
)
if reverse:
# stop sequence builds using stop_priority property
return dict(
@ -89,7 +98,8 @@ class MCSProcessManager:
if cls.mcs_progs:
logging.warning('Mcs ProcessHandler already detected processes.')
for prog_name, prog_info in ALL_MCS_PROGS.items():
for prog, prog_info in ALL_MCS_PROGS.items():
prog_name = prog.value
if os.path.exists(os.path.join(MCS_INSTALL_BIN, prog_name)):
cls.mcs_progs[prog_name] = prog_info
@ -404,19 +414,26 @@ class MCSProcessManager:
return set(node_progs) == set(p['name'] for p in running_procs)
@classmethod
def start_node(cls, is_primary: bool, use_sudo: bool = True):
def start_node(
cls,
is_primary: bool,
use_sudo: bool = True,
is_read_only: bool = False,
) -> None:
"""Start mcs node processes.
:param is_primary: is node primary or not, defaults to True
:type is_primary: bool
:param use_sudo: use sudo or not, defaults to True
:type use_sudo: bool, optional
:param is_read_only: if true, doesn't start WriteEngine
:type is_read_only: bool, optional
:raises CMAPIBasicError: immediately if one mcs process not started
"""
for prog_name in cls._get_sorted_progs(is_primary):
for prog_name in cls._get_sorted_progs(is_primary=is_primary, is_read_only=is_read_only):
if (
cls.dispatcher_name == 'systemd'
and prog_name == 'StorageManager'
and prog_name == MCSProgs.STORAGE_MANAGER.value
):
# TODO: MCOL-5458
logging.info(
@ -424,9 +441,9 @@ class MCSProcessManager:
)
continue
# TODO: additional error handling
if prog_name == 'controllernode':
if prog_name == MCSProgs.CONTROLLER_NODE.value:
cls._wait_for_workernodes()
if prog_name in ('DMLProc', 'DDLProc'):
if prog_name in (MCSProgs.DML_PROC.value, MCSProgs.DDL_PROC.value):
cls._wait_for_controllernode()
if not cls.start(prog_name, is_primary, use_sudo):
logging.error(f'Process "{prog_name}" not started properly.')
@ -434,7 +451,10 @@ class MCSProcessManager:
@classmethod
def stop_node(
cls, is_primary: bool, use_sudo: bool = True, timeout: int = 10
cls,
is_primary: bool,
use_sudo: bool = True,
timeout: int = 10,
):
"""Stop mcs node processes.
@ -450,14 +470,14 @@ class MCSProcessManager:
# so use full available list of processes. Otherwise, it could cause
# undefined behaviour when primary gone and then recovers (failover
# triggered 2 times).
for prog_name in cls._get_sorted_progs(True, reverse=True):
for prog_name in cls._get_sorted_progs(is_primary=True, reverse=True):
if not cls.stop(prog_name, is_primary, use_sudo):
logging.error(f'Process "{prog_name}" not stopped properly.')
raise CMAPIBasicError(f'Error while stopping "{prog_name}"')
@classmethod
def restart_node(cls, is_primary: bool, use_sudo: bool):
def restart_node(cls, is_primary: bool, use_sudo: bool, is_read_only: bool = False):
"""TODO: For next releases."""
if cls.get_running_mcs_procs():
cls.stop_node(is_primary, use_sudo)
cls.start_node(is_primary, use_sudo)
cls.start_node(is_primary, use_sudo, is_read_only)

View File

@ -106,10 +106,10 @@ class TransactionManager(ContextDecorator):
try:
rollback_transaction(self.txn_id, nodes=nodes)
self.active_transaction = False
logging.debug(f'Success rollback of transaction "{self.txn_id}".')
logging.debug(f'Successful rollback of transaction "{self.txn_id}".')
except Exception:
logging.error(
f'Error while rollback transaction "{self.txn_id}"',
f'Error while rolling back transaction "{self.txn_id}"',
exc_info=True
)

View File

@ -62,7 +62,8 @@ def switch_node_maintenance(
def add_node(
node: str, input_config_filename: str = DEFAULT_MCS_CONF_PATH,
output_config_filename: Optional[str] = None,
use_rebalance_dbroots: bool = True
use_rebalance_dbroots: bool = True,
read_only: bool = False,
):
"""Add node to a cluster.
@ -96,14 +97,23 @@ def add_node(
try:
if not _replace_localhost(c_root, node):
pm_num = _add_node_to_PMS(c_root, node)
if not read_only:
_add_WES(c_root, pm_num, node)
else:
logging.info('Node is read-only, skipping WES addition.')
_add_read_only_node(c_root, node)
_add_DBRM_Worker(c_root, node)
_add_Module_entries(c_root, node)
_add_active_node(c_root, node)
_add_node_to_ExeMgrs(c_root, node)
if use_rebalance_dbroots:
if not read_only:
_rebalance_dbroots(c_root)
_move_primary_node(c_root)
update_dbroots_of_readonly_nodes(c_root)
except Exception:
logging.error(
'Caught exception while adding node, config file is unchanged',
@ -157,7 +167,11 @@ def remove_node(
if len(active_nodes) > 1:
pm_num = _remove_node_from_PMS(c_root, node)
is_read_only = node in node_config.get_read_only_nodes(c_root)
if not is_read_only:
_remove_WES(c_root, pm_num)
_remove_DBRM_Worker(c_root, node)
_remove_Module_entries(c_root, node)
_remove_from_ExeMgrs(c_root, node)
@ -168,9 +182,11 @@ def remove_node(
# TODO: unspecific name, need to think of a better one
_remove_node(c_root, node)
if use_rebalance_dbroots:
if use_rebalance_dbroots and not is_read_only:
_rebalance_dbroots(c_root)
_move_primary_node(c_root)
update_dbroots_of_readonly_nodes(c_root)
else:
# TODO:
# - IMO undefined behaviour here. Removing one single node
@ -244,7 +260,7 @@ def rebalance_dbroots(
#
# returns the id of the new dbroot on success
# raises an exception on error
def add_dbroot(input_config_filename = None, output_config_filename = None, host = None):
def add_dbroot(input_config_filename = None, output_config_filename = None, host = None) -> int:
node_config = NodeConfig()
if input_config_filename is None:
c_root = node_config.get_current_config_root()
@ -376,12 +392,16 @@ def __remove_helper(parent_node, node):
def _remove_node(root, node):
'''
remove node from DesiredNodes, InactiveNodes, and ActiveNodes
remove node from DesiredNodes, InactiveNodes, ActiveNodes and (if present) ReadOnlyNodes
'''
for n in (root.find("./DesiredNodes"), root.find("./InactiveNodes"), root.find("./ActiveNodes")):
__remove_helper(n, node)
read_only_nodes = root.find('./ReadOnlyNodes')
if read_only_nodes is not None:
__remove_helper(read_only_nodes, node)
# This moves a node from ActiveNodes to InactiveNodes
def _deactivate_node(root, node):
@ -529,6 +549,19 @@ def unassign_dbroot1(root):
i += 1
def _get_existing_db_roots(root: etree.Element) -> list[int]:
'''Get all the existing dbroot IDs from the config file'''
# There can be holes in the dbroot numbering, so can't just scan from [1-dbroot_count]
# Going to scan from 1-99 instead
sysconf_node = root.find("./SystemConfig")
existing_dbroots = []
for num in range(1, 100):
node = sysconf_node.find(f"./DBRoot{num}")
if node is not None:
existing_dbroots.append(num)
return existing_dbroots
def _rebalance_dbroots(root, test_mode=False):
# TODO: add code to detect whether we are using shared storage or not. If not, exit
# without doing anything.
@ -572,14 +605,7 @@ def _rebalance_dbroots(root, test_mode=False):
current_mapping = get_current_dbroot_mapping(root)
sysconf_node = root.find("./SystemConfig")
# There can be holes in the dbroot numbering, so can't just scan from [1-dbroot_count]
# Going to scan from 1-99 instead.
existing_dbroots = []
for num in range(1, 100):
node = sysconf_node.find(f"./DBRoot{num}")
if node is not None:
existing_dbroots.append(num)
existing_dbroots = _get_existing_db_roots(root)
# assign the unassigned dbroots
unassigned_dbroots = set(existing_dbroots) - set(current_mapping[0])
@ -631,7 +657,7 @@ def _rebalance_dbroots(root, test_mode=False):
# timed out
# possible node is not ready, leave retry as-is
pass
except Exception as e:
except Exception:
retry = False
if not found_master:
@ -994,6 +1020,22 @@ def _add_WES(root, pm_num, node):
etree.SubElement(wes_node, "Port").text = "8630"
def _add_read_only_node(root: etree.Element, node: str) -> None:
'''Add node name to ReadOnlyNodes if it is not already there'''
read_only_nodes = root.find('./ReadOnlyNodes')
if read_only_nodes is None:
read_only_nodes = etree.SubElement(root, 'ReadOnlyNodes')
else:
for n in read_only_nodes.findall("./Node"):
if n.text == node:
logging.warning(
f"_add_read_only_node(): node {node} already exists in ReadOnlyNodes"
)
return
etree.SubElement(read_only_nodes, "Node").text = node
def _add_DBRM_Worker(root, node):
'''
find the highest numbered DBRM_Worker entry, or one that isn't used atm
@ -1096,7 +1138,7 @@ def _add_node_to_PMS(root, node):
return new_pm_num
def _replace_localhost(root, node):
def _replace_localhost(root: etree.Element, node: str) -> bool:
# if DBRM_Controller/IPAddr is 127.0.0.1 or localhost,
# then replace all instances, else do nothing.
controller_host = root.find('./DBRM_Controller/IPAddr')
@ -1144,3 +1186,75 @@ def _replace_localhost(root, node):
# New Exception types
class NodeNotFoundException(Exception):
pass
def get_pm_module_num_to_addr_map(root: etree.Element) -> dict[int, str]:
    """Get a mapping of PM module numbers to their IP addresses"""
    smc_node = root.find("./SystemModuleConfig")
    mod_count = int(smc_node.find("./ModuleCount3").text)
    # Module numbering is 1-based; ModuleIPAddr{i}-1-3 holds PM i's address.
    return {
        num: smc_node.find(f"./ModuleIPAddr{num}-1-3").text
        for num in range(1, mod_count + 1)
    }
def update_dbroots_of_readonly_nodes(root: etree.Element) -> None:
    """Sync every read-only node's dbroot list with the cluster.

    Read-only nodes do not own dbroots of their own, but each must
    reference all the dbroots in the cluster. For every read-only node,
    resolve its PM module number by IP address and rewrite that module's
    dbroot mapping to cover the full cluster set.

    :raises NodeNotFoundException: if a read-only node's address matches
        no PM module in the config (should not happen).
    """
    pm_addr_by_num = get_pm_module_num_to_addr_map(root)
    for ro_node in NodeConfig().get_read_only_nodes(root):
        # Resolve which PM module number belongs to this address.
        pm_num = next(
            (num for num, addr in pm_addr_by_num.items() if addr == ro_node),
            None,
        )
        if pm_num is None:  # This should not happen
            err_msg = f"Could not find PM number for read-only node {ro_node}"
            logging.error(err_msg)
            raise NodeNotFoundException(err_msg)
        # Add dbroots of other nodes to this read-only node
        add_dbroots_of_other_nodes(root, pm_num)
def add_dbroots_of_other_nodes(root: etree.Element, module_num: int) -> None:
    """Adds all the dbroots listed in the config to this (read-only) node"""
    all_dbroots = _get_existing_db_roots(root)
    smc_node = root.find("./SystemModuleConfig")
    # Start from a clean slate: drop whatever mapping this module had.
    remove_dbroots_of_node(root, module_num)
    # Record how many dbroots this module now references.
    etree.SubElement(
        smc_node, f"ModuleDBRootCount{module_num}-3"
    ).text = str(len(all_dbroots))
    # Map every cluster dbroot onto this module; slots are numbered from 1.
    for slot, dbroot_id in enumerate(all_dbroots, start=1):
        etree.SubElement(
            smc_node, f"ModuleDBRootID{module_num}-{slot}-3"
        ).text = str(dbroot_id)
    logging.info("Added %d dbroots to read-only node %d: %s", len(all_dbroots), module_num, sorted(all_dbroots))
def remove_dbroots_of_node(root: etree.Element, module_num: int) -> None:
    """Removes all the dbroots listed in the config from this (read-only) node"""
    smc_node = root.find("./SystemModuleConfig")
    # Drop the module's dbroot count entry, if present.
    if (count_entry := smc_node.find(f"./ModuleDBRootCount{module_num}-3")) is not None:
        smc_node.remove(count_entry)
    # Slot numbering may be sparse, so probe the full 1-99 range.
    for slot in range(1, 100):
        if (id_entry := smc_node.find(f"./ModuleDBRootID{module_num}-{slot}-3")) is not None:
            smc_node.remove(id_entry)

View File

@ -11,7 +11,7 @@ from time import sleep
import psutil
from cmapi_server.constants import (
IFLAG, LIBJEMALLOC_DEFAULT_PATH, MCS_INSTALL_BIN, ALL_MCS_PROGS
IFLAG, LIBJEMALLOC_DEFAULT_PATH, MCS_INSTALL_BIN, ALL_MCS_PROGS, MCSProgs
)
from cmapi_server.exceptions import CMAPIBasicError
from cmapi_server.process_dispatchers.base import BaseDispatcher
@ -126,7 +126,8 @@ class ContainerDispatcher(BaseDispatcher):
:return: command with arguments if needed
:rtype: str
"""
service_info = ALL_MCS_PROGS[service]
prog = MCSProgs(service)
service_info = ALL_MCS_PROGS[prog]
command = os.path.join(MCS_INSTALL_BIN, service)
if service_info.subcommand:
@ -188,7 +189,8 @@ class ContainerDispatcher(BaseDispatcher):
env=env_vars
)
# TODO: any other way to detect service finished its initialisation?
sleep(ALL_MCS_PROGS[service].delay)
prog = MCSProgs(service)
sleep(ALL_MCS_PROGS[prog].delay)
logger.debug(f'Started "{service}".')
if is_primary and service == 'DDLProc':

View File

@ -6,6 +6,7 @@ from shutil import copyfile
import requests
from cmapi_server.constants import MCSProgs
from cmapi_server.controllers.dispatcher import _version
from cmapi_server.managers.process import MCSProcessManager
from cmapi_server.test.unittest_global import (
@ -199,7 +200,7 @@ class ClusterAddNodeTestCase(BaseClusterTestCase):
# Check Columntore started
controllernode = subprocess.check_output(
['pgrep', 'controllernode'])
['pgrep', MCSProgs.CONTROLLER_NODE.value])
self.assertIsNotNone(controllernode)

View File

@ -65,7 +65,7 @@ class MCSProcessManagerTest(BaseProcessDispatcherCase):
def test_mcs_process_manager(self):
MCSProcessManager.detect('systemd', '')
for prog in MCSProcessManager._get_sorted_progs(True, True).values():
for prog in MCSProcessManager._get_sorted_progs(is_primary=True, reverse=True).values():
serv_name = self.get_systemd_serv_name(prog.service_name)
os.system(f'{SYSTEMCTL} stop {serv_name}')
self.assertIsNone(MCSProcessManager.start_node(is_primary=True, use_sudo=False))
@ -95,7 +95,7 @@ class MCSProcessManagerTest(BaseProcessDispatcherCase):
)
)
for prog in MCSProcessManager._get_sorted_progs(True).values():
for prog in MCSProcessManager._get_sorted_progs(is_primary=True).values():
serv_name = self.get_systemd_serv_name(prog.service_name)
os.system(f'{SYSTEMCTL} start {serv_name}')

View File

@ -1,18 +1,20 @@
import logging
import socket
import unittest
from unittest.mock import patch
from lxml import etree
from cmapi_server import node_manipulation
from cmapi_server.constants import MCS_DATA_PATH
from cmapi_server.test.unittest_global import (
tmp_mcs_config_filename, BaseNodeManipTestCase
)
from cmapi_server.node_manipulation import add_dbroots_of_other_nodes, remove_dbroots_of_node, update_dbroots_of_readonly_nodes
from cmapi_server.test.unittest_global import BaseNodeManipTestCase, tmp_mcs_config_filename
from mcs_node_control.models.node_config import NodeConfig
logging.basicConfig(level='DEBUG')
SINGLE_NODE_XML = "./cmapi_server/SingleNode.xml"
class NodeManipTester(BaseNodeManipTestCase):
@ -21,12 +23,18 @@ class NodeManipTester(BaseNodeManipTestCase):
'./test-output0.xml','./test-output1.xml','./test-output2.xml'
)
hostaddr = socket.gethostbyname(socket.gethostname())
with patch('cmapi_server.node_manipulation.update_dbroots_of_readonly_nodes') as mock_update_dbroots_of_readonly_nodes:
node_manipulation.add_node(
self.NEW_NODE_NAME, tmp_mcs_config_filename, self.tmp_files[0]
)
mock_update_dbroots_of_readonly_nodes.assert_called_once()
mock_update_dbroots_of_readonly_nodes.reset_mock()
node_manipulation.add_node(
hostaddr, self.tmp_files[0], self.tmp_files[1]
)
mock_update_dbroots_of_readonly_nodes.assert_called_once()
# get a NodeConfig, read test.xml
# look for some of the expected changes.
@ -40,10 +48,13 @@ class NodeManipTester(BaseNodeManipTestCase):
node = root.find("./ExeMgr2/IPAddr")
self.assertEqual(node.text, hostaddr)
with patch('cmapi_server.node_manipulation.update_dbroots_of_readonly_nodes') as mock_update_dbroots_of_readonly_nodes:
node_manipulation.remove_node(
self.NEW_NODE_NAME, self.tmp_files[1], self.tmp_files[2],
test_mode=True
)
mock_update_dbroots_of_readonly_nodes.assert_called_once()
nc = NodeConfig()
root = nc.get_current_config_root(self.tmp_files[2])
node = root.find('./PMS1/IPAddr')
@ -52,6 +63,64 @@ class NodeManipTester(BaseNodeManipTestCase):
# node = root.find('./PMS2/IPAddr')
# self.assertEqual(node, None)
def test_add_remove_read_only_node(self):
"""add_node(read_only=True) should add a read-only node into the config, it does not add a WriteEngineServer (WES) and does not own dbroots"""
self.tmp_files = ('./config_output_rw.xml', './config_output_ro.xml', './config_output_ro_removed.xml')
# Add this host as a read-write node
local_host_addr = socket.gethostbyname(socket.gethostname())
node_manipulation.add_node(
local_host_addr, SINGLE_NODE_XML, self.tmp_files[0]
)
# Mock _rebalance_dbroots and _move_primary_node (only after the first node is added)
with patch('cmapi_server.node_manipulation._rebalance_dbroots') as mock_rebalance_dbroots, \
patch('cmapi_server.node_manipulation._move_primary_node') as mock_move_primary_node, \
patch('cmapi_server.node_manipulation.update_dbroots_of_readonly_nodes') as mock_update_dbroots_of_readonly_nodes:
# Add a read-only node
node_manipulation.add_node(
self.NEW_NODE_NAME, self.tmp_files[0], self.tmp_files[1],
read_only=True,
)
nc = NodeConfig()
root = nc.get_current_config_root(self.tmp_files[1])
# Check if read-only nodes section exists and is filled
read_only_nodes = nc.get_read_only_nodes(root)
self.assertEqual(len(read_only_nodes), 1)
self.assertEqual(read_only_nodes[0], self.NEW_NODE_NAME)
# Check if PMS was added
pms_node_ipaddr = root.find('./PMS2/IPAddr')
self.assertEqual(pms_node_ipaddr.text, self.NEW_NODE_NAME)
# Check that WriteEngineServer was not added
wes_node = root.find('./pm2_WriteEngineServer')
self.assertIsNone(wes_node)
mock_rebalance_dbroots.assert_not_called()
mock_move_primary_node.assert_not_called()
mock_update_dbroots_of_readonly_nodes.assert_called_once()
mock_update_dbroots_of_readonly_nodes.reset_mock()
# Test read-only node removal
node_manipulation.remove_node(
self.NEW_NODE_NAME, self.tmp_files[1], self.tmp_files[2],
deactivate_only=False,
)
nc = NodeConfig()
root = nc.get_current_config_root(self.tmp_files[2])
read_only_nodes = nc.get_read_only_nodes(root)
self.assertEqual(len(read_only_nodes), 0)
mock_rebalance_dbroots.assert_not_called()
mock_move_primary_node.assert_not_called()
mock_update_dbroots_of_readonly_nodes.assert_called_once()
def test_add_dbroots_nodes_rebalance(self):
self.tmp_files = (
'./extra-dbroots-0.xml', './extra-dbroots-1.xml',
@ -209,3 +278,111 @@ class NodeManipTester(BaseNodeManipTestCase):
caught_it = True
self.assertTrue(caught_it)
class TestDBRootsManipulation(unittest.TestCase):
our_module_idx = 3
ro_node1_ip = '192.168.1.3'
ro_node2_ip = '192.168.1.4'
def setUp(self):
# Mock initial XML structure (add two nodes and two dbroots)
self.root = etree.Element('Columnstore')
# Add two PM modules with IP addresses
smc = etree.SubElement(self.root, 'SystemModuleConfig')
module_count = etree.SubElement(smc, 'ModuleCount3')
module_count.text = '2'
module1_ip = etree.SubElement(smc, 'ModuleIPAddr1-1-3')
module1_ip.text = '192.168.1.1'
module2_ip = etree.SubElement(smc, 'ModuleIPAddr2-1-3')
module2_ip.text = '192.168.1.2'
system_config = etree.SubElement(self.root, 'SystemConfig')
dbroot_count = etree.SubElement(system_config, 'DBRootCount')
dbroot_count.text = '2'
dbroot1 = etree.SubElement(system_config, 'DBRoot1')
dbroot1.text = '/data/dbroot1'
dbroot2 = etree.SubElement(system_config, 'DBRoot2')
dbroot2.text = '/data/dbroot2'
def test_get_pm_module_num_to_addr_map(self):
result = node_manipulation.get_pm_module_num_to_addr_map(self.root)
expected = {
1: '192.168.1.1',
2: '192.168.1.2',
}
self.assertEqual(result, expected)
def test_add_dbroots_of_other_nodes(self):
'''add_dbroots_of_other_nodes must add dbroots of other nodes into mapping of the node.'''
add_dbroots_of_other_nodes(self.root, self.our_module_idx)
# Check that ModuleDBRootCount of the module was updated
module_count = self.root.find(f'./SystemModuleConfig/ModuleDBRootCount{self.our_module_idx}-3')
self.assertIsNotNone(module_count)
self.assertEqual(module_count.text, '2')
# Check that dbroots were added to ModuleDBRootID{module_num}-{i}-3
dbroot1 = self.root.find(f'./SystemModuleConfig/ModuleDBRootID{self.our_module_idx}-1-3')
dbroot2 = self.root.find(f'./SystemModuleConfig/ModuleDBRootID{self.our_module_idx}-2-3')
self.assertIsNotNone(dbroot1)
self.assertIsNotNone(dbroot2)
self.assertEqual(dbroot1.text, '1')
self.assertEqual(dbroot2.text, '2')
def test_remove_dbroots_of_node(self):
'''Test that remove_dbroots_of_node correctly removes dbroots from the node's mapping'''
# Add dbroot association to the node
smc = self.root.find('./SystemModuleConfig')
dbroot1 = etree.SubElement(smc, f'ModuleDBRootID{self.our_module_idx}-1-3')
dbroot1.text = '1'
dbroot2 = etree.SubElement(smc, f'ModuleDBRootID{self.our_module_idx}-2-3')
dbroot2.text = '2'
# Add ModuleDBRootCount to the node
module_count = etree.SubElement(smc, f'ModuleDBRootCount{self.our_module_idx}-3')
module_count.text = '2'
remove_dbroots_of_node(self.root, self.our_module_idx)
# Check that ModuleDBRootCount was removed
module_count = self.root.find(f'./SystemModuleConfig/ModuleDBRootCount{self.our_module_idx}-3')
self.assertIsNone(module_count)
# Check that dbroot mappings of the module were removed
dbroot1 = self.root.find(f'./SystemModuleConfig/ModuleDBRootID{self.our_module_idx}-1-3')
dbroot2 = self.root.find(f'./SystemModuleConfig/ModuleDBRootID{self.our_module_idx}-2-3')
self.assertIsNone(dbroot1)
self.assertIsNone(dbroot2)
def test_update_dbroots_of_readonly_nodes(self):
    """Test that update_dbroots_of_readonly_nodes adds all existing dbroots to all existing read-only nodes"""
    # Register two extra modules in the XML structure (two already exist)
    smc = self.root.find('./SystemModuleConfig')
    smc.find('./ModuleCount3').text = '4'
    for module_idx, module_ip in (
        (3, self.ro_node1_ip), (4, self.ro_node2_ip)
    ):
        ip_el = etree.SubElement(smc, f'ModuleIPAddr{module_idx}-1-3')
        ip_el.text = module_ip
    # Declare both new modules as read-only nodes
    ro_section = etree.SubElement(self.root, 'ReadOnlyNodes')
    for module_ip in (self.ro_node1_ip, self.ro_node2_ip):
        node_el = etree.SubElement(ro_section, 'Node')
        node_el.text = module_ip
    update_dbroots_of_readonly_nodes(self.root)
    # Every read-only module must now carry all existing dbroots
    for ro_module_idx in (3, 4):
        count_el = self.root.find(
            f'./SystemModuleConfig/ModuleDBRootCount{ro_module_idx}-3'
        )
        self.assertIsNotNone(count_el)
        self.assertEqual(count_el.text, '2')
        for slot, dbroot_id in ((1, '1'), (2, '2')):
            dbroot_el = self.root.find(
                f'./SystemModuleConfig/'
                f'ModuleDBRootID{ro_module_idx}-{slot}-3'
            )
            self.assertIsNotNone(dbroot_el)
            self.assertEqual(dbroot_el.text, dbroot_id)

View File

@ -2,21 +2,14 @@ import logging
import os
import unittest
from contextlib import contextmanager
from datetime import datetime, timedelta
from shutil import copyfile
from tempfile import TemporaryDirectory
import cherrypy
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes
from cmapi_server import helpers
from cmapi_server.constants import CMAPI_CONF_PATH
from cmapi_server.controllers.dispatcher import dispatcher, jsonify_error
from cmapi_server.logging_management import config_cmapi_server_logging
from cmapi_server.managers.process import MCSProcessManager
from cmapi_server.managers.certificate import CertificateManager
@ -80,6 +73,7 @@ class BaseServerTestCase(unittest.TestCase):
)
copyfile(cmapi_config_filename, self.cmapi_config_filename)
copyfile(TEST_MCS_CONFIG_FILEPATH, self.mcs_config_filename)
config_cmapi_server_logging()
self.app = cherrypy.tree.mount(
root=None, config=self.cmapi_config_filename
)

View File

@ -76,7 +76,10 @@ def setup_logging(verbose: bool = False) -> None:
add_logging_level('TRACE', 5)
dict_config(MCS_CLI_LOG_CONF_PATH)
if verbose:
enable_console_logging(logging.getLogger())
for logger_name in ("", "mcs_cli"):
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
enable_console_logging(logger)
if __name__ == '__main__':

View File

@ -198,6 +198,14 @@ def add(
'node IP, name or FQDN. '
'Can be used multiple times to add several nodes at a time.'
)
),
read_only: bool = typer.Option(
False,
'--read-only',
help=(
'Add node (or nodes, if more than one is passed) in read-only '
'mode.'
)
)
):
"""Add nodes to the Columnstore cluster."""
@ -207,7 +215,9 @@ def add(
extra_nodes=nodes
):
for node in nodes:
result.append(client.add_node({'node': node}))
result.append(
client.add_node({'node': node, 'read_only': read_only})
)
return result

View File

@ -1,26 +1,27 @@
import configparser
from contextlib import contextmanager
import grp
import logging
import pwd
import re
import socket
from os import mkdir, replace, chown
from os import chown, mkdir, replace
from pathlib import Path
from shutil import copyfile
from typing import Optional
from xml.dom import minidom # to pick up pretty printing functionality
from lxml import etree
from cmapi_server.constants import (
DEFAULT_MCS_CONF_PATH, DEFAULT_SM_CONF_PATH,
DEFAULT_MCS_CONF_PATH,
DEFAULT_SM_CONF_PATH,
MCS_MODULE_FILE_PATH,
)
# from cmapi_server.managers.process import MCSProcessManager
from mcs_node_control.models.misc import (
read_module_id, get_dbroots_list
)
from mcs_node_control.models.network_ifaces import get_network_interfaces
# from cmapi_server.managers.process import MCSProcessManager
from mcs_node_control.models.misc import get_dbroots_list, read_module_id
from mcs_node_control.models.network_ifaces import get_network_interfaces
module_logger = logging.getLogger()
@ -36,7 +37,7 @@ class NodeConfig:
"""
def get_current_config_root(
self, config_filename: str = DEFAULT_MCS_CONF_PATH, upgrade=True
):
) -> etree.Element:
"""Retrieves current configuration.
Read the config and returns Element.
@ -49,7 +50,7 @@ class NodeConfig:
self.upgrade_config(tree=tree, upgrade=upgrade)
return tree.getroot()
def get_root_from_string(self, config_string: str):
def get_root_from_string(self, config_string: str) -> etree.Element:
root = etree.fromstring(config_string)
self.upgrade_config(root=root)
return root
@ -137,6 +138,26 @@ class NodeConfig:
f.write(self.to_string(tree))
replace(tmp_filename, filename) # atomic replacement
@contextmanager
def modify_config(
    self,
    input_config_filename: str = DEFAULT_MCS_CONF_PATH,
    output_config_filename: Optional[str] = None,
):
    """Context manager to modify the config file.

    Yields the parsed config root so the caller can modify it in place.
    If the managed block raises, the config file is NOT modified and the
    exception is re-raised. On success the tree is written to
    output_config_filename, or back to input_config_filename when no
    output path is provided.

    :param input_config_filename: path of the config file to read
    :param output_config_filename: optional path to write the result to
    """
    try:
        c_root = self.get_current_config_root(input_config_filename)
        yield c_root
    except Exception as e:
        # Lazy %-style args: formatting happens only when the record is
        # actually emitted (and never on the happy path).
        logging.error(
            "modify_config(): Caught exception: '%s', "
            'config file not modified',
            e,
        )
        raise
    else:
        # Write only after the caller finished without raising.
        output_config_filename = (
            output_config_filename or input_config_filename
        )
        self.write_config(c_root, output_config_filename)
def to_string(self, tree):
# TODO: try to use lxml to do this to avoid the add'l dependency
xmlstr = minidom.parseString(etree.tostring(tree)).toprettyxml(
@ -566,4 +587,18 @@ has dbroot {subel.text}')
for i in range(1, mod_count+1):
for j in range(1, int(smc_node.find(f"./ModuleDBRootCount{i}-3").text) + 1):
dbroots.append(smc_node.find(f"./ModuleDBRootID{i}-{j}-3").text)
return dbroots
def get_read_only_nodes(self, root=None) -> list[str]:
    """Get names of read only nodes from config.

    :param root: parsed config root; when falsy, the current config
                 is loaded from disk.
    :return: text of every ReadOnlyNodes/Node entry
    """
    cfg_root = root or self.get_current_config_root()
    return [entry.text for entry in cfg_root.findall('./ReadOnlyNodes/Node')]
def is_read_only(self, root=None) -> bool:
    """Checks if this node is in read-only mode.

    True when any of this host's addresses/names appears in the
    config's ReadOnlyNodes section.
    """
    cfg_root = root or self.get_current_config_root()
    ro_names = set(self.get_read_only_nodes(cfg_root))
    local_names = set(self.get_network_addresses_and_names())
    return not ro_names.isdisjoint(local_names)

22
cmapi/pyproject.toml Normal file
View File

@ -0,0 +1,22 @@
[tool.ruff]
line-length = 80
target-version = "py39"
# Exclude cache and temporary directories
exclude = [
    "__pycache__",
]

# Lint rule selection belongs under `tool.ruff.lint`: top-level
# `select`/`ignore` are deprecated since ruff 0.2.
[tool.ruff.lint]
# Enable common rule sets
select = [
    "E",   # pycodestyle errors
    "F",   # pyflakes: undefined names, unused imports, etc.
    "I",   # isort: import sorting
    "B",   # flake8-bugbear: common bugs and anti-patterns
    "UP",  # pyupgrade: use modern Python syntax
    "N",   # pep8-naming: naming conventions
]
ignore = []

[tool.ruff.format]
quote-style = "single"