You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-11-02 06:13:16 +03:00
- Add SharedStorageMonitor thread to periodically verify shared storage:
* Writes a temp file to the shared location and validates MD5 from all nodes.
* Skips nodes with unstable recent heartbeats; retries once; defers decision if any node is unreachable.
* Updates a cluster-wide stateful flag (shared_storage_on) only on conclusive checks.
- New CMAPI endpoints:
* PUT /cmapi/{ver}/cluster/check-shared-storage — orchestrates cross-node checks.
* GET /cmapi/{ver}/node/check-shared-file — validates a given file’s MD5 on a node.
* PUT /cmapi/{ver}/node/stateful-config — fast path to distribute stateful config updates.
- Introduce an in-memory stateful config (AppStatefulConfig) holding a version (term/seq) and flags, including shared_storage_on:
* Broadcast via helpers.broadcast_stateful_config and enhanced broadcast_new_config.
* Config PUT is now validated with Pydantic models; supports stateful-only updates and set_mode requests.
- Failover behavior:
* NodeMonitor keeps failover inactive when shared_storage_on is false or cluster size < 3.
* Rebalancing DBRoots becomes a no-op when shared storage is OFF (safety guard).
- mcl status improvements: per-node 'state' (online/offline), better timeouts and error reporting.
- Routing/wiring: add dispatcher routes for new endpoints; add ClusterModeEnum.
- Tests: cover shared-storage monitor (unreachable nodes, HB-based skipping), node manipulation with shared storage ON/OFF, and server/config flows.
- Dependencies: add pydantic; minor cleanups and logging.
167 lines
5.8 KiB
Python
import logging
|
|
import os
|
|
import unittest
|
|
from contextlib import contextmanager
|
|
from shutil import copyfile
|
|
from tempfile import TemporaryDirectory
|
|
|
|
import cherrypy
|
|
from cmapi_server import helpers
|
|
from cmapi_server.constants import CMAPI_CONF_PATH
|
|
from cmapi_server.controllers.dispatcher import dispatcher, jsonify_error
|
|
from cmapi_server.logging_management import config_cmapi_server_logging
|
|
from cmapi_server.managers.application import (
|
|
AppStatefulConfig, StatefulConfigModel, StatefulFlagsModel
|
|
)
|
|
from cmapi_server.managers.process import MCSProcessManager
|
|
from cmapi_server.managers.certificate import CertificateManager
|
|
|
|
|
|
TEST_API_KEY = 'somekey123'
MCS_CONFIG_FILEPATH = '/etc/columnstore/Columnstore.xml'
# Absolute paths anchored at this test package so tests work regardless of
# the current working directory; abspath() guards against a relative
# __file__, which dirname() alone would pass through unchanged.
_TEST_DIR = os.path.dirname(os.path.abspath(__file__))
COPY_MCS_CONFIG_FILEPATH = os.path.join(_TEST_DIR, 'original_Columnstore.xml')
TEST_MCS_CONFIG_FILEPATH = os.path.join(_TEST_DIR, 'CS-config-test.xml')
# TODO: rename these lowercase names everywhere they are used.
# Same file as TEST_MCS_CONFIG_FILEPATH; kept as an alias, not a second copy
# of the path expression.
mcs_config_filename = TEST_MCS_CONFIG_FILEPATH
tmp_mcs_config_filename = os.path.join(_TEST_DIR, 'tmp.xml')
cmapi_config_filename = os.path.join(
    os.path.dirname(_TEST_DIR), 'cmapi_server.conf'
)
tmp_cmapi_config_filename = os.path.join(_TEST_DIR, 'tmp.conf')
single_node_xml = os.path.join(_TEST_DIR, '..', 'SingleNode.xml')

# constants for process dispatchers
DDL_SERVICE = 'mcs-ddlproc'
CONTROLLERNODE_SERVICE = 'mcs-controllernode.service'
UNKNOWN_SERVICE = 'unknown_service'
SYSTEMCTL = 'systemctl'


logging.basicConfig(level=logging.DEBUG)
|
|
|
|
|
|
def run_detect_processes():
    """Register the process dispatcher configured for CMAPI.

    Parses the CMAPI config at ``CMAPI_CONF_PATH``, extracts the dispatcher
    name and path from it and hands both to ``MCSProcessManager.detect``.
    """
    dispatcher_name, dispatcher_path = helpers.get_dispatcher_name_and_path(
        helpers.get_config_parser(CMAPI_CONF_PATH)
    )
    MCSProcessManager.detect(dispatcher_name, dispatcher_path)
|
|
|
|
|
|
@contextmanager
def run_server():
    """Keep the CherryPy engine running for the duration of a ``with`` block.

    Ensures a self-signed certificate exists, starts the engine, waits until
    it reports STARTED and runs process detection before yielding control.
    The engine is always exited and joined afterwards, even when detection
    or the body of the ``with`` block raises.
    """
    CertificateManager.create_self_signed_certificate_if_not_exist()

    cherrypy.engine.start()
    try:
        cherrypy.engine.wait(cherrypy.engine.states.STARTED)
        # TODO: move out of here, it slows down each test by about 5s
        run_detect_processes()
        yield
    finally:
        # Without this, an exception above would leave the engine running
        # and break every test that starts a server afterwards.
        cherrypy.engine.exit()
        cherrypy.engine.block()
