1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-11-02 06:13:16 +03:00
Files
mariadb-columnstore-engine/cmapi/cmapi_server/test/unittest_global.py
mariadb-AlanMologorsky c86586c228 feat(cmapi,failover): MCOL-6006 Disable failover when shared storage not detected
- Add SharedStorageMonitor thread to periodically verify shared storage:
  * Writes a temp file to the shared location and validates MD5 from all nodes.
  * Skips nodes with unstable recent heartbeats; retries once; defers decision if any node is unreachable.
  * Updates a cluster-wide stateful flag (shared_storage_on) only on conclusive checks.
- New CMAPI endpoints:
  * PUT /cmapi/{ver}/cluster/check-shared-storage — orchestrates cross-node checks.
  * GET /cmapi/{ver}/node/check-shared-file — validates a given file’s MD5 on a node.
  * PUT /cmapi/{ver}/node/stateful-config — fast path to distribute stateful config updates.
- Introduce in-memory stateful config (AppStatefulConfig) with versioned flags (term/seq) and shared_storage_on flag:
  * Broadcast via helpers.broadcast_stateful_config and enhanced broadcast_new_config.
  * Config PUT is now validated with Pydantic models; supports stateful-only updates and set_mode requests.
- Failover behavior:
  * NodeMonitor keeps failover inactive when shared_storage_on is false or cluster size < 3.
  * Rebalancing DBRoots becomes a no-op when shared storage is OFF (safety guard).
- mcl status improvements: per-node 'state' (online/offline), better timeouts and error reporting.
- Routing/wiring: add dispatcher routes for new endpoints; add ClusterModeEnum.
- Tests: cover shared-storage monitor (unreachable nodes, HB-based skipping), node manipulation with shared storage ON/OFF, and server/config flows.
- Dependencies: add pydantic; minor cleanups and logging.
2025-10-01 21:10:34 +04:00

167 lines
5.8 KiB
Python

import logging
import os
import unittest
from contextlib import contextmanager
from shutil import copyfile
from tempfile import TemporaryDirectory
import cherrypy
from cmapi_server import helpers
from cmapi_server.constants import CMAPI_CONF_PATH
from cmapi_server.controllers.dispatcher import dispatcher, jsonify_error
from cmapi_server.logging_management import config_cmapi_server_logging
from cmapi_server.managers.application import (
AppStatefulConfig, StatefulConfigModel, StatefulFlagsModel
)
from cmapi_server.managers.process import MCSProcessManager
from cmapi_server.managers.certificate import CertificateManager
# --- API key and installed ColumnStore config location ----------------------
TEST_API_KEY = 'somekey123'
MCS_CONFIG_FILEPATH = '/etc/columnstore/Columnstore.xml'

# --- Fixture paths ----------------------------------------------------------
# Every fixture path is derived from the directory of this module, so the
# tests do not depend on the current working directory.
_TEST_DIR = os.path.dirname(__file__)
_TEST_PARENT_DIR = os.path.dirname(_TEST_DIR)
TEST_MCS_CONFIG_FILEPATH = os.path.join(_TEST_DIR, 'CS-config-test.xml')
COPY_MCS_CONFIG_FILEPATH = os.path.join(_TEST_DIR, 'original_Columnstore.xml')

# TODO:
#   - rename the lower-case names below once every usage has been updated
#   - fix the paths to be absolute
# Legacy alias: same file as TEST_MCS_CONFIG_FILEPATH.
mcs_config_filename = TEST_MCS_CONFIG_FILEPATH
tmp_mcs_config_filename = os.path.join(_TEST_DIR, 'tmp.xml')
tmp_cmapi_config_filename = os.path.join(_TEST_DIR, 'tmp.conf')
cmapi_config_filename = os.path.join(_TEST_PARENT_DIR, 'cmapi_server.conf')
single_node_xml = os.path.join(_TEST_DIR, '..', 'SingleNode.xml')

# --- Names used by the process dispatcher tests -----------------------------
SYSTEMCTL = 'systemctl'
DDL_SERVICE = 'mcs-ddlproc'
CONTROLLERNODE_SERVICE = 'mcs-controllernode.service'
UNKNOWN_SERVICE = 'unknown_service'

# Verbose logging for every test module that imports this file.
logging.basicConfig(level=logging.DEBUG)
def run_detect_processes():
    """Run MCS process detection using the dispatcher from the CMAPI config.

    Loads the config found at ``CMAPI_CONF_PATH``, asks the helpers module
    for the dispatcher name and path stored there, and passes both to
    ``MCSProcessManager.detect``.
    """
    parser = helpers.get_config_parser(CMAPI_CONF_PATH)
    dispatcher_name, dispatcher_path = helpers.get_dispatcher_name_and_path(
        parser
    )
    MCSProcessManager.detect(dispatcher_name, dispatcher_path)
@contextmanager
def run_server():
    """Run the CherryPy engine for the duration of a ``with`` block.

    On entry: makes sure a self-signed certificate exists, starts the
    CherryPy engine, waits until it reports the STARTED state and runs
    process detection.

    On exit: asks the engine to exit and blocks until it has done so. The
    shutdown lives in a ``finally`` clause so the engine is also stopped
    when process detection or the body of the ``with`` block raises;
    otherwise a running engine would leak into the following tests.
    """
    CertificateManager.create_self_signed_certificate_if_not_exist()
    cherrypy.engine.start()
    try:
        cherrypy.engine.wait(cherrypy.engine.states.STARTED)
        # TODO: move this out, it slows down each test by about 5s
        run_detect_processes()
        yield
    finally:
        cherrypy.engine.exit()
        cherrypy.engine.block()
