1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-08 14:22:09 +03:00

On every node change adjust dbroots in the read-only nodes

This commit is contained in:
Alexander Presnyakov
2025-04-25 04:26:13 +00:00
parent ff25d4e35c
commit 5359c9822f
7 changed files with 150 additions and 49 deletions

View File

@@ -95,9 +95,7 @@ class FailoverAgent(AgentBase):
try: try:
# TODO: remove test_mode condition and add mock for testing # TODO: remove test_mode condition and add mock for testing
if not test_mode: if not test_mode:
MCSProcessManager.stop_node( MCSProcessManager.stop_node(is_primary=nc.is_primary_node())
is_primary=nc.is_primary_node(),
)
logger.info( logger.info(
'FA.enterStandbyMode(): successfully stopped node.' 'FA.enterStandbyMode(): successfully stopped node.'
) )

View File

@@ -15,7 +15,7 @@ from cmapi_server.helpers import (
get_current_key, get_version, update_revision_and_manager, get_current_key, get_version, update_revision_and_manager,
) )
from cmapi_server.node_manipulation import ( from cmapi_server.node_manipulation import (
add_node, add_dbroot, remove_node, switch_node_maintenance, add_node, add_dbroot, remove_node, switch_node_maintenance, update_dbroots_of_readonly_nodes,
) )
from mcs_node_control.models.misc import get_dbrm_master from mcs_node_control.models.misc import get_dbrm_master
from mcs_node_control.models.node_config import NodeConfig from mcs_node_control.models.node_config import NodeConfig
@@ -181,11 +181,6 @@ class ClusterHandler():
host=node, input_config_filename=config, host=node, input_config_filename=config,
output_config_filename=config output_config_filename=config
) )
else:
logger.debug(
f'Node {node} is read-only, skipping dbroot addition'
)
except Exception as err: except Exception as err:
raise CMAPIBasicError('Error while adding node.') from err raise CMAPIBasicError('Error while adding node.') from err
@@ -228,6 +223,8 @@ class ClusterHandler():
node, input_config_filename=config, node, input_config_filename=config,
output_config_filename=config output_config_filename=config
) )
with NodeConfig().modify_config(config) as root:
update_dbroots_of_readonly_nodes(root)
except Exception as err: except Exception as err:
raise CMAPIBasicError('Error while removing node.') from err raise CMAPIBasicError('Error while removing node.') from err

View File

@@ -378,7 +378,7 @@ def broadcast_new_config(
) as response: ) as response:
resp_json = await response.json(encoding='utf-8') resp_json = await response.json(encoding='utf-8')
response.raise_for_status() response.raise_for_status()
logging.info(f'Node {node} config put successfull.') logging.info(f'Node {node} config put successful.')
except aiohttp.ClientResponseError as err: except aiohttp.ClientResponseError as err:
# TODO: may be better to check if resp status is 422 cause # TODO: may be better to check if resp status is 422 cause
# it's like a signal that cmapi server raised it in # it's like a signal that cmapi server raised it in

View File

