1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-08 14:22:09 +03:00

MCOL-5806: added ability to start node in read-only mode

This commit is contained in:
Alexander Presnyakov
2025-03-12 13:21:37 +00:00
parent 8859e3f4df
commit 37453ad47e
11 changed files with 213 additions and 42 deletions

View File

@@ -4,6 +4,7 @@ TODO: move main constant paths here and replace in files in next releases.
"""
import os
from typing import NamedTuple
from enum import Enum
# default MARIADB ColumnStore config path
@@ -53,6 +54,16 @@ CMAPI_SINGLE_NODE_XML = os.path.join(
CMAPI_INSTALL_PATH, 'cmapi_server/SingleNode.xml'
)
class MCSProgs(Enum):
    """Closed set of MCS process names.

    Each member's value is the process name string; these values are used
    as the keys of ``ALL_MCS_PROGS``.
    """

    # storage and DBRM related processes
    STORAGE_MANAGER = 'StorageManager'
    WORKER_NODE = 'workernode'
    CONTROLLER_NODE = 'controllernode'

    # query processing
    PRIM_PROC = 'PrimProc'
    EXE_MGR = 'ExeMgr'

    # write path (DML/DDL and the write engine)
    WRITE_ENGINE_SERVER = 'WriteEngineServer'
    DML_PROC = 'DMLProc'
    DDL_PROC = 'DDLProc'
# constants for dispatchers
class ProgInfo(NamedTuple):
"""NamedTuple for some additional info about handling mcs processes."""
@@ -66,17 +77,17 @@ class ProgInfo(NamedTuple):
# on top level of process handling
# mcs-storagemanager starts conditionally inside mcs-loadbrm, but should be
# stopped using cmapi
ALL_MCS_PROGS = {
ALL_MCS_PROGS: dict[str, ProgInfo] = {
# workernode starts on primary and non primary node with 1 or 2 added
# to subcommand (DBRM_Worker1 - on primary, DBRM_Worker2 - non primary)
'StorageManager': ProgInfo(15, 'mcs-storagemanager', '', False, 1),
'workernode': ProgInfo(13, 'mcs-workernode', 'DBRM_Worker{}', False, 1),
'controllernode': ProgInfo(11, 'mcs-controllernode', 'fg', True),
'PrimProc': ProgInfo(5, 'mcs-primproc', '', False, 1),
'ExeMgr': ProgInfo(9, 'mcs-exemgr', '', False, 1),
'WriteEngineServer': ProgInfo(7, 'mcs-writeengineserver', '', False, 3),
'DMLProc': ProgInfo(3, 'mcs-dmlproc', '', False),
'DDLProc': ProgInfo(1, 'mcs-ddlproc', '', False),
MCSProgs.STORAGE_MANAGER.value: ProgInfo(15, 'mcs-storagemanager', '', False, 1),
MCSProgs.WORKER_NODE.value: ProgInfo(13, 'mcs-workernode', 'DBRM_Worker{}', False, 1),
MCSProgs.CONTROLLER_NODE.value: ProgInfo(11, 'mcs-controllernode', 'fg', True),
MCSProgs.PRIM_PROC.value: ProgInfo(5, 'mcs-primproc', '', False, 1),
MCSProgs.EXE_MGR.value: ProgInfo(9, 'mcs-exemgr', '', False, 1),
MCSProgs.WRITE_ENGINE_SERVER.value: ProgInfo(7, 'mcs-writeengineserver', '', False, 3),
MCSProgs.DML_PROC.value: ProgInfo(3, 'mcs-dmlproc', '', False),
MCSProgs.DDL_PROC.value: ProgInfo(1, 'mcs-ddlproc', '', False),
}
# constants for docker container dispatcher

View File

@@ -434,7 +434,8 @@ class ConfigController:
MCSProcessManager.stop_node(
is_primary=node_config.is_primary_node(),
use_sudo=use_sudo,
timeout=request_timeout
timeout=request_timeout,
is_read_only=node_config.is_read_only(),
)
except CMAPIBasicError as err:
raise_422_error(
@@ -463,6 +464,7 @@ class ConfigController:
MCSProcessManager.start_node(
is_primary=node_config.is_primary_node(),
use_sudo=use_sudo,
is_read_only=node_config.is_read_only(),
)
except CMAPIBasicError as err:
raise_422_error(
@@ -666,7 +668,8 @@ class StartController:
try:
MCSProcessManager.start_node(
is_primary=node_config.is_primary_node(),
use_sudo=use_sudo
use_sudo=use_sudo,
is_read_only=node_config.is_read_only(),
)
except CMAPIBasicError as err:
raise_422_error(
@@ -701,7 +704,8 @@ class ShutdownController:
MCSProcessManager.stop_node(
is_primary=node_config.is_primary_node(),
use_sudo=use_sudo,
timeout=timeout
timeout=timeout,
is_read_only=node_config.is_read_only(),
)
except CMAPIBasicError as err:
raise_422_error(

View File

@@ -95,7 +95,7 @@ class FailoverAgent(AgentBase):
try:
# TODO: remove test_mode condition and add mock for testing
if not test_mode:
MCSProcessManager.stop_node(is_primary=nc.is_primary_node())
MCSProcessManager.stop_node(is_primary=nc.is_primary_node(), is_read_only=nc.is_read_only())
logger.info(
'FA.enterStandbyMode(): successfully stopped node.'
)

View File

@@ -139,7 +139,10 @@ class ClusterHandler():
return {'timestamp': operation_start_time}
@staticmethod
def add_node(node: str, config: str = DEFAULT_MCS_CONF_PATH) -> dict:
def add_node(
node: str, config: str = DEFAULT_MCS_CONF_PATH,
read_only: bool = False,
) -> dict:
"""Method to add node to MCS CLuster.
:param node: node IP or name or FQDN
@@ -147,6 +150,8 @@ class ClusterHandler():
:param config: columnstore xml config file path,
defaults to DEFAULT_MCS_CONF_PATH
:type config: str, optional
:param read_only: add node in read-only mode, defaults to False
:type read_only: bool, optional
:raises CMAPIBasicError: on exception while starting transaction
:raises CMAPIBasicError: if transaction start isn't successful
:raises CMAPIBasicError: on exception while adding node
@@ -157,20 +162,25 @@ class ClusterHandler():
:rtype: dict
"""
logger: logging.Logger = logging.getLogger('cmapi_server')
logger.debug(f'Cluster add node command called. Adding node {node}.')
logger.debug('Cluster add node command called. Adding node %s in %s mode.', node, 'read-only' if read_only else 'read-write')
response = {'timestamp': str(datetime.now())}
try:
add_node(
node, input_config_filename=config,
output_config_filename=config
output_config_filename=config,
read_only=read_only,
)
if not get_dbroots(node, config):
add_dbroot(
host=node, input_config_filename=config,
output_config_filename=config
)
if not read_only: # Read-only nodes don't own dbroots
add_dbroot(
host=node, input_config_filename=config,
output_config_filename=config
)
else:
logger.debug("Node %s is read-only, skipping dbroot addition", node)
except Exception as err:
raise CMAPIBasicError('Error while adding node.') from err

View File

@@ -541,6 +541,10 @@ def get_desired_nodes(config=DEFAULT_MCS_CONF_PATH):
return [ node.text for node in nodes ]
def get_read_only_nodes(root) -> list[str]:
    """Return the text of every ``Node`` element under ``ReadOnlyNodes``.

    :param root: config root element to search in
    :return: node names in document order; empty list if there are none
    :rtype: list[str]
    """
    names = []
    for entry in root.findall("./ReadOnlyNodes/Node"):
        names.append(entry.text)
    return names
def in_maintenance_state(config=DEFAULT_MCS_CONF_PATH):
nc = NodeConfig()
root = nc.get_current_config_root(config, upgrade=False)
@@ -577,6 +581,7 @@ def get_dbroots(node, config=DEFAULT_MCS_CONF_PATH):
dbroots = []
smc_node = root.find('./SystemModuleConfig')
mod_count = int(smc_node.find('./ModuleCount3').text)
for i in range(1, mod_count+1):
ip_addr = smc_node.find(f'./ModuleIPAddr{i}-1-3').text
hostname = smc_node.find(f'./ModuleHostName{i}-1-3').text
@@ -596,6 +601,12 @@ def get_dbroots(node, config=DEFAULT_MCS_CONF_PATH):
dbroots.append(
smc_node.find(f"./ModuleDBRootID{i}-{j}-3").text
)
if dbroots and nc.is_read_only():
logger = logging.getLogger("dbroots")
logger.warning("Config contains dbroots %s for this read-only node, ignoring", dbroots)
return []
return dbroots

View File

@@ -7,7 +7,8 @@ from time import sleep
import psutil
from cmapi_server.exceptions import CMAPIBasicError
from cmapi_server.constants import MCS_INSTALL_BIN, ALL_MCS_PROGS
from cmapi_server.constants import MCS_INSTALL_BIN, ALL_MCS_PROGS, MCSProgs
from cmapi_server.process_dispatchers.base import BaseDispatcher
from cmapi_server.process_dispatchers.systemd import SystemdDispatcher
from cmapi_server.process_dispatchers.container import (
ContainerDispatcher
@@ -18,7 +19,7 @@ from mcs_node_control.models.misc import get_workernodes
from mcs_node_control.models.process import Process
PROCESS_DISPATCHERS = {
PROCESS_DISPATCHERS: dict[str, type[BaseDispatcher]] = {
'systemd': SystemdDispatcher,
# could be used in docker containers and OSes w/o systemd
'container': ContainerDispatcher,
@@ -404,19 +405,21 @@ class MCSProcessManager:
return set(node_progs) == set(p['name'] for p in running_procs)
@classmethod
def start_node(cls, is_primary: bool, use_sudo: bool = True):
def start_node(cls, is_primary: bool, use_sudo: bool = True, is_read_only: bool = False) -> None:
"""Start mcs node processes.
:param is_primary: is node primary or not
:type is_primary: bool
:param use_sudo: use sudo or not, defaults to True
:type use_sudo: bool, optional
:param is_read_only: if true, doesn't start WriteEngine
:type is_read_only: bool, optional
:raises CMAPIBasicError: immediately if one mcs process not started
"""
for prog_name in cls._get_sorted_progs(is_primary):
if (
cls.dispatcher_name == 'systemd'
and prog_name == 'StorageManager'
and prog_name == MCSProgs.STORAGE_MANAGER.value
):
# TODO: MCOL-5458
logging.info(
@@ -424,17 +427,24 @@ class MCSProcessManager:
)
continue
# TODO: additional error handling
if prog_name == 'controllernode':
if prog_name == MCSProgs.CONTROLLER_NODE.value:
cls._wait_for_workernodes()
if prog_name in ('DMLProc', 'DDLProc'):
if prog_name in (MCSProgs.DML_PROC.value, MCSProgs.DDL_PROC.value):
cls._wait_for_controllernode()
if is_read_only and prog_name == MCSProgs.WRITE_ENGINE_SERVER.value:
logging.debug('Node is in read-only mode, not starting WriteEngine')
continue
if not cls.start(prog_name, is_primary, use_sudo):
logging.error(f'Process "{prog_name}" not started properly.')
raise CMAPIBasicError(f'Error while starting "{prog_name}".')
@classmethod
def stop_node(
cls, is_primary: bool, use_sudo: bool = True, timeout: int = 10
cls,
is_primary: bool,
use_sudo: bool = True,
timeout: int = 10,
is_read_only: bool = False,
):
"""Stop mcs node processes.
@@ -444,6 +454,8 @@ class MCSProcessManager:
:type use_sudo: bool, optional
:param timeout: timeout for DMLProc gracefully stop using DBRM, seconds
:type timeout: int
:param is_read_only: if true, doesn't stop WriteEngine
:type is_read_only: bool, optional
:raises CMAPIBasicError: immediately if one mcs process not stopped
"""
# Every time try to stop all processes no matter primary it or slave,
@@ -451,13 +463,16 @@ class MCSProcessManager:
# undefined behaviour when primary gone and then recovers (failover
# triggered 2 times).
for prog_name in cls._get_sorted_progs(True, reverse=True):
if is_read_only and prog_name == MCSProgs.WRITE_ENGINE_SERVER.value:
logging.debug('Node is in read-only mode, not stopping WriteEngine')
continue
if not cls.stop(prog_name, is_primary, use_sudo):
logging.error(f'Process "{prog_name}" not stopped properly.')
raise CMAPIBasicError(f'Error while stopping "{prog_name}"')
@classmethod
def restart_node(
    cls, is_primary: bool, use_sudo: bool, is_read_only: bool = False
):
    """Restart mcs node processes.

    TODO: For next releases.

    The node is stopped only when some mcs processes are reported as
    running, then it is started.

    :param is_primary: is node primary or not
    :type is_primary: bool
    :param use_sudo: use sudo or not
    :type use_sudo: bool
    :param is_read_only: forwarded to stop_node and start_node,
                         defaults to False
    :type is_read_only: bool, optional
    """
    if cls.get_running_mcs_procs():
        # is_read_only must be passed by keyword: the third positional
        # parameter of stop_node is ``timeout``, so a positional call would
        # silently use the flag as a timeout and ignore read-only mode.
        cls.stop_node(is_primary, use_sudo, is_read_only=is_read_only)
    cls.start_node(is_primary, use_sudo, is_read_only=is_read_only)

View File

@@ -61,7 +61,8 @@ def switch_node_maintenance(
def add_node(
node: str, input_config_filename: str = DEFAULT_MCS_CONF_PATH,
output_config_filename: Optional[str] = None,
use_rebalance_dbroots: bool = True
use_rebalance_dbroots: bool = True,
read_only: bool = False,
):
"""Add node to a cluster.
@@ -95,14 +96,23 @@ def add_node(
try:
if not _replace_localhost(c_root, node):
pm_num = _add_node_to_PMS(c_root, node)
_add_WES(c_root, pm_num, node)
if not read_only:
_add_WES(c_root, pm_num, node)
else:
logging.info("Node is read-only, skipping WES addition")
_add_read_only_node(c_root, node)
_add_DBRM_Worker(c_root, node)
_add_Module_entries(c_root, node)
_add_active_node(c_root, node)
_add_node_to_ExeMgrs(c_root, node)
if use_rebalance_dbroots:
_rebalance_dbroots(c_root)
_move_primary_node(c_root)
if not read_only:
_rebalance_dbroots(c_root)
_move_primary_node(c_root)
else:
logging.debug("Node is read-only, skipping dbroots rebalancing")
except Exception:
logging.error(
'Caught exception while adding node, config file is unchanged',
@@ -156,7 +166,11 @@ def remove_node(
if len(active_nodes) > 1:
pm_num = _remove_node_from_PMS(c_root, node)
_remove_WES(c_root, pm_num)
is_read_only = node in helpers.get_read_only_nodes(c_root)
if not is_read_only:
_remove_WES(c_root, pm_num)
_remove_DBRM_Worker(c_root, node)
_remove_Module_entries(c_root, node)
_remove_from_ExeMgrs(c_root, node)
@@ -167,7 +181,7 @@ def remove_node(
# TODO: unspecific name, need to think of a better one
_remove_node(c_root, node)
if use_rebalance_dbroots:
if use_rebalance_dbroots and not is_read_only:
_rebalance_dbroots(c_root)
_move_primary_node(c_root)
else:
@@ -375,12 +389,16 @@ def __remove_helper(parent_node, node):
def _remove_node(root, node):
    '''
    remove node from DesiredNodes, InactiveNodes, ActiveNodes and
    (if present) ReadOnlyNodes
    '''
    # these three sections are looked up first and then passed to the
    # helper as-is, whatever find() returned for them
    section_paths = ("./DesiredNodes", "./InactiveNodes", "./ActiveNodes")
    sections = [root.find(path) for path in section_paths]
    for section in sections:
        __remove_helper(section, node)

    # ReadOnlyNodes is only touched when the section exists
    ro_section = root.find("./ReadOnlyNodes")
    if ro_section is None:
        return
    __remove_helper(ro_section, node)
# This moves a node from ActiveNodes to InactiveNodes
def _deactivate_node(root, node):
@@ -988,6 +1006,19 @@ def _add_WES(root, pm_num, node):
etree.SubElement(wes_node, "Port").text = "8630"
def _add_read_only_node(root, node) -> None:
"""Add node name to ReadOnlyNodes if it's not already there"""
read_only_nodes = root.find("./ReadOnlyNodes")
if read_only_nodes is None:
read_only_nodes = etree.SubElement(root, "ReadOnlyNodes")
else:
for n in read_only_nodes.findall("./Node"):
if n.text == node:
return
etree.SubElement(read_only_nodes, "Node").text = node
def _add_DBRM_Worker(root, node):
'''
find the highest numbered DBRM_Worker entry, or one that isn't used atm
@@ -1090,7 +1121,7 @@ def _add_node_to_PMS(root, node):
return new_pm_num
def _replace_localhost(root, node):
def _replace_localhost(root: etree.Element, node: str) -> bool:
# if DBRM_Controller/IPAddr is 127.0.0.1 or localhost,
# then replace all instances, else do nothing.
controller_host = root.find('./DBRM_Controller/IPAddr')

View File

@@ -6,6 +6,7 @@ from shutil import copyfile
import requests
from cmapi_server.constants import MCSProgs
from cmapi_server.controllers.dispatcher import _version
from cmapi_server.managers.process import MCSProcessManager
from cmapi_server.test.unittest_global import (
@@ -199,9 +200,13 @@ class ClusterAddNodeTestCase(BaseClusterTestCase):
# Check Columnstore started
controllernode = subprocess.check_output(
['pgrep', 'controllernode'])
['pgrep', MCSProgs.CONTROLLER_NODE.value])
self.assertIsNotNone(controllernode)
# Check that WriteEngineServer was started
wes = subprocess.check_output(['pgrep', MCSProgs.WRITE_ENGINE_SERVER.value])
self.assertIsNotNone(wes)
class ClusterRemoveNodeTestCase(BaseClusterTestCase):
URL = ClusterAddNodeTestCase.URL

View File

@@ -1,10 +1,12 @@
import logging
import socket
from unittest.mock import patch
from lxml import etree
from cmapi_server import node_manipulation
from cmapi_server.constants import MCS_DATA_PATH
from cmapi_server.helpers import get_read_only_nodes
from cmapi_server.test.unittest_global import (
tmp_mcs_config_filename, BaseNodeManipTestCase
)
@@ -13,6 +15,8 @@ from mcs_node_control.models.node_config import NodeConfig
logging.basicConfig(level='DEBUG')
SINGLE_NODE_XML = "./cmapi_server/SingleNode.xml"
class NodeManipTester(BaseNodeManipTestCase):
@@ -52,6 +56,61 @@ class NodeManipTester(BaseNodeManipTestCase):
# node = root.find('./PMS2/IPAddr')
# self.assertEqual(node, None)
def test_add_remove_read_only_node(self):
"""add_node(read_only=True) should add a read-only node into the config, it does not add a WriteEngineServer (WES) and does not own dbroots"""
self.tmp_files = ('./config_output_rw.xml', './config_output_ro.xml', './config_output_ro_removed.xml')
# Add this host as a read-write node
local_host_addr = socket.gethostbyname(socket.gethostname())
node_manipulation.add_node(
local_host_addr, SINGLE_NODE_XML, self.tmp_files[0]
)
# Mock _rebalance_dbroots and _move_primary_node (only after the first node is added)
with patch('cmapi_server.node_manipulation._rebalance_dbroots') as mock_rebalance_dbroots, \
patch('cmapi_server.node_manipulation._move_primary_node') as mock_move_primary_node:
# Add a read-only node
node_manipulation.add_node(
self.NEW_NODE_NAME, self.tmp_files[0], self.tmp_files[1],
read_only=True,
)
nc = NodeConfig()
root = nc.get_current_config_root(self.tmp_files[1])
# Check if read-only nodes section exists and is filled
read_only_nodes = get_read_only_nodes(root)
self.assertEqual(len(read_only_nodes), 1)
self.assertEqual(read_only_nodes[0], self.NEW_NODE_NAME)
# Check if PMS was added
pms_node_ipaddr = root.find('./PMS2/IPAddr')
self.assertEqual(pms_node_ipaddr.text, self.NEW_NODE_NAME)
# Check that WriteEngineServer was not added
wes_node = root.find('./pm2_WriteEngineServer')
self.assertIsNone(wes_node)
# Check that the dbroot related methods were not called
mock_rebalance_dbroots.assert_not_called()
mock_move_primary_node.assert_not_called()
# Test read-only node removal
node_manipulation.remove_node(
self.NEW_NODE_NAME, self.tmp_files[1], self.tmp_files[2],
)
nc = NodeConfig()
root = nc.get_current_config_root(self.tmp_files[2])
read_only_nodes = get_read_only_nodes(root)
self.assertEqual(len(read_only_nodes), 0)
# Check that dbroot related methods were not called
mock_rebalance_dbroots.assert_not_called()
mock_move_primary_node.assert_not_called()
def test_add_dbroots_nodes_rebalance(self):
self.tmp_files = (
'./extra-dbroots-0.xml', './extra-dbroots-1.xml',

View File

@@ -198,6 +198,14 @@ def add(
'node IP, name or FQDN. '
'Can be used multiple times to add several nodes at a time.'
)
),
read_only: bool = typer.Option(
False,
'--read-only',
help=(
'Add node (or nodes, if more than one is passed) in read-only '
'mode.'
)
)
):
"""Add nodes to the Columnstore cluster."""
@@ -207,7 +215,9 @@ def add(
extra_nodes=nodes
):
for node in nodes:
result.append(client.add_node({'node': node}))
result.append(
client.add_node({'node': node, 'read_only': read_only})
)
return result

View File

@@ -36,7 +36,7 @@ class NodeConfig:
"""
def get_current_config_root(
self, config_filename: str = DEFAULT_MCS_CONF_PATH, upgrade=True
):
) -> etree.Element:
"""Retrieves current configuration.
Read the config and returns Element.
@@ -49,7 +49,7 @@ class NodeConfig:
self.upgrade_config(tree=tree, upgrade=upgrade)
return tree.getroot()
def get_root_from_string(self, config_string: str):
def get_root_from_string(self, config_string: str) -> etree.Element:
root = etree.fromstring(config_string)
self.upgrade_config(root=root)
return root
@@ -566,4 +566,19 @@ has dbroot {subel.text}')
for i in range(1, mod_count+1):
for j in range(1, int(smc_node.find(f"./ModuleDBRootCount{i}-3").text) + 1):
dbroots.append(smc_node.find(f"./ModuleDBRootID{i}-{j}-3").text)
# TODO not sure about it
if dbroots and self.is_read_only(root):
module_logger.warning("Config contains dbroots %s for this read-only node, ignoring", dbroots)
return []
return dbroots
def is_read_only(self, root=None) -> bool:
    """Check if this node is listed among the read-only nodes.

    :param root: config root element to inspect; when None the current
                 config is loaded with get_current_config_root()
    :return: True if any of the values returned by
             get_network_addresses_and_names() is found in the
             read-only nodes list
    :rtype: bool
    """
    # Imported here to avoid a circular import
    from cmapi_server.helpers import get_read_only_nodes

    # Compare with None explicitly: the truth value of an element depends
    # on whether it has children, so ``root or ...`` would discard a valid
    # but childless root (and lxml emits a FutureWarning for such tests).
    if root is None:
        root = self.get_current_config_root()

    read_only_nodes = set(get_read_only_nodes(root))
    return not read_only_nodes.isdisjoint(
        self.get_network_addresses_and_names()
    )