You've already forked mariadb-columnstore-engine
Mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-11-03 17:13:17 +03:00
- Add SharedStorageMonitor thread to periodically verify shared storage:
* Writes a temp file to the shared location and validates MD5 from all nodes.
* Skips nodes with unstable recent heartbeats, retries once, and defers the decision if any node is unreachable.
* Updates a cluster-wide stateful flag (shared_storage_on) only on conclusive checks.
- New CMAPI endpoints:
* PUT /cmapi/{ver}/cluster/check-shared-storage — orchestrates cross-node checks.
* GET /cmapi/{ver}/node/check-shared-file — validates a given file’s MD5 on a node.
* PUT /cmapi/{ver}/node/stateful-config — fast path to distribute stateful config updates.
- Introduce in-memory stateful config (AppStatefulConfig) with versioned flags (term/seq) and a shared_storage_on flag:
* Broadcast via helpers.broadcast_stateful_config and enhanced broadcast_new_config.
* Config PUT is now validated with Pydantic models; supports stateful-only updates and set_mode requests.
- Failover behavior:
* NodeMonitor keeps failover inactive when shared_storage_on is false or cluster size < 3.
* Rebalancing DBRoots becomes a no-op when shared storage is OFF (safety guard).
- mcl status improvements: per-node 'state' (online/offline), better timeouts and error reporting.
- Routing/wiring: add dispatcher routes for new endpoints; add ClusterModeEnum.
- Tests: cover shared-storage monitor (unreachable nodes, HB-based skipping), node manipulation with shared storage ON/OFF, and server/config flows.
- Dependencies: add pydantic; minor cleanups and logging.
148 lines · 5.5 KiB · Python
import datetime
|
|
import socket
|
|
import time
|
|
from contextlib import contextmanager
|
|
|
|
import cherrypy
|
|
|
|
from cmapi_server import helpers, node_manipulation
|
|
from cmapi_server.controllers.dispatcher import dispatcher, jsonify_error
|
|
from cmapi_server.failover_agent import FailoverAgent
|
|
from cmapi_server.managers.certificate import CertificateManager
|
|
from cmapi_server.test.mock_resolution import MockResolutionBuilder
|
|
from cmapi_server.test.unittest_global import BaseNodeManipTestCase
|
|
|
|
from ..agent_comm import AgentComm
|
|
|
|
# CMAPI server config consumed by start_server(): mounted as the app config,
# recorded under the app's 'config' -> 'path' key, and merged into the global
# CherryPy config.  The leading './' makes it relative to the current working
# directory, so the tests must be run from the repository's cmapi root.
config_filename: str = './cmapi_server/cmapi_server.conf'
|
|
|
|
|
|
@contextmanager
def start_server():
    """Run a CherryPy engine serving the CMAPI app for the duration of a with-block.

    Ensures a self-signed certificate exists, mounts the app from
    ``config_filename`` with the CMAPI ``dispatcher`` and ``jsonify_error`` as
    the default error page, merges the same file into the global CherryPy
    config, then starts the engine and waits until it reaches STARTED.

    On leaving the with-block -- normally or because the body raised -- the
    engine is exited and joined, unless the body already put it into the
    EXITING state itself.
    """
    # TODO: review and replace with run_server() from unittest_global.py
    CertificateManager.create_self_signed_certificate_if_not_exist()

    app = cherrypy.tree.mount(root=None, config=config_filename)
    app.config.update({
        '/': {
            'request.dispatch': dispatcher,
            'error_page.default': jsonify_error,
        },
        'config': {
            'path': config_filename,
        },
    })
    cherrypy.config.update(config_filename)

    cherrypy.engine.start()
    cherrypy.engine.wait(cherrypy.engine.states.STARTED)
    try:
        yield
    finally:
        # Without try/finally an exception in the body would skip the
        # shutdown and leave the engine running.  If the body already shut
        # the engine down on its own error path, don't publish 'stop'/'exit'
        # a second time.
        if cherrypy.engine.state is not cherrypy.engine.states.EXITING:
            cherrypy.engine.exit()
            cherrypy.engine.block()
