Mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
mariadb-columnstore-engine/cmapi/failover/test/test_agent_comm.py
Commit c86586c228 by mariadb-AlanMologorsky
feat(cmapi,failover): MCOL-6006 Disable failover when shared storage not detected
- Add SharedStorageMonitor thread to periodically verify shared storage:
  * Writes a temp file to the shared location and validates MD5 from all nodes.
  * Skips nodes with unstable recent heartbeats; retries once; defers decision if any node is unreachable.
  * Updates a cluster-wide stateful flag (shared_storage_on) only on conclusive checks.
- New CMAPI endpoints:
  * PUT /cmapi/{ver}/cluster/check-shared-storage — orchestrates cross-node checks.
  * GET /cmapi/{ver}/node/check-shared-file — validates a given file’s MD5 on a node.
  * PUT /cmapi/{ver}/node/stateful-config — fast path to distribute stateful config updates.
- Introduce in-memory stateful config (AppStatefulConfig) with versioned flags (term/seq) and shared_storage_on flag:
  * Broadcast via helpers.broadcast_stateful_config and enhanced broadcast_new_config.
  * Config PUT is now validated with Pydantic models; supports stateful-only updates and set_mode requests.
- Failover behavior:
  * NodeMonitor keeps failover inactive when shared_storage_on is false or cluster size < 3.
  * Rebalancing DBRoots becomes a no-op when shared storage is OFF (safety guard).
- mcl status improvements: per-node 'state' (online/offline), better timeouts and error reporting.
- Routing/wiring: add dispatcher routes for new endpoints; add ClusterModeEnum.
- Tests: cover shared-storage monitor (unreachable nodes, HB-based skipping), node manipulation with shared storage ON/OFF, and server/config flows.
- Dependencies: add pydantic; minor cleanups and logging.
2025-10-01 21:10:34 +04:00
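
The shared-storage check summarized in the first item above can be pictured as a small write-then-compare loop: the monitor drops a marker file into the shared location, hashes it locally, asks every node for the MD5 of the same path, and defers the decision whenever a node is unreachable. A minimal sketch of that idea, with hypothetical helper names (get_active_nodes, remote_md5) standing in for the real cmapi internals:

import hashlib
import os
import uuid


def verify_shared_storage(shared_dir, get_active_nodes, remote_md5):
    """Return True/False on a conclusive check, or None if any node was unreachable.

    Hypothetical sketch of the mechanism described in the commit message:
    write a marker file to the shared location, then compare its MD5 as seen
    from every node against the locally computed digest.
    """
    marker = os.path.join(shared_dir, f'.storage_check_{uuid.uuid4().hex}')
    payload = uuid.uuid4().bytes
    try:
        with open(marker, 'wb') as f:
            f.write(payload)
        expected = hashlib.md5(payload).hexdigest()
        for node in get_active_nodes():
            digest = remote_md5(node, marker)  # e.g. GET /node/check-shared-file
            if digest is None:
                return None                    # node unreachable: defer the decision
            if digest != expected:
                return False                   # node sees different bytes: not shared
        return True
    finally:
        try:
            os.remove(marker)
        except OSError:
            pass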
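
The cluster-level endpoint can be driven like any other CMAPI call. The snippet below assumes the conventions CMAPI uses elsewhere (HTTPS on port 8640, version 0.4.0 in the path, an x-api-key header); the key value, empty body, and disabled certificate verification are placeholders for a test setup, not a recommendation:

import requests

# Assumptions: CMAPI on localhost:8640, API version 0.4.0 in the URL,
# x-api-key authentication, self-signed certificate (hence verify=False).
resp = requests.put(
    'https://localhost:8640/cmapi/0.4.0/cluster/check-shared-storage',
    headers={'x-api-key': 'somekey123', 'Content-Type': 'application/json'},
    json={},
    verify=False,
    timeout=60,
)
print(resp.status_code, resp.json())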
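
The stateful config travels as a versioned payload so a node can discard stale broadcasts. A rough shape, assuming Pydantic v2 and using the term/seq pair and shared_storage_on flag named in the commit message; the class and method names here are illustrative, not the actual AppStatefulConfig API:

from pydantic import BaseModel, Field


class StatefulConfig(BaseModel):
    """Illustrative shape of a versioned stateful-config payload.

    term/seq give a total order so a node can ignore stale broadcasts;
    shared_storage_on is the flag the failover logic keys off.
    """
    term: int = Field(ge=0)
    seq: int = Field(ge=0)
    shared_storage_on: bool = False

    def is_newer_than(self, other: 'StatefulConfig') -> bool:
        return (self.term, self.seq) > (other.term, other.seq)


# A node would accept an incoming update only if it is strictly newer:
current = StatefulConfig(term=1, seq=4, shared_storage_on=True)
incoming = StatefulConfig.model_validate({'term': 1, 'seq': 5, 'shared_storage_on': False})
if incoming.is_newer_than(current):
    current = incoming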
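
The failover guard itself reduces to a single predicate over the stateful flag and the cluster size; the parameter names below are illustrative:

def failover_allowed(shared_storage_on: bool, cluster_size: int) -> bool:
    # Failover stays inactive unless shared storage is confirmed and there
    # are at least three nodes (the threshold stated in the commit message).
    return shared_storage_on and cluster_size >= 3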

test_agent_comm.py (148 lines, 5.5 KiB, Python)

import datetime
import socket
import time
from contextlib import contextmanager

import cherrypy

from cmapi_server import helpers, node_manipulation
from cmapi_server.controllers.dispatcher import dispatcher, jsonify_error
from cmapi_server.failover_agent import FailoverAgent
from cmapi_server.managers.certificate import CertificateManager
from cmapi_server.test.mock_resolution import MockResolutionBuilder
from cmapi_server.test.unittest_global import BaseNodeManipTestCase

from ..agent_comm import AgentComm


config_filename = './cmapi_server/cmapi_server.conf'
@contextmanager
def start_server():
    """Run an embedded CherryPy CMAPI server for the duration of the with-block."""
    # TODO: review and replace with run_server() from unittest_global.py
    CertificateManager.create_self_signed_certificate_if_not_exist()
    app = cherrypy.tree.mount(root=None, config=config_filename)
    app.config.update({
        '/': {
            'request.dispatch': dispatcher,
            'error_page.default': jsonify_error,
        },
        'config': {
            'path': config_filename,
        },
    })
    cherrypy.config.update(config_filename)
    cherrypy.engine.start()
    cherrypy.engine.wait(cherrypy.engine.states.STARTED)
    yield
    cherrypy.engine.exit()
    cherrypy.engine.block()
class TestAgentComm(BaseNodeManipTestCase):

    def test_with_agent_base(self):
        agent = AgentComm()
        test_ip = '104.17.191.14'
        with (
            MockResolutionBuilder()
            .add_mapping('mysql.com', test_ip)
            .add_mapping(socket.gethostname(), test_ip)
            .build()
        ):
            agent.activateNodes(["mysql.com"])
            agent.activateNodes(["mysql.com"])  # an intentional dup
            agent.designatePrimaryNode("mysql.com")
            agent.deactivateNodes(["mysql.com"])
            agent.deactivateNodes(["mysql.com"])
            agent.designatePrimaryNode(socket.gethostname())

            health = agent.getNodeHealth()
            agent.raiseAlarm("Hello world!")

            print("Waiting up to 20s for queued events to be processed and removed")
            stop_time = datetime.datetime.now() + datetime.timedelta(seconds=20)
            success = False
            while datetime.datetime.now() < stop_time and not success:
                sizes = agent.getQueueSize()
                if sizes != (0, 0):
                    time.sleep(1)
                else:
                    print("Event queue & deduper are now empty")
                    success = True

            print("Waiting for the agent comm thread to die.")
            agent.die()
        self.assertTrue(success)
    # This is the beginnings of an integration test, will need perms to modify the real config file
    def test_with_failover_agent(self):
        # Toggle shared storage as requested and remember to restore it back.
        original_shared_storage = self._set_shared_storage(True)

        print('\n\n')  # make a little whitespace between tests
        # check for existence of and permissions to write to the real config file
        try:
            f = open('/etc/columnstore/Columnstore.xml', 'a')
            f.close()
        except PermissionError:
            print(f'Skipping {__name__}, got a permissions error opening /etc/columnstore/Columnstore.xml for writing')
            return

        success = False
        with start_server():
            try:
                agent = FailoverAgent()
                agentcomm = AgentComm(agent)

                # make sure the AC thread has a chance to start before we start issuing cmds.
                # If it grabs jobs in the middle of this block, we'll try to send the config file
                # to mysql.com. :D
                time.sleep(1)

                test_ip = '104.17.191.14'
                with (
                    MockResolutionBuilder()
                    .add_mapping('mysql.com', test_ip)
                    .add_mapping(socket.gethostname(), test_ip)
                    .build()
                ):
                    agentcomm.activateNodes(["mysql.com"])
                    agentcomm.activateNodes(["mysql.com"])  # an intentional dup
                    agentcomm.designatePrimaryNode("mysql.com")
                    agentcomm.deactivateNodes(["mysql.com"])
                    agentcomm.deactivateNodes(["mysql.com"])
                    agentcomm.designatePrimaryNode(socket.gethostname())

                    health = agent.getNodeHealth()
                    agent.raiseAlarm('Hello world!')

                    print('Waiting up to 30s for queued events to be processed and removed')
                    stop_time = datetime.datetime.now() + datetime.timedelta(seconds=30)
                    while datetime.datetime.now() < stop_time and not success:
                        sizes = agentcomm.getQueueSize()
                        if sizes != (0, 0):
                            time.sleep(1)
                        else:
                            print('Event queue & deduper are now empty')
                            success = True
                    if not success:
                        raise Exception('The event queue or de-duper did not empty within 30s')
                    agentcomm.die()
            except Exception as e:
                agentcomm.die()
                cherrypy.engine.exit()
                cherrypy.engine.block()
                raise

            # clean up the config file, remove mysql.com
            txnid = helpers.start_transaction()
            node_manipulation.remove_node('mysql.com')
            helpers.update_revision_and_manager()
            helpers.broadcast_new_config()
            helpers.commit_transaction(txnid)

        _ = self._set_shared_storage(original_shared_storage)