class BaseServerTestCase(unittest.TestCase):
    """Base class for tests that need a running CMAPI CherryPy server.

    Each test runs against private copies of the CMAPI and ColumnStore
    config files placed in a temporary directory, with the application
    mounted and the server started only for the duration of that test.
    """

    # Header sets for authorized and unauthorized requests.
    HEADERS = {'x-api-key': TEST_API_KEY}
    NO_AUTH_HEADERS = {'x-api-key': None}
    # (case label, request headers, expected status code)
    TEST_PARAMS = (
        ('auth ok', HEADERS, 200),
        ('no auth', NO_AUTH_HEADERS, 401)
    )

    def run(self, result=None):
        """Run one test inside a temporary config dir and a live server.

        The temporary directory (and the config copies in it) is removed
        once the test has finished.
        """
        with TemporaryDirectory() as workdir:
            self.tmp_dir = workdir
            self.cmapi_config_filename = os.path.join(
                workdir, 'tmp_cmapi_config.conf'
            )
            self.mcs_config_filename = os.path.join(
                workdir, 'tmp_mcs_config.xml'
            )

            # Work on copies so the fixture files stay untouched.
            fixture_copies = (
                (cmapi_config_filename, self.cmapi_config_filename),
                (TEST_MCS_CONFIG_FILEPATH, self.mcs_config_filename),
            )
            for source_path, target_path in fixture_copies:
                copyfile(source_path, target_path)

            config_cmapi_server_logging()

            # Settings layered on top of the copied CMAPI config file.
            app_overrides = {
                '/': {
                    'request.dispatch': dispatcher,
                    'error_page.default': jsonify_error,
                },
                'config': {
                    'path': self.cmapi_config_filename,
                },
                'Authentication': self.HEADERS,
            }
            self.app = cherrypy.tree.mount(
                root=None, config=self.cmapi_config_filename
            )
            self.app.config.update(app_overrides)
            cherrypy.config.update(self.cmapi_config_filename)

            with run_server():
                return super().run(result=result)
class BaseNodeManipTestCase(unittest.TestCase):
    """Base class for node manipulation tests.

    Gives every test a fresh working copy of the test ColumnStore config
    and removes it, plus any files listed in ``self.tmp_files``, afterwards.
    """

    NEW_NODE_NAME = 'node.hostname'

    def setUp(self):
        """Reset the temp file list and create the working config copy."""
        self.tmp_files = []
        copyfile(TEST_MCS_CONFIG_FILEPATH, tmp_mcs_config_filename)

    def tearDown(self):
        """Delete registered temp files, then the working config copy."""
        for leftover in (*self.tmp_files, tmp_mcs_config_filename):
            if os.path.exists(leftover):
                os.remove(leftover)

    def _set_shared_storage(self, target_value: bool) -> bool:
        """Switch the shared_storage_on flag and report its previous value.

        Nothing is applied when the flag already equals ``target_value``.
        Otherwise an update carrying the next sequence version and the new
        flag value is applied to the stateful config.

        :param target_value: value the shared_storage_on flag should have
        :return: value of the flag before this call
        """
        snapshot = AppStatefulConfig.get_config_copy()
        previous = snapshot.flags.shared_storage_on
        if previous == target_value:
            return previous

        next_version = snapshot.version.next_seq()
        new_flags = StatefulFlagsModel(shared_storage_on=target_value)
        AppStatefulConfig.apply_update(
            StatefulConfigModel(version=next_version, flags=new_flags)
        )
        return previous
class BaseProcessDispatcherCase(unittest.TestCase):
    """Base class for process dispatcher tests.

    Records whether MCS processes were running before the class's tests
    and puts the node back into that state afterwards. Around every single
    test it also restores the running state of the controller node service.
    """

    # True when MCS processes were found running before the tests started;
    # stays None until setUpClass has run.
    node_started = None

    @classmethod
    def setUpClass(cls) -> None:
        """Detect processes and remember whether the node was running."""
        run_detect_processes()
        # Truthiness instead of ``!= 0``: it gives the same answer for a
        # count and, unlike ``!= 0``, is also correct for an empty
        # collection (NOTE(review): confirm the helper's return type).
        cls.node_started = bool(MCSProcessManager.get_running_mcs_procs())
        return super().setUpClass()

    @classmethod
    def tearDownClass(cls) -> None:
        """Start or stop the node so it matches the state seen at setup."""
        running_now = bool(MCSProcessManager.get_running_mcs_procs())
        if running_now != cls.node_started:
            if cls.node_started:
                MCSProcessManager.start_node(is_primary=True, use_sudo=False)
            else:
                MCSProcessManager.stop_node(is_primary=True, use_sudo=False)
        return super().tearDownClass()

    def setUp(self) -> None:
        """Remember the systemctl command that restores the controller node.

        The command is 'start' when the service is running before the test
        and 'stop' otherwise; tearDown executes it.
        """
        controller_running = (
            MCSProcessManager.process_dispatcher.is_service_running(
                CONTROLLERNODE_SERVICE
            )
        )
        self.controller_node_cmd = 'start' if controller_running else 'stop'
        # Avoid the 'start-limit-hit' systemd error, see MCOL-5186.
        os.system(f'{SYSTEMCTL} reset-failed')
        return super().setUp()

    def tearDown(self) -> None:
        """Put the controller node service back into its pre-test state."""
        os.system(
            f'{SYSTEMCTL} {self.controller_node_cmd} {CONTROLLERNODE_SERVICE}'
        )
        return super().tearDown()