|
|
|
|
|
|
class TestAgentComm(BaseNodeManipTestCase):
    """Tests that drive AgentComm directly and through a FailoverAgent."""

    @staticmethod
    def _wait_for_empty_queues(agentcomm, timeout):
        """Poll ``agentcomm`` until its queues are empty or ``timeout`` expires.

        :param agentcomm: AgentComm instance whose ``getQueueSize()`` is polled
            once per second.
        :param timeout: maximum number of seconds to wait.
        :return: True if ``getQueueSize()`` reported ``(0, 0)`` in time,
            False otherwise.
        """
        # Monotonic clock: the deadline is unaffected by wall-clock jumps.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if agentcomm.getQueueSize() == (0, 0):
                print('Event queue & deduper are now empty')
                return True
            time.sleep(1)
        return False

    def test_with_agent_base(self):
        """Queue node events on a bare AgentComm and check that they drain.

        Name resolution for 'mysql.com' and the local hostname is mocked to a
        fixed IP while the events are queued.  The sequence deliberately
        contains duplicates.  The test passes if the event queue and the
        de-duper are both empty within 20 seconds.
        """
        agent = AgentComm()
        success = False
        try:
            test_ip = '104.17.191.14'
            with (
                MockResolutionBuilder()
                .add_mapping('mysql.com', test_ip)
                .add_mapping(socket.gethostname(), test_ip)
                .build()
            ):
                agent.activateNodes(["mysql.com"])
                agent.activateNodes(["mysql.com"])  # an intentional dup
                agent.designatePrimaryNode("mysql.com")
                agent.deactivateNodes(["mysql.com"])
                agent.deactivateNodes(["mysql.com"])
                agent.designatePrimaryNode(socket.gethostname())

            agent.getNodeHealth()
            agent.raiseAlarm("Hello world!")
            print("Waiting up to 20s for queued events to be processed and removed")
            success = self._wait_for_empty_queues(agent, 20)
        finally:
            # Stop the agent comm thread even if one of the calls above
            # raised, so it does not outlive this test.
            print("Waiting for the agent comm thread to die.")
            agent.die()
        self.assertTrue(success)

    # This is the beginning of an integration test; it needs permission to
    # modify the real config file.
    def test_with_failover_agent(self):
        """Drive AgentComm through a FailoverAgent against a running CMAPI server.

        Skipped unless /etc/columnstore/Columnstore.xml exists and is writable.
        Shared storage is switched on for the duration of the test and is
        always restored afterwards.  On success, 'mysql.com' is removed from
        the config again inside a transaction and the new config is broadcast.
        """
        print('\n\n')  # make a little whitespace between tests

        # Check that the real config file exists and is writable.  'r+'
        # fails on a missing file and, unlike 'a', never creates one.
        config_path = '/etc/columnstore/Columnstore.xml'
        try:
            with open(config_path, 'r+'):
                pass
        except OSError as err:
            self.skipTest(f'cannot open {config_path} for writing: {err}')

        # Switch shared storage on and remember the previous setting, so it
        # can be restored however the test ends.
        original_shared_storage = self._set_shared_storage(True)
        try:
            with start_server():
                agentcomm = None
                try:
                    agent = FailoverAgent()
                    agentcomm = AgentComm(agent)

                    # Make sure the AC thread has a chance to start before we
                    # start issuing cmds.  If it grabs jobs in the middle of
                    # this block, we'll try to send the config file to
                    # mysql.com. :D
                    time.sleep(1)

                    test_ip = '104.17.191.14'
                    with (
                        MockResolutionBuilder()
                        .add_mapping('mysql.com', test_ip)
                        .add_mapping(socket.gethostname(), test_ip)
                        .build()
                    ):
                        agentcomm.activateNodes(["mysql.com"])
                        agentcomm.activateNodes(["mysql.com"])  # an intentional dup
                        agentcomm.designatePrimaryNode("mysql.com")
                        agentcomm.deactivateNodes(["mysql.com"])
                        agentcomm.deactivateNodes(["mysql.com"])
                        agentcomm.designatePrimaryNode(socket.gethostname())

                    agent.getNodeHealth()
                    agent.raiseAlarm('Hello world!')
                    print('Waiting up to 30s for queued events to be processed and removed')
                    if not self._wait_for_empty_queues(agentcomm, 30):
                        self.fail('The event queue or de-duper did not empty within 30s')
                except Exception:
                    # Stop the agent thread (if it was created at all) and the
                    # engine before propagating, so a failing run leaves
                    # nothing running.
                    if agentcomm is not None:
                        agentcomm.die()
                    cherrypy.engine.exit()
                    cherrypy.engine.block()
                    raise
                agentcomm.die()

            # clean up the config file, remove mysql.com
            txnid = helpers.start_transaction()
            node_manipulation.remove_node('mysql.com')
            helpers.update_revision_and_manager()
            helpers.broadcast_new_config()
            helpers.commit_transaction(txnid)
        finally:
            self._set_shared_storage(original_shared_storage)
|