|
|
|
|
|
|
class BaseServerTestCase(unittest.TestCase):
    """Base class for tests that talk to a running CMAPI server.

    Every test runs inside its own temporary directory. That directory holds
    private copies of the CMAPI config and the test Columnstore config. The
    CherryPy application is mounted from the copied CMAPI config and wired to
    the CMAPI dispatcher and JSON error page. The engine is kept running (via
    ``run_server``) while the test executes.
    """

    HEADERS = {'x-api-key': TEST_API_KEY}
    NO_AUTH_HEADERS = {'x-api-key': None}
    # Parametrisation rows: (description, request headers, expected status).
    TEST_PARAMS = (
        ('auth ok', HEADERS, 200),
        ('no auth', NO_AUTH_HEADERS, 401)
    )

    def run(self, result=None):
        with TemporaryDirectory() as workdir:
            self.tmp_dir = workdir
            self.cmapi_config_filename = os.path.join(
                workdir, 'tmp_cmapi_config.conf'
            )
            self.mcs_config_filename = os.path.join(
                workdir, 'tmp_mcs_config.xml'
            )
            # Tests operate on copies so the shared fixture files stay intact.
            copyfile(cmapi_config_filename, self.cmapi_config_filename)
            copyfile(TEST_MCS_CONFIG_FILEPATH, self.mcs_config_filename)
            config_cmapi_server_logging()

            self.app = cherrypy.tree.mount(
                root=None, config=self.cmapi_config_filename
            )
            app_overrides = {
                '/': {
                    'request.dispatch': dispatcher,
                    'error_page.default': jsonify_error,
                },
                'config': {'path': self.cmapi_config_filename},
                'Authentication': self.HEADERS,
            }
            self.app.config.update(app_overrides)
            cherrypy.config.update(self.cmapi_config_filename)

            with run_server():
                return super().run(result=result)
|
|
|
|
|
|
class BaseNodeManipTestCase(unittest.TestCase):
    """Base class for node manipulation tests that work on a scratch config.

    ``setUp`` copies the test Columnstore config to ``tmp_mcs_config_filename``.
    ``tearDown`` deletes that copy plus every path a test appended to
    ``self.tmp_files``, skipping paths that no longer exist.
    """

    NEW_NODE_NAME = 'node.hostname'

    def setUp(self):
        self.tmp_files = []
        copyfile(TEST_MCS_CONFIG_FILEPATH, tmp_mcs_config_filename)

    def tearDown(self):
        # Test-registered files first, the scratch config last.
        for leftover in (*self.tmp_files, tmp_mcs_config_filename):
            if os.path.exists(leftover):
                os.remove(leftover)

    def _set_shared_storage(self, target_value: bool) -> bool:
        """Switch the ``shared_storage_on`` stateful flag to ``target_value``.

        When the flag already has the requested value nothing is applied.
        Otherwise a new config with the next sequence number is applied via
        ``AppStatefulConfig.apply_update``.

        :param target_value: desired value of ``shared_storage_on``
        :return: the value the flag had before this call, so tests can
                 restore it
        """
        snapshot = AppStatefulConfig.get_config_copy()
        previous = snapshot.flags.shared_storage_on
        if previous == target_value:
            return previous
        # NOTE(review): a fresh StatefulFlagsModel is built, so any other flag
        # fields would fall back to their defaults; presumably
        # shared_storage_on is the only flag at the moment. Confirm.
        AppStatefulConfig.apply_update(
            StatefulConfigModel(
                version=snapshot.version.next_seq(),
                flags=StatefulFlagsModel(shared_storage_on=target_value),
            )
        )
        return previous
|
|
|
|
|
|
class BaseProcessDispatcherCase(unittest.TestCase):
    """Base class for process dispatcher tests that restores system state.

    At class level it records whether MCS processes were running before the
    tests. If the tests changed that, it starts or stops the node again in
    ``tearDownClass``. Per test, it remembers whether the controllernode
    service was running and replays the matching ``systemctl`` command in
    ``tearDown``.
    """

    # Whether any MCS process was running when the test class started.
    node_started = None

    @staticmethod
    def _mcs_procs_running() -> bool:
        """Return True if MCSProcessManager reports any running MCS process."""
        # bool() is correct whether get_running_mcs_procs() returns a count or
        # a collection of processes; a collection compared with ``!= 0`` is
        # always True, which made the restore logic below a no-op.
        return bool(MCSProcessManager.get_running_mcs_procs())

    @classmethod
    def setUpClass(cls) -> None:
        run_detect_processes()
        cls.node_started = cls._mcs_procs_running()
        return super().setUpClass()

    @classmethod
    def tearDownClass(cls) -> None:
        if cls._mcs_procs_running() != cls.node_started:
            # The tests changed the node state; bring it back.
            if cls.node_started:
                MCSProcessManager.start_node(is_primary=True, use_sudo=False)
            else:
                MCSProcessManager.stop_node(is_primary=True, use_sudo=False)
        return super().tearDownClass()

    def setUp(self) -> None:
        # Remember which systemctl command returns the controllernode service
        # to its current state; tearDown replays it.
        if MCSProcessManager.process_dispatcher.is_service_running(
            CONTROLLERNODE_SERVICE
        ):
            self.controller_node_cmd = 'start'
        else:
            self.controller_node_cmd = 'stop'
        # Avoid systemd 'start-limit-hit' errors, see MCOL-5186.
        os.system(f'{SYSTEMCTL} reset-failed')
        return super().setUp()

    def tearDown(self) -> None:
        os.system(
            f'{SYSTEMCTL} {self.controller_node_cmd} {CONTROLLERNODE_SERVICE}'
        )
        return super().tearDown()
|