1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-07 03:22:57 +03:00

feat(cmapi): MCOL-6006 Disable failover when shared storage not detected

add Base api client
add NodeController api client
fix ClusterController Api client
add NodeController in endpoints.py
add 2 new endpoints to dispatcher: cluster/check-shared-storage and node/check-shared-file
add check_shared_storage method into ClusterHandler class
add set_shared_storage to node_manipulation t
add reading shared_storage in failover/config.py
fix is_shared_storage method in failover/config.py
add check_shared_storage in NodeMonitor class
fix some minor styling
This commit is contained in:
mariadb-AlanMologorsky
2025-07-18 16:00:34 +03:00
parent 7dca1da8f2
commit 48148b1e6e
10 changed files with 361 additions and 80 deletions

View File

@@ -22,9 +22,11 @@ from cmapi_server import helpers
from cmapi_server.constants import DEFAULT_MCS_CONF_PATH, CMAPI_CONF_PATH
from cmapi_server.controllers.dispatcher import dispatcher, jsonify_error
from cmapi_server.failover_agent import FailoverAgent
from cmapi_server.handlers.cluster import ClusterHandler
from cmapi_server.managers.application import AppManager
from cmapi_server.managers.process import MCSProcessManager
from cmapi_server.managers.certificate import CertificateManager
from cmapi_server.node_manipulation import set_shared_storage
from failover.node_monitor import NodeMonitor
from mcs_node_control.models.dbrm_socket import SOCK_TIMEOUT, DBRMSocketHandler
from mcs_node_control.models.node_config import NodeConfig

View File