@@ -106,7 +106,7 @@ class TransactionManager(ContextDecorator):
try: try:
rollback_transaction(self.txn_id, nodes=nodes) rollback_transaction(self.txn_id, nodes=nodes)
self.active_transaction = False self.active_transaction = False
logging.debug(f'Successfull rollback of transaction "{self.txn_id}".') logging.debug(f'Successful rollback of transaction "{self.txn_id}".')
except Exception: except Exception:
logging.error( logging.error(
f'Error while rolling back transaction "{self.txn_id}"', f'Error while rolling back transaction "{self.txn_id}"',

View File

@@ -113,8 +113,8 @@ def add_node(
if not read_only: if not read_only:
_rebalance_dbroots(c_root) _rebalance_dbroots(c_root)
_move_primary_node(c_root) _move_primary_node(c_root)
else:
add_dbroots_of_other_nodes(c_root, pm_num) update_dbroots_of_readonly_nodes(c_root)
except Exception: except Exception:
logging.error( logging.error(
'Caught exception while adding node, config file is unchanged', 'Caught exception while adding node, config file is unchanged',
@@ -187,8 +187,7 @@ def remove_node(
_rebalance_dbroots(c_root) _rebalance_dbroots(c_root)
_move_primary_node(c_root) _move_primary_node(c_root)
if is_read_only: update_dbroots_of_readonly_nodes(c_root)
remove_dbroots_of_node(c_root, pm_num)
else: else:
# TODO: # TODO:
# - IMO undefined behaviour here. Removing one single node # - IMO undefined behaviour here. Removing one single node
@@ -262,7 +261,7 @@ def rebalance_dbroots(
# #
# returns the id of the new dbroot on success # returns the id of the new dbroot on success
# raises an exception on error # raises an exception on error
def add_dbroot(input_config_filename = None, output_config_filename = None, host = None): def add_dbroot(input_config_filename = None, output_config_filename = None, host = None) -> int:
node_config = NodeConfig() node_config = NodeConfig()
if input_config_filename is None: if input_config_filename is None:
c_root = node_config.get_current_config_root() c_root = node_config.get_current_config_root()
@@ -1185,26 +1184,54 @@ class NodeNotFoundException(Exception):
pass pass
def get_pm_module_num_to_addr_map(root: etree.Element) -> dict[int, str]:
    """Get a mapping of PM module numbers to their IP addresses.

    The number of PM modules is read from ``SystemModuleConfig/ModuleCount3``
    and the address of module N from ``SystemModuleConfig/ModuleIPAddr<N>-1-3``.

    :param root: root element of the config tree
    :return: dict of module number (counted from 1) to the address text.
        Modules without an address element are left out; an empty dict is
        returned when the section or the module count is absent.
    :raises ValueError: if the module count text is not an integer
    """
    smc_node = root.find("./SystemModuleConfig")
    if smc_node is None:
        return {}
    mod_count_node = smc_node.find("./ModuleCount3")
    if mod_count_node is None or mod_count_node.text is None:
        return {}

    module_num_to_addr = {}
    for i in range(1, int(mod_count_node.text) + 1):
        ip_node = smc_node.find(f"./ModuleIPAddr{i}-1-3")
        if ip_node is None:
            # One missing entry must not abort the whole lookup with an
            # AttributeError on None; callers that need this module will
            # report it as not found.
            logging.warning(
                f"ModuleIPAddr{i}-1-3 not found in SystemModuleConfig"
            )
            continue
        module_num_to_addr[i] = ip_node.text
    return module_num_to_addr
def update_dbroots_of_readonly_nodes(root: etree.Element) -> None:
    """Give every read-only node the full list of dbroots in the cluster.

    Read-only nodes do not have their own dbroots, but they must have all
    the dbroots of the other nodes. For each node reported by
    ``NodeConfig.get_read_only_nodes`` the PM module number is looked up by
    address and ``add_dbroots_of_other_nodes`` is applied to that module.

    :param root: root element of the config tree, modified in place
    :raises NodeNotFoundException: if a read-only node has no PM module
        number in the config
    """
    node_config = NodeConfig()
    addr_by_pm_num = get_pm_module_num_to_addr_map(root)

    for ro_node in node_config.get_read_only_nodes(root):
        # First PM module whose address equals this read-only node
        matched_pm_num = next(
            (num for num, addr in addr_by_pm_num.items() if addr == ro_node),
            None,
        )
        if matched_pm_num is None:
            # This should not happen
            err_msg = f"Could not find PM number for read-only node {ro_node}"
            logging.error(err_msg)
            raise NodeNotFoundException(err_msg)
        # Add dbroots of other nodes to this read-only node
        add_dbroots_of_other_nodes(root, matched_pm_num)
def add_dbroots_of_other_nodes(root: etree.Element, module_num: int) -> None: def add_dbroots_of_other_nodes(root: etree.Element, module_num: int) -> None:
"""Adds all the dbroots listed in the config to this (read-only) node""" """Adds all the dbroots listed in the config to this (read-only) node"""
existing_dbroots = _get_existing_db_roots(root) existing_dbroots = _get_existing_db_roots(root)
sysconf_node = root.find("./SystemModuleConfig") sysconf_node = root.find("./SystemModuleConfig")
# Remove existing dbroots from this module
remove_dbroots_of_node(root, module_num)
# Write node's dbroot count # Write node's dbroot count
dbroot_count_node = sysconf_node.find(f"./ModuleDBRootCount{module_num}-3")
if dbroot_count_node is not None:
sysconf_node.remove(dbroot_count_node)
dbroot_count_node = etree.SubElement( dbroot_count_node = etree.SubElement(
sysconf_node, f"ModuleDBRootCount{module_num}-3" sysconf_node, f"ModuleDBRootCount{module_num}-3"
) )
dbroot_count_node.text = str(len(existing_dbroots)) dbroot_count_node.text = str(len(existing_dbroots))
# Remove existing dbroot IDs of this module if present
for i in range(1, 100):
dbroot_id_node = sysconf_node.find(f"./ModuleDBRootID{module_num}-{i}-3")
if dbroot_id_node is not None:
sysconf_node.remove(dbroot_id_node)
# Write new dbroot IDs to the module mapping # Write new dbroot IDs to the module mapping
for i, dbroot_id in enumerate(existing_dbroots, start=1): for i, dbroot_id in enumerate(existing_dbroots, start=1):
dbroot_id_node = etree.SubElement( dbroot_id_node = etree.SubElement(
@@ -1221,10 +1248,6 @@ def remove_dbroots_of_node(root: etree.Element, module_num: int) -> None:
dbroot_count_node = sysconf_node.find(f"./ModuleDBRootCount{module_num}-3") dbroot_count_node = sysconf_node.find(f"./ModuleDBRootCount{module_num}-3")
if dbroot_count_node is not None: if dbroot_count_node is not None:
sysconf_node.remove(dbroot_count_node) sysconf_node.remove(dbroot_count_node)
else:
logging.error(
f"ModuleDBRootCount{module_num}-3 not found in SystemModuleConfig"
)
# Remove existing dbroot IDs # Remove existing dbroot IDs
for i in range(1, 100): for i in range(1, 100):

View File

@@ -1,13 +1,13 @@
import logging import logging
import socket import socket
import unittest import unittest
from unittest.mock import ANY, patch from unittest.mock import patch
from lxml import etree from lxml import etree
from cmapi_server import node_manipulation from cmapi_server import node_manipulation
from cmapi_server.constants import MCS_DATA_PATH from cmapi_server.constants import MCS_DATA_PATH
from cmapi_server.node_manipulation import add_dbroots_of_other_nodes, remove_dbroots_of_node from cmapi_server.node_manipulation import add_dbroots_of_other_nodes, remove_dbroots_of_node, update_dbroots_of_readonly_nodes
from cmapi_server.test.unittest_global import BaseNodeManipTestCase, tmp_mcs_config_filename from cmapi_server.test.unittest_global import BaseNodeManipTestCase, tmp_mcs_config_filename
from mcs_node_control.models.node_config import NodeConfig from mcs_node_control.models.node_config import NodeConfig
@@ -23,12 +23,18 @@ class NodeManipTester(BaseNodeManipTestCase):
'./test-output0.xml','./test-output1.xml','./test-output2.xml' './test-output0.xml','./test-output1.xml','./test-output2.xml'
) )
hostaddr = socket.gethostbyname(socket.gethostname()) hostaddr = socket.gethostbyname(socket.gethostname())
node_manipulation.add_node(
self.NEW_NODE_NAME, tmp_mcs_config_filename, self.tmp_files[0] with patch('cmapi_server.node_manipulation.update_dbroots_of_readonly_nodes') as mock_update_dbroots_of_readonly_nodes:
) node_manipulation.add_node(
node_manipulation.add_node( self.NEW_NODE_NAME, tmp_mcs_config_filename, self.tmp_files[0]
hostaddr, self.tmp_files[0], self.tmp_files[1] )
) mock_update_dbroots_of_readonly_nodes.assert_called_once()
mock_update_dbroots_of_readonly_nodes.reset_mock()
node_manipulation.add_node(
hostaddr, self.tmp_files[0], self.tmp_files[1]
)
mock_update_dbroots_of_readonly_nodes.assert_called_once()
# get a NodeConfig, read test.xml # get a NodeConfig, read test.xml
# look for some of the expected changes. # look for some of the expected changes.
@@ -42,10 +48,13 @@ class NodeManipTester(BaseNodeManipTestCase):
node = root.find("./ExeMgr2/IPAddr") node = root.find("./ExeMgr2/IPAddr")
self.assertEqual(node.text, hostaddr) self.assertEqual(node.text, hostaddr)
node_manipulation.remove_node( with patch('cmapi_server.node_manipulation.update_dbroots_of_readonly_nodes') as mock_update_dbroots_of_readonly_nodes:
self.NEW_NODE_NAME, self.tmp_files[1], self.tmp_files[2], node_manipulation.remove_node(
test_mode=True self.NEW_NODE_NAME, self.tmp_files[1], self.tmp_files[2],
) test_mode=True
)
mock_update_dbroots_of_readonly_nodes.assert_called_once()
nc = NodeConfig() nc = NodeConfig()
root = nc.get_current_config_root(self.tmp_files[2]) root = nc.get_current_config_root(self.tmp_files[2])
node = root.find('./PMS1/IPAddr') node = root.find('./PMS1/IPAddr')
@@ -67,8 +76,7 @@ class NodeManipTester(BaseNodeManipTestCase):
# Mock _rebalance_dbroots and _move_primary_node (only after the first node is added) # Mock _rebalance_dbroots and _move_primary_node (only after the first node is added)
with patch('cmapi_server.node_manipulation._rebalance_dbroots') as mock_rebalance_dbroots, \ with patch('cmapi_server.node_manipulation._rebalance_dbroots') as mock_rebalance_dbroots, \
patch('cmapi_server.node_manipulation._move_primary_node') as mock_move_primary_node, \ patch('cmapi_server.node_manipulation._move_primary_node') as mock_move_primary_node, \
patch('cmapi_server.node_manipulation.add_dbroots_of_other_nodes') as mock_add_dbroots_of_other_nodes, \ patch('cmapi_server.node_manipulation.update_dbroots_of_readonly_nodes') as mock_update_dbroots_of_readonly_nodes:
patch('cmapi_server.node_manipulation.remove_dbroots_of_node') as mock_remove_dbroots_of_node:
# Add a read-only node # Add a read-only node
node_manipulation.add_node( node_manipulation.add_node(
@@ -94,7 +102,8 @@ class NodeManipTester(BaseNodeManipTestCase):
mock_rebalance_dbroots.assert_not_called() mock_rebalance_dbroots.assert_not_called()
mock_move_primary_node.assert_not_called() mock_move_primary_node.assert_not_called()
mock_add_dbroots_of_other_nodes.assert_called_once_with(ANY, 2) mock_update_dbroots_of_readonly_nodes.assert_called_once()
mock_update_dbroots_of_readonly_nodes.reset_mock()
# Test read-only node removal # Test read-only node removal
node_manipulation.remove_node( node_manipulation.remove_node(
@@ -109,7 +118,7 @@ class NodeManipTester(BaseNodeManipTestCase):
mock_rebalance_dbroots.assert_not_called() mock_rebalance_dbroots.assert_not_called()
mock_move_primary_node.assert_not_called() mock_move_primary_node.assert_not_called()
mock_remove_dbroots_of_node.assert_called_once_with(ANY, 2) mock_update_dbroots_of_readonly_nodes.assert_called_once()
def test_add_dbroots_nodes_rebalance(self): def test_add_dbroots_nodes_rebalance(self):
@@ -271,13 +280,23 @@ class NodeManipTester(BaseNodeManipTestCase):
self.assertTrue(caught_it) self.assertTrue(caught_it)
class TestReadOnlyNodeDBRootsManip(unittest.TestCase): class TestDBRootsManipulation(unittest.TestCase):
our_module_idx = 2 our_module_idx = 3
ro_node1_ip = '192.168.1.3'
ro_node2_ip = '192.168.1.4'
def setUp(self): def setUp(self):
# Mock initial XML structure (add two dbroots) # Mock initial XML structure (add two nodes and two dbroots)
self.root = etree.Element('Columnstore') self.root = etree.Element('Columnstore')
etree.SubElement(self.root, 'SystemModuleConfig') # Add two PM modules with IP addresses
smc = etree.SubElement(self.root, 'SystemModuleConfig')
module_count = etree.SubElement(smc, 'ModuleCount3')
module_count.text = '2'
module1_ip = etree.SubElement(smc, 'ModuleIPAddr1-1-3')
module1_ip.text = '192.168.1.1'
module2_ip = etree.SubElement(smc, 'ModuleIPAddr2-1-3')
module2_ip.text = '192.168.1.2'
system_config = etree.SubElement(self.root, 'SystemConfig') system_config = etree.SubElement(self.root, 'SystemConfig')
dbroot_count = etree.SubElement(system_config, 'DBRootCount') dbroot_count = etree.SubElement(system_config, 'DBRootCount')
dbroot_count.text = '2' dbroot_count.text = '2'
@@ -286,6 +305,15 @@ class TestReadOnlyNodeDBRootsManip(unittest.TestCase):
dbroot2 = etree.SubElement(system_config, 'DBRoot2') dbroot2 = etree.SubElement(system_config, 'DBRoot2')
dbroot2.text = '/data/dbroot2' dbroot2.text = '/data/dbroot2'
def test_get_pm_module_num_to_addr_map(self):
    """The map must list both PM modules from the fixture with their addresses."""
    self.assertEqual(
        node_manipulation.get_pm_module_num_to_addr_map(self.root),
        {1: '192.168.1.1', 2: '192.168.1.2'},
    )
def test_add_dbroots_of_other_nodes(self): def test_add_dbroots_of_other_nodes(self):
'''add_dbroots_of_other_nodes must add dbroots of other nodes into mapping of the node.''' '''add_dbroots_of_other_nodes must add dbroots of other nodes into mapping of the node.'''
add_dbroots_of_other_nodes(self.root, self.our_module_idx) add_dbroots_of_other_nodes(self.root, self.our_module_idx)
@@ -325,3 +353,36 @@ class TestReadOnlyNodeDBRootsManip(unittest.TestCase):
dbroot2 = self.root.find(f'./SystemModuleConfig/ModuleDBRootID{self.our_module_idx}-2-3') dbroot2 = self.root.find(f'./SystemModuleConfig/ModuleDBRootID{self.our_module_idx}-2-3')
self.assertIsNone(dbroot1) self.assertIsNone(dbroot1)
self.assertIsNone(dbroot2) self.assertIsNone(dbroot2)
def test_update_dbroots_of_readonly_nodes(self):
    """update_dbroots_of_readonly_nodes must give every read-only node all existing dbroots."""
    ro_ips = [self.ro_node1_ip, self.ro_node2_ip]

    # Register two more modules (numbers 3 and 4) next to the two from the fixture
    smc = self.root.find('./SystemModuleConfig')
    smc.find('./ModuleCount3').text = '4'
    for module_idx, ip in enumerate(ro_ips, start=3):
        etree.SubElement(smc, f'ModuleIPAddr{module_idx}-1-3').text = ip

    # List the same addresses under ReadOnlyNodes
    read_only_nodes = etree.SubElement(self.root, 'ReadOnlyNodes')
    for ip in ro_ips:
        etree.SubElement(read_only_nodes, 'Node').text = ip

    update_dbroots_of_readonly_nodes(self.root)

    # Each read-only module must now map both dbroots
    for ro_module_idx in (3, 4):
        module_count = self.root.find(
            f'./SystemModuleConfig/ModuleDBRootCount{ro_module_idx}-3'
        )
        self.assertIsNotNone(module_count)
        self.assertEqual(module_count.text, '2')

        dbroot_nodes = [
            self.root.find(
                f'./SystemModuleConfig/ModuleDBRootID{ro_module_idx}-{slot}-3'
            )
            for slot in (1, 2)
        ]
        for dbroot_node in dbroot_nodes:
            self.assertIsNotNone(dbroot_node)
        for dbroot_node, expected_id in zip(dbroot_nodes, ('1', '2')):
            self.assertEqual(dbroot_node.text, expected_id)

View File

@@ -1,4 +1,5 @@
import configparser import configparser
from contextlib import contextmanager
import grp import grp
import logging import logging
import pwd import pwd
@@ -7,6 +8,7 @@ import socket
from os import chown, mkdir, replace from os import chown, mkdir, replace
from pathlib import Path from pathlib import Path
from shutil import copyfile from shutil import copyfile
from typing import Optional
from xml.dom import minidom # to pick up pretty printing functionality from xml.dom import minidom # to pick up pretty printing functionality
from lxml import etree from lxml import etree
@@ -136,6 +138,26 @@ class NodeConfig:
f.write(self.to_string(tree)) f.write(self.to_string(tree))
replace(tmp_filename, filename) # atomic replacement replace(tmp_filename, filename) # atomic replacement
@contextmanager
def modify_config(
    self,
    input_config_filename: str = DEFAULT_MCS_CONF_PATH,
    output_config_filename: Optional[str] = None,
):
    """Context manager that loads a config, yields it for editing and saves it.

    The root returned by ``get_current_config_root(input_config_filename)``
    is yielded to the ``with`` body. If the body finishes without raising,
    the root is passed to ``write_config`` for ``output_config_filename``,
    or for ``input_config_filename`` when no output name is given.
    An exception raised while loading or inside the body is logged and
    re-raised, and nothing is written.
    """
    try:
        config_root = self.get_current_config_root(input_config_filename)
        yield config_root
    except Exception as e:
        logging.error(f"modify_config(): Caught exception: '{str(e)}', config file not modified")
        raise

    # Reached only on success, because the handler above always re-raises.
    if not output_config_filename:
        output_config_filename = input_config_filename
    self.write_config(config_root, output_config_filename)
def to_string(self, tree): def to_string(self, tree):
# TODO: try to use lxml to do this to avoid the add'l dependency # TODO: try to use lxml to do this to avoid the add'l dependency
xmlstr = minidom.parseString(etree.tostring(tree)).toprettyxml( xmlstr = minidom.parseString(etree.tostring(tree)).toprettyxml(