@@ -4,7 +4,6 @@ from typing import Any, Dict, Optional, Union
import pyotp
import requests
from cmapi_server.controllers.dispatcher import _version
from cmapi_server.constants import (
CMAPI_CONF_PATH, CURRENT_NODE_CMAPI_URL, SECRET_KEY,
)
@@ -12,25 +11,106 @@ from cmapi_server.exceptions import CMAPIBasicError
from cmapi_server.helpers import get_config_parser, get_current_key
class ClusterControllerClient:
_version = '0.4.0'
class BaseClient:
"""Base class for API clients.
This class is not intended to be used directly, but rather as a
base class for other API clients. It provides a common interface
for making requests to the API and handling responses.
WARNING: This class only handles the API requests, it does not
handle the transaction management. So it should be started
at level above using TransactionManager (decorator or context
manager).
"""
def __init__(
self, base_url: str = CURRENT_NODE_CMAPI_URL,
request_timeout: Optional[float] = None
):
"""Initialize the BaseClient with the base URL.
:param base_url: The base URL for the API endpoints,
defaults to CURRENT_NODE_CMAPI_URL
"""
self.base_url = base_url
self.request_timeout = request_timeout
self.cmd_class = None
def _request(
self, method: str, endpoint: str,
data: Optional[Dict[str, Any]] = None
) -> Union[Dict[str, Any], Dict[str, str]]:
"""Make a request to the API.
:param method: The HTTP method to use.
:param endpoint: The API endpoint to call.
:param data: The data to send with the request.
:return: The response from the API.
"""
url = f'{self.base_url}/cmapi/{_version}/{self.cmd_class}/{endpoint}'
cmapi_cfg_parser = get_config_parser(CMAPI_CONF_PATH)
key = get_current_key(cmapi_cfg_parser)
headers = {'x-api-key': key}
if method in ['PUT', 'POST', 'DELETE']:
headers['Content-Type'] = 'application/json'
data = {'in_transaction': True, **(data or {})}
try:
response = requests.request(
method, url, headers=headers,
params=data if method == 'GET' else None,
json=data if method in ('PUT', 'POST') else None,
timeout=self.request_timeout, verify=False
)
response.raise_for_status()
return response.json()
# TODO: different handler for timeout exception?
except requests.HTTPError as exc:
resp = exc.response
error_msg = str(exc)
if resp.status_code == 422:
# in this case we think cmapi server returned some value but
# had error during running endpoint handler code
try:
resp_json = response.json()
error_msg = resp_json.get('error', resp_json)
except requests.exceptions.JSONDecodeError:
error_msg = response.text
message = (
f'API client got an exception in request to {exc.request.url} '
f'with code {resp.status_code} and error: {error_msg}'
)
logging.error(message)
raise CMAPIBasicError(message)
except requests.exceptions.RequestException as exc:
message = (
'API client got an undefined error in request to '
f'{exc.request.url} with code {exc.response.status_code} and '
f'error: {str(exc)}'
)
logging.error(message)
raise CMAPIBasicError(message)
class ClusterControllerClient(BaseClient):
"""Client for the ClusterController API.
This class provides methods for interacting with the cluster
management API, including starting and stopping the cluster,
adding and removing nodes, and getting the cluster status.
"""
def __init__(
self, base_url: str = CURRENT_NODE_CMAPI_URL,
request_timeout: Optional[float] = None
):
"""Initialize the ClusterControllerClient with the base URL.
WARNING: This class only handles the API requests, it does not
handle the transaction management. So it should be started
at level above using TransactionManager (decorator or context manager).
:param base_url: The base URL for the API endpoints,
defaults to CURRENT_NODE_CMAPI_URL
:type base_url: str, optional
:param request_timeout: request timeout, defaults to None
:type request_timeout: Optional[float], optional
"""
self.base_url = base_url
self.request_timeout = request_timeout
super().__init__(base_url, request_timeout)
self.cmd_class = 'cluster'
def start_cluster(
self, extra: Dict[str, Any] = dict()
@@ -122,54 +202,48 @@ class ClusterControllerClient:
"""
return self._request('put', 'load_s3data', s3data_info)
def _request(
self, method: str, endpoint: str,
data: Optional[Dict[str, Any]] = None
def check_shared_storage(
self, extra: Dict[str, Any] = dict()
) -> Union[Dict[str, Any], Dict[str, str]]:
"""Make a request to the API.
"""Check if shared storage working.
:param method: The HTTP method to use.
:param endpoint: The API endpoint to call.
:param data: The data to send with the request.
:return: The response from the API.
"""
url = f'{self.base_url}/cmapi/{_version}/cluster/{endpoint}'
cmapi_cfg_parser = get_config_parser(CMAPI_CONF_PATH)
key = get_current_key(cmapi_cfg_parser)
headers = {'x-api-key': key}
if method in ['PUT', 'POST', 'DELETE']:
headers['Content-Type'] = 'application/json'
data = {'in_transaction': True, **(data or {})}
try:
response = requests.request(
method, url, headers=headers, json=data,
timeout=self.request_timeout, verify=False
)
response.raise_for_status()
return response.json()
# TODO: different handler for timeout exception?
except requests.HTTPError as exc:
resp = exc.response
error_msg = str(exc)
if resp.status_code == 422:
# in this case we think cmapi server returned some value but
# had error during running endpoint handler code
try:
resp_json = response.json()
error_msg = resp_json.get('error', resp_json)
except requests.exceptions.JSONDecodeError:
error_msg = response.text
message = (
f'API client got an exception in request to {exc.request.url} '
f'with code {resp.status_code} and error: {error_msg}'
)
logging.error(message)
raise CMAPIBasicError(message)
except requests.exceptions.RequestException as exc:
message = (
'API client got an undefined error in request to '
f'{exc.request.url} with code {exc.response.status_code} and '
f'error: {str(exc)}'
)
logging.error(message)
raise CMAPIBasicError(message)
return self._request('PUT', 'check-shared-storage', extra)
class NodeControllerClient(BaseClient):
"""Client for the NodeController API.
This class provides methods for interacting with a node management
API.
"""
def __init__(
self, base_url: str = CURRENT_NODE_CMAPI_URL,
request_timeout: Optional[float] = None
):
"""Initialize the NodeControllerClient with the base URL.
:param base_url: The base URL for the API endpoints,
defaults to CURRENT_NODE_CMAPI_URL
:type base_url: str, optional
:param request_timeout: request timeout, defaults to None
:type request_timeout: Optional[float], optional
"""
super().__init__(base_url, request_timeout)
self.cmd_class = 'node'
def check_shared_file(
self, file_path: str, check_sum: str
) -> Union[Dict[str, Any], Dict[str, str]]:
"""Get packages versions installed on a node.
:param file_path: file path to check
:type file_path: str
:param check_sum: expected MD5 file checksum
:type check_sum: str
:return: The response from the API.
"""
data = {
'file_path': file_path,
'check_sum': check_sum,
}
return self._request('GET', 'check-shared-file', data)

View File

@@ -6,12 +6,13 @@ from cmapi_server.controllers.endpoints import (
StatusController, ConfigController, BeginController, CommitController,
RollbackController, StartController, ShutdownController,
ExtentMapController, ClusterController, ApiKeyController,
LoggingConfigController, AppController, NodeProcessController
LoggingConfigController, AppController, NodeController,
NodeProcessController
)
from cmapi_server.controllers.s3dataload import S3DataLoadController
_version = '0.4.0'
_version = '0.4.0' #TODO: MOVE to constants
dispatcher = cherrypy.dispatch.RoutesDispatcher()
@@ -261,6 +262,26 @@ dispatcher.connect(
)
# /_version/node/check-shared-file/ (GET)
dispatcher.connect(
name = 'node_check_shared_file',
route = f'/cmapi/{_version}/node/check-shared-file',
action = 'check_shared_file',
controller = NodeController(),
conditions = {'method': ['GET']}
)
# /_version/cluster/check-shared-storage/ (PUT)
dispatcher.connect(
name = 'cluster_check_shared_storage',
route = f'/cmapi/{_version}/cluster/check-shared-storage',
action = 'check_shared_storage',
controller = ClusterController(),
conditions = {'method': ['PUT']}
)
def jsonify_error(status, message, traceback, version): \
# pylint: disable=unused-argument
"""JSONify all CherryPy error responses (created by raising the

View File

@@ -1,9 +1,8 @@
import logging
import hashlib
import socket
import subprocess
import time
from copy import deepcopy
from datetime import datetime
from pathlib import Path
@@ -1106,6 +1105,26 @@ class ClusterController:
module_logger.debug(f'{func_name} returns {str(response)}')
return response
@cherrypy.tools.timeit()
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@cherrypy.tools.validate_api_key() # pylint: disable=no-member
def check_shared_storage(self):
"""Handler for /cluster/check-shared-storage/ (PUT) endpoint."""
func_name = 'check_shared_storage'
log_begin(module_logger, func_name)
try:
response = ClusterHandler.check_shared_storage()
except CMAPIBasicError as err:
raise_422_error(module_logger, func_name, err.message)
except Exception:
raise_422_error(
module_logger, func_name,
'Undefined error happened while checking shared storage.'
)
module_logger.debug(f'{func_name} returns {str(response)}')
return response
class ApiKeyController:
@cherrypy.tools.timeit()
@@ -1261,3 +1280,36 @@ class NodeProcessController():
}
module_logger.debug(f'{func_name} returns {str(response)}')
return response
class NodeController:
@cherrypy.tools.timeit()
@cherrypy.tools.json_out()
@cherrypy.tools.validate_api_key() # pylint: disable=no-member
def check_shared_file(self, file_path, check_sum):
func_name = 'check_shared_file'
log_begin(module_logger, func_name)
ACCEPTED_PATHS = (
'/var/lib/columnstore/data1/x',
'/var/lib/columnstore/storagemanager/metadata/data1/x'
)
if not file_path.startswith(ACCEPTED_PATHS):
raise_422_error('Not acceptable file_path.')
success = True
file_path_obj = Path(file_path)
if not file_path_obj.exists():
success = False
else:
with file_path_obj.open(mode='rb') as file_to_check:
data = file_to_check.read()
calculated_md5 = hashlib.md5(data).hexdigest()
if calculated_md5 != check_sum:
success = False
response = {
'timestamp': str(datetime.now()),
'success': success
}
return response

View File

@@ -1,5 +1,8 @@
"""Module contains Cluster business logic functions."""
import configparser
import hashlib
import logging
import tempfile
from datetime import datetime
from enum import Enum
from typing import Optional
@@ -7,8 +10,10 @@ from typing import Optional
import requests
from cmapi_server.constants import (
CMAPI_CONF_PATH, DEFAULT_MCS_CONF_PATH,
CMAPI_CONF_PATH, CMAPI_PORT, DEFAULT_MCS_CONF_PATH, REQUEST_TIMEOUT,
)
from cmapi_server.managers.application import AppManager
from cmapi_server.controllers.api_clients import NodeControllerClient
from cmapi_server.exceptions import CMAPIBasicError
from cmapi_server.helpers import (
broadcast_new_config, get_active_nodes, get_dbroots, get_config_parser,
@@ -399,3 +404,69 @@ class ClusterHandler():
'Successfully finished setting new log level to all nodes.'
)
return response
@staticmethod
def check_shared_storage():
"""Check shared storage.
:return: status result
:rtype: dict
"""
tmp_file_path: str
active_nodes = get_active_nodes()
all_responses: dict = dict()
sm_parser = configparser.ConfigParser()
sm_config_str = NodeConfig().get_current_sm_config()
sm_parser.read_string(sm_config_str)
storage_type = sm_parser.get(
'ObjectStorage', 'service', fallback='LocalStorage'
)
file_dir = '/var/lib/columnstore/data1'
if storage_type.lower == 's3':
file_dir = '/var/lib/columnstore/storagemanager/metadata/data1'
with tempfile.NamedTemporaryFile(
mode='wb+', delete=True, dir=file_dir, prefix='mcs_test_shared_'
) as temp_file:
file_data = rb'File to check shared storage working.'
temp_file.write(file_data)
tmp_file_md5 = hashlib.md5(file_data)
tmp_file_path = temp_file.name
logging.debug(f'Temporary file created at: {temp_file.name}')
for node in active_nodes:
logging.debug(f'Checking shared file on {node!r}.')
client = NodeControllerClient(
request_timeout=REQUEST_TIMEOUT,
base_url=f'https://{node}:{CMAPI_PORT}'
)
node_response = client.check_shared_file(
file_path=tmp_file_path, check_sum=tmp_file_md5
)
logging.debug(f'Finished checking file on {node!r}')
all_responses[node] = node_response
nodes_success_responses = (
v['success'] for _, v in all_responses.items()
)
if nodes_success_responses:
shared_storage = all(nodes_success_responses)
else:
# no nodes in cluster case
shared_storage = False
partially_failed = False
if len(active_nodes) > 2 and not shared_storage:
# case when some nodes got shared file, and some are not
partially_failed = sum(nodes_success_responses) > 1
response = {
'timestamp': str(datetime.now()),
'shared_storage': shared_storage,
'partially_failed': partially_failed,
'active_nodes_count': len(active_nodes),
**all_responses
}
logging.debug(
'Successfully finished checking shared storage on all nodes.'
)
return response

View File

@@ -59,6 +59,42 @@ def switch_node_maintenance(
# TODO: probably move publishing to cherrypy.engine failover channel here?
def set_shared_storage(
state: bool,
input_config_filename: str = DEFAULT_MCS_CONF_PATH,
output_config_filename: str = DEFAULT_MCS_CONF_PATH,
) -> bool:
"""Set shared storage state in Columnstore.xml.
:param state: state to set
:type state: bool
:param input_config_filename: mcs input config path,
defaults to DEFAULT_MCS_CONF_PATH
:type input_config_filename: str, optional
:param output_config_filename: mcs output config path,
defaults to DEFAULT_MCS_CONF_PATH
:return: True if new state to set is differ from state from Columnstore.xml
:rtype: bool
"""
current_state: str
node_config = NodeConfig()
config_root = node_config.get_current_config_root(input_config_filename)
shared_storage_element = config_root.find('SharedStorage')
if shared_storage_element is None:
shared_storage_element = etree.SubElement(config_root, 'SharedStorage')
else:
current_state = shared_storage_element.text
new_state = str(state).lower()
if current_state != new_state:
logging.debug(f'Shared storage state changed to {new_state!r}')
shared_storage_element.text = str(state).lower()
node_config.write_config(config_root, filename=output_config_filename)
return True
else:
# shared storage state not changed
return False
def add_node(
node: str, input_config_filename: str = DEFAULT_MCS_CONF_PATH,
output_config_filename: Optional[str] = None,

View File

@@ -213,7 +213,8 @@ class AgentComm:
if not needs_transaction and event.name in (
self._agent.activateNodes,
self._agent.deactivateNodes,
self._agent.movePrimaryNode):
self._agent.movePrimaryNode
):
needs_transaction = True
if event.name == self._agent.activateNodes:

View File

@@ -3,6 +3,8 @@ import logging
import threading
from os.path import getmtime
import lxml
from cmapi_server.constants import DEFAULT_MCS_CONF_PATH, DEFAULT_SM_CONF_PATH
from mcs_node_control.models.node_config import NodeConfig
@@ -15,6 +17,7 @@ class Config:
_active_nodes = []
_inactive_nodes = []
_primary_node = ''
_shared_storage = False
_my_name = None # derived from config file
config_lock = threading.Lock()
@@ -67,24 +70,14 @@ class Config:
self.config_lock.release()
return ret
def is_shared_storage(self, sm_config_file=DEFAULT_SM_CONF_PATH):
def is_shared_storage(self) -> bool:
"""Check if SM is S3 or not.
:param sm_config_file: path to SM config,
defaults to DEFAULT_SM_CONF_PATH
:type sm_config_file: str, optional
:return: True if SM is S3 otherwise False
:return: True if SM is S3 or shared FS otherwise False
:rtype: bool
TODO: remove in next releases, useless?
"""
sm_config = configparser.ConfigParser()
sm_config.read(sm_config_file)
# only LocalStorage or S3 can be returned for now
storage = sm_config.get(
'ObjectStorage', 'service', fallback='LocalStorage'
)
return storage.lower() == 's3'
self.check_reload()
return self._shared_storage
def check_reload(self):
"""Check config reload.
@@ -174,4 +167,11 @@ class Config:
self._primary_node = primary_node
self.last_mtime = last_mtime
self._my_name = my_name
self._shared_storage = False
shared_storage_element = root.find('./SharedStorage')
if shared_storage_element is not None:
self._shared_storage = lxml.objectify.BoolElement(
shared_storage_element.text
)
return True

View File

@@ -7,6 +7,10 @@ from .config import Config
from .heartbeat_history import HBHistory
from .agent_comm import AgentComm
from cmapi_server import helpers
from cmapi_server.node_manipulation import set_shared_storage
from cmapi_server.handlers.cluster import ClusterHandler
class NodeMonitor:
@@ -79,6 +83,22 @@ class NodeMonitor:
time.sleep(1)
self._logger.info("node monitor logic exiting normally...")
def check_shared_storage(self):
# need to do it only in one node
result = ClusterHandler.check_shared_storage()
shared_storage_on = result['shared_storage']
active_nodes_count = int(result['active_nodes_count'])
if active_nodes_count < 2:
logging.debug(
'Less than 2 nodes in cluster, no need to update SharedStorage '
'flag in Columnstore.xml.'
)
else:
state_changed = set_shared_storage(shared_storage_on)
if state_changed:
helpers.broadcast_new_config()
def _monitor(self):
"""
This works like the main loop of a game.

View File

@@ -115,6 +115,10 @@ class NodeConfig:
maintenance = etree.SubElement(root, 'Maintenance')
maintenance.text = str(False).lower()
# Add SharedStorage tag and set to False
shared_storage = etree.SubElement(root, 'SharedStorage')
shared_storage.text = str(False).lower()
def upgrade_config(self, tree=None, root=None, upgrade=True):
"""
Add the parts that might be missing after an upgrade from an earlier
@@ -381,7 +385,7 @@ class NodeConfig:
addrs = ni.addresses.get(fam)
if addrs is not None:
for addr in addrs:
yield(addr)
yield addr
def get_network_addresses_and_names(self):
"""Retrievs the list of the network addresses, hostnames, and aliases
@@ -395,7 +399,7 @@ class NodeConfig:
addrs = ni.addresses.get(fam)
if addrs is not None:
for addr in addrs:
yield(addr)
yield addr
try:
(host, aliases, _) = socket.gethostbyaddr(addr)
except: