From 48148b1e6e44f2ae59e8303781c665cd5370b7b2 Mon Sep 17 00:00:00 2001 From: mariadb-AlanMologorsky Date: Fri, 18 Jul 2025 16:00:34 +0300 Subject: [PATCH] feat(cmapi): MCOL-6006 Disable failover when shared storage not detected add Base api client add NodeController api client fix ClusterController Api client add NodeController in endpoints.py add 2 new endpoints to dispatcher: cluster/check-shared-storage and node/check-shared-file add check_shared_storage method into ClusterHandler class add set_shared_storage to node_manipulation t add reading shared_storage in failover/config.py fix is_shared_storage method in failover/config.py add check_shared_storage in NodeMonitor class fix some minor styling --- cmapi/cmapi_server/__main__.py | 2 + cmapi/cmapi_server/controllers/api_clients.py | 186 ++++++++++++------ cmapi/cmapi_server/controllers/dispatcher.py | 25 ++- cmapi/cmapi_server/controllers/endpoints.py | 56 +++++- cmapi/cmapi_server/handlers/cluster.py | 73 ++++++- cmapi/cmapi_server/node_manipulation.py | 36 ++++ cmapi/failover/agent_comm.py | 7 +- cmapi/failover/config.py | 28 +-- cmapi/failover/node_monitor.py | 20 ++ cmapi/mcs_node_control/models/node_config.py | 8 +- 10 files changed, 361 insertions(+), 80 deletions(-) diff --git a/cmapi/cmapi_server/__main__.py b/cmapi/cmapi_server/__main__.py index 32e25a71e..cd1996b71 100644 --- a/cmapi/cmapi_server/__main__.py +++ b/cmapi/cmapi_server/__main__.py @@ -22,9 +22,11 @@ from cmapi_server import helpers from cmapi_server.constants import DEFAULT_MCS_CONF_PATH, CMAPI_CONF_PATH from cmapi_server.controllers.dispatcher import dispatcher, jsonify_error from cmapi_server.failover_agent import FailoverAgent +from cmapi_server.handlers.cluster import ClusterHandler from cmapi_server.managers.application import AppManager from cmapi_server.managers.process import MCSProcessManager from cmapi_server.managers.certificate import CertificateManager +from cmapi_server.node_manipulation import set_shared_storage from failover.node_monitor import NodeMonitor from mcs_node_control.models.dbrm_socket import SOCK_TIMEOUT, DBRMSocketHandler from mcs_node_control.models.node_config import NodeConfig diff --git a/cmapi/cmapi_server/controllers/api_clients.py b/cmapi/cmapi_server/controllers/api_clients.py index 7b7e69622..86c737455 100644 --- a/cmapi/cmapi_server/controllers/api_clients.py +++ b/cmapi/cmapi_server/controllers/api_clients.py @@ -4,7 +4,6 @@ from typing import Any, Dict, Optional, Union import pyotp import requests -from cmapi_server.controllers.dispatcher import _version from cmapi_server.constants import ( CMAPI_CONF_PATH, CURRENT_NODE_CMAPI_URL, SECRET_KEY, ) @@ -12,25 +11,106 @@ from cmapi_server.exceptions import CMAPIBasicError from cmapi_server.helpers import get_config_parser, get_current_key -class ClusterControllerClient: +_version = '0.4.0' + + +class BaseClient: + """Base class for API clients. + This class is not intended to be used directly, but rather as a + base class for other API clients. It provides a common interface + for making requests to the API and handling responses. + WARNING: This class only handles the API requests, it does not + handle the transaction management. So it should be started + at level above using TransactionManager (decorator or context + manager). + """ + def __init__( + self, base_url: str = CURRENT_NODE_CMAPI_URL, + request_timeout: Optional[float] = None + ): + """Initialize the BaseClient with the base URL. + + :param base_url: The base URL for the API endpoints, + defaults to CURRENT_NODE_CMAPI_URL + """ + self.base_url = base_url + self.request_timeout = request_timeout + self.cmd_class = None + + + def _request( + self, method: str, endpoint: str, + data: Optional[Dict[str, Any]] = None + ) -> Union[Dict[str, Any], Dict[str, str]]: + """Make a request to the API. + :param method: The HTTP method to use. + :param endpoint: The API endpoint to call. + :param data: The data to send with the request. + :return: The response from the API. + """ + url = f'{self.base_url}/cmapi/{_version}/{self.cmd_class}/{endpoint}' + cmapi_cfg_parser = get_config_parser(CMAPI_CONF_PATH) + key = get_current_key(cmapi_cfg_parser) + headers = {'x-api-key': key} + if method in ['PUT', 'POST', 'DELETE']: + headers['Content-Type'] = 'application/json' + data = {'in_transaction': True, **(data or {})} + try: + response = requests.request( + method, url, headers=headers, + params=data if method == 'GET' else None, + json=data if method in ('PUT', 'POST') else None, + timeout=self.request_timeout, verify=False + ) + response.raise_for_status() + return response.json() + # TODO: different handler for timeout exception? + except requests.HTTPError as exc: + resp = exc.response + error_msg = str(exc) + if resp.status_code == 422: + # in this case we think cmapi server returned some value but + # had error during running endpoint handler code + try: + resp_json = response.json() + error_msg = resp_json.get('error', resp_json) + except requests.exceptions.JSONDecodeError: + error_msg = response.text + message = ( + f'API client got an exception in request to {exc.request.url} ' + f'with code {resp.status_code} and error: {error_msg}' + ) + logging.error(message) + raise CMAPIBasicError(message) + except requests.exceptions.RequestException as exc: + message = ( + 'API client got an undefined error in request to ' + f'{exc.request.url} with code {exc.response.status_code} and ' + f'error: {str(exc)}' + ) + logging.error(message) + raise CMAPIBasicError(message) + + +class ClusterControllerClient(BaseClient): + """Client for the ClusterController API. + This class provides methods for interacting with the cluster + management API, including starting and stopping the cluster, + adding and removing nodes, and getting the cluster status. + """ def __init__( self, base_url: str = CURRENT_NODE_CMAPI_URL, request_timeout: Optional[float] = None ): """Initialize the ClusterControllerClient with the base URL. - - WARNING: This class only handles the API requests, it does not - handle the transaction management. So it should be started - at level above using TransactionManager (decorator or context manager). - :param base_url: The base URL for the API endpoints, defaults to CURRENT_NODE_CMAPI_URL :type base_url: str, optional :param request_timeout: request timeout, defaults to None :type request_timeout: Optional[float], optional """ - self.base_url = base_url - self.request_timeout = request_timeout + super().__init__(base_url, request_timeout) + self.cmd_class = 'cluster' def start_cluster( self, extra: Dict[str, Any] = dict() @@ -122,54 +202,48 @@ class ClusterControllerClient: """ return self._request('put', 'load_s3data', s3data_info) - def _request( - self, method: str, endpoint: str, - data: Optional[Dict[str, Any]] = None + def check_shared_storage( + self, extra: Dict[str, Any] = dict() ) -> Union[Dict[str, Any], Dict[str, str]]: - """Make a request to the API. + """Check if shared storage working. - :param method: The HTTP method to use. - :param endpoint: The API endpoint to call. - :param data: The data to send with the request. :return: The response from the API. """ - url = f'{self.base_url}/cmapi/{_version}/cluster/{endpoint}' - cmapi_cfg_parser = get_config_parser(CMAPI_CONF_PATH) - key = get_current_key(cmapi_cfg_parser) - headers = {'x-api-key': key} - if method in ['PUT', 'POST', 'DELETE']: - headers['Content-Type'] = 'application/json' - data = {'in_transaction': True, **(data or {})} - try: - response = requests.request( - method, url, headers=headers, json=data, - timeout=self.request_timeout, verify=False - ) - response.raise_for_status() - return response.json() - # TODO: different handler for timeout exception? - except requests.HTTPError as exc: - resp = exc.response - error_msg = str(exc) - if resp.status_code == 422: - # in this case we think cmapi server returned some value but - # had error during running endpoint handler code - try: - resp_json = response.json() - error_msg = resp_json.get('error', resp_json) - except requests.exceptions.JSONDecodeError: - error_msg = response.text - message = ( - f'API client got an exception in request to {exc.request.url} ' - f'with code {resp.status_code} and error: {error_msg}' - ) - logging.error(message) - raise CMAPIBasicError(message) - except requests.exceptions.RequestException as exc: - message = ( - 'API client got an undefined error in request to ' - f'{exc.request.url} with code {exc.response.status_code} and ' - f'error: {str(exc)}' - ) - logging.error(message) - raise CMAPIBasicError(message) + return self._request('PUT', 'check-shared-storage', extra) + + +class NodeControllerClient(BaseClient): + """Client for the NodeController API. + This class provides methods for interacting with a node management + API. + """ + def __init__( + self, base_url: str = CURRENT_NODE_CMAPI_URL, + request_timeout: Optional[float] = None + ): + """Initialize the NodeControllerClient with the base URL. + :param base_url: The base URL for the API endpoints, + defaults to CURRENT_NODE_CMAPI_URL + :type base_url: str, optional + :param request_timeout: request timeout, defaults to None + :type request_timeout: Optional[float], optional + """ + super().__init__(base_url, request_timeout) + self.cmd_class = 'node' + + def check_shared_file( + self, file_path: str, check_sum: str + ) -> Union[Dict[str, Any], Dict[str, str]]: + """Get packages versions installed on a node. + + :param file_path: file path to check + :type file_path: str + :param check_sum: expected MD5 file checksum + :type check_sum: str + :return: The response from the API. + """ + data = { + 'file_path': file_path, + 'check_sum': check_sum, + } + return self._request('GET', 'check-shared-file', data) diff --git a/cmapi/cmapi_server/controllers/dispatcher.py b/cmapi/cmapi_server/controllers/dispatcher.py index ba7c90565..d8a0219a1 100644 --- a/cmapi/cmapi_server/controllers/dispatcher.py +++ b/cmapi/cmapi_server/controllers/dispatcher.py @@ -6,12 +6,13 @@ from cmapi_server.controllers.endpoints import ( StatusController, ConfigController, BeginController, CommitController, RollbackController, StartController, ShutdownController, ExtentMapController, ClusterController, ApiKeyController, - LoggingConfigController, AppController, NodeProcessController + LoggingConfigController, AppController, NodeController, + NodeProcessController ) from cmapi_server.controllers.s3dataload import S3DataLoadController -_version = '0.4.0' +_version = '0.4.0' #TODO: MOVE to constants dispatcher = cherrypy.dispatch.RoutesDispatcher() @@ -261,6 +262,26 @@ dispatcher.connect( ) +# /_version/node/check-shared-file/ (GET) +dispatcher.connect( + name = 'node_check_shared_file', + route = f'/cmapi/{_version}/node/check-shared-file', + action = 'check_shared_file', + controller = NodeController(), + conditions = {'method': ['GET']} +) + + +# /_version/cluster/check-shared-storage/ (PUT) +dispatcher.connect( + name = 'cluster_check_shared_storage', + route = f'/cmapi/{_version}/cluster/check-shared-storage', + action = 'check_shared_storage', + controller = ClusterController(), + conditions = {'method': ['PUT']} +) + + def jsonify_error(status, message, traceback, version): \ # pylint: disable=unused-argument """JSONify all CherryPy error responses (created by raising the diff --git a/cmapi/cmapi_server/controllers/endpoints.py b/cmapi/cmapi_server/controllers/endpoints.py index 870e07b35..a399b90ca 100644 --- a/cmapi/cmapi_server/controllers/endpoints.py +++ b/cmapi/cmapi_server/controllers/endpoints.py @@ -1,9 +1,8 @@ import logging - +import hashlib import socket import subprocess import time - from copy import deepcopy from datetime import datetime from pathlib import Path @@ -1106,6 +1105,26 @@ class ClusterController: module_logger.debug(f'{func_name} returns {str(response)}') return response + @cherrypy.tools.timeit() + @cherrypy.tools.json_in() + @cherrypy.tools.json_out() + @cherrypy.tools.validate_api_key() # pylint: disable=no-member + def check_shared_storage(self): + """Handler for /cluster/check-shared-storage/ (PUT) endpoint.""" + func_name = 'check_shared_storage' + log_begin(module_logger, func_name) + try: + response = ClusterHandler.check_shared_storage() + except CMAPIBasicError as err: + raise_422_error(module_logger, func_name, err.message) + except Exception: + raise_422_error( + module_logger, func_name, + 'Undefined error happened while checking shared storage.' + ) + module_logger.debug(f'{func_name} returns {str(response)}') + return response + class ApiKeyController: @cherrypy.tools.timeit() @@ -1261,3 +1280,36 @@ class NodeProcessController(): } module_logger.debug(f'{func_name} returns {str(response)}') return response + + +class NodeController: + + @cherrypy.tools.timeit() + @cherrypy.tools.json_out() + @cherrypy.tools.validate_api_key() # pylint: disable=no-member + def check_shared_file(self, file_path, check_sum): + func_name = 'check_shared_file' + log_begin(module_logger, func_name) + ACCEPTED_PATHS = ( + '/var/lib/columnstore/data1/x', + '/var/lib/columnstore/storagemanager/metadata/data1/x' + ) + if not file_path.startswith(ACCEPTED_PATHS): + raise_422_error('Not acceptable file_path.') + + success = True + file_path_obj = Path(file_path) + if not file_path_obj.exists(): + success = False + else: + with file_path_obj.open(mode='rb') as file_to_check: + data = file_to_check.read() + calculated_md5 = hashlib.md5(data).hexdigest() + if calculated_md5 != check_sum: + success = False + + response = { + 'timestamp': str(datetime.now()), + 'success': success + } + return response diff --git a/cmapi/cmapi_server/handlers/cluster.py b/cmapi/cmapi_server/handlers/cluster.py index f2d8f892b..2d276c741 100644 --- a/cmapi/cmapi_server/handlers/cluster.py +++ b/cmapi/cmapi_server/handlers/cluster.py @@ -1,5 +1,8 @@ """Module contains Cluster business logic functions.""" +import configparser +import hashlib import logging +import tempfile from datetime import datetime from enum import Enum from typing import Optional @@ -7,8 +10,10 @@ from typing import Optional import requests from cmapi_server.constants import ( - CMAPI_CONF_PATH, DEFAULT_MCS_CONF_PATH, + CMAPI_CONF_PATH, CMAPI_PORT, DEFAULT_MCS_CONF_PATH, REQUEST_TIMEOUT, ) +from cmapi_server.managers.application import AppManager +from cmapi_server.controllers.api_clients import NodeControllerClient from cmapi_server.exceptions import CMAPIBasicError from cmapi_server.helpers import ( broadcast_new_config, get_active_nodes, get_dbroots, get_config_parser, @@ -399,3 +404,69 @@ class ClusterHandler(): 'Successfully finished setting new log level to all nodes.' ) return response + + @staticmethod + def check_shared_storage(): + """Check shared storage. + + :return: status result + :rtype: dict + """ + tmp_file_path: str + active_nodes = get_active_nodes() + all_responses: dict = dict() + sm_parser = configparser.ConfigParser() + sm_config_str = NodeConfig().get_current_sm_config() + sm_parser.read_string(sm_config_str) + storage_type = sm_parser.get( + 'ObjectStorage', 'service', fallback='LocalStorage' + ) + file_dir = '/var/lib/columnstore/data1' + if storage_type.lower == 's3': + file_dir = '/var/lib/columnstore/storagemanager/metadata/data1' + + with tempfile.NamedTemporaryFile( + mode='wb+', delete=True, dir=file_dir, prefix='mcs_test_shared_' + ) as temp_file: + file_data = rb'File to check shared storage working.' + temp_file.write(file_data) + tmp_file_md5 = hashlib.md5(file_data) + tmp_file_path = temp_file.name + logging.debug(f'Temporary file created at: {temp_file.name}') + for node in active_nodes: + logging.debug(f'Checking shared file on {node!r}.') + client = NodeControllerClient( + request_timeout=REQUEST_TIMEOUT, + base_url=f'https://{node}:{CMAPI_PORT}' + ) + node_response = client.check_shared_file( + file_path=tmp_file_path, check_sum=tmp_file_md5 + ) + logging.debug(f'Finished checking file on {node!r}') + all_responses[node] = node_response + + nodes_success_responses = ( + v['success'] for _, v in all_responses.items() + ) + if nodes_success_responses: + shared_storage = all(nodes_success_responses) + else: + # no nodes in cluster case + shared_storage = False + partially_failed = False + if len(active_nodes) > 2 and not shared_storage: + # case when some nodes got shared file, and some are not + partially_failed = sum(nodes_success_responses) > 1 + + response = { + 'timestamp': str(datetime.now()), + 'shared_storage': shared_storage, + 'partially_failed': partially_failed, + 'active_nodes_count': len(active_nodes), + **all_responses + } + + logging.debug( + 'Successfully finished checking shared storage on all nodes.' + ) + return response diff --git a/cmapi/cmapi_server/node_manipulation.py b/cmapi/cmapi_server/node_manipulation.py index bccdb142a..89370c69c 100644 --- a/cmapi/cmapi_server/node_manipulation.py +++ b/cmapi/cmapi_server/node_manipulation.py @@ -59,6 +59,42 @@ def switch_node_maintenance( # TODO: probably move publishing to cherrypy.engine failover channel here? +def set_shared_storage( + state: bool, + input_config_filename: str = DEFAULT_MCS_CONF_PATH, + output_config_filename: str = DEFAULT_MCS_CONF_PATH, +) -> bool: + """Set shared storage state in Columnstore.xml. + + :param state: state to set + :type state: bool + :param input_config_filename: mcs input config path, + defaults to DEFAULT_MCS_CONF_PATH + :type input_config_filename: str, optional + :param output_config_filename: mcs output config path, + defaults to DEFAULT_MCS_CONF_PATH + :return: True if new state to set is differ from state from Columnstore.xml + :rtype: bool + """ + current_state: str + node_config = NodeConfig() + config_root = node_config.get_current_config_root(input_config_filename) + shared_storage_element = config_root.find('SharedStorage') + if shared_storage_element is None: + shared_storage_element = etree.SubElement(config_root, 'SharedStorage') + else: + current_state = shared_storage_element.text + new_state = str(state).lower() + if current_state != new_state: + logging.debug(f'Shared storage state changed to {new_state!r}') + shared_storage_element.text = str(state).lower() + node_config.write_config(config_root, filename=output_config_filename) + return True + else: + # shared storage state not changed + return False + + def add_node( node: str, input_config_filename: str = DEFAULT_MCS_CONF_PATH, output_config_filename: Optional[str] = None, diff --git a/cmapi/failover/agent_comm.py b/cmapi/failover/agent_comm.py index d9e169138..e36e0d3a8 100644 --- a/cmapi/failover/agent_comm.py +++ b/cmapi/failover/agent_comm.py @@ -211,9 +211,10 @@ class AgentComm: # determine whether we need a transaction at all. # List the fcns that require a txn here. if not needs_transaction and event.name in ( - self._agent.activateNodes, - self._agent.deactivateNodes, - self._agent.movePrimaryNode): + self._agent.activateNodes, + self._agent.deactivateNodes, + self._agent.movePrimaryNode + ): needs_transaction = True if event.name == self._agent.activateNodes: diff --git a/cmapi/failover/config.py b/cmapi/failover/config.py index 0e87a4137..59ce5939b 100644 --- a/cmapi/failover/config.py +++ b/cmapi/failover/config.py @@ -3,6 +3,8 @@ import logging import threading from os.path import getmtime +import lxml + from cmapi_server.constants import DEFAULT_MCS_CONF_PATH, DEFAULT_SM_CONF_PATH from mcs_node_control.models.node_config import NodeConfig @@ -15,6 +17,7 @@ class Config: _active_nodes = [] _inactive_nodes = [] _primary_node = '' + _shared_storage = False _my_name = None # derived from config file config_lock = threading.Lock() @@ -67,24 +70,14 @@ class Config: self.config_lock.release() return ret - def is_shared_storage(self, sm_config_file=DEFAULT_SM_CONF_PATH): + def is_shared_storage(self) -> bool: """Check if SM is S3 or not. - :param sm_config_file: path to SM config, - defaults to DEFAULT_SM_CONF_PATH - :type sm_config_file: str, optional - :return: True if SM is S3 otherwise False + :return: True if SM is S3 or shared FS otherwise False :rtype: bool - - TODO: remove in next releases, useless? """ - sm_config = configparser.ConfigParser() - sm_config.read(sm_config_file) - # only LocalStorage or S3 can be returned for now - storage = sm_config.get( - 'ObjectStorage', 'service', fallback='LocalStorage' - ) - return storage.lower() == 's3' + self.check_reload() + return self._shared_storage def check_reload(self): """Check config reload. @@ -174,4 +167,11 @@ class Config: self._primary_node = primary_node self.last_mtime = last_mtime self._my_name = my_name + + self._shared_storage = False + shared_storage_element = root.find('./SharedStorage') + if shared_storage_element is not None: + self._shared_storage = lxml.objectify.BoolElement( + shared_storage_element.text + ) return True diff --git a/cmapi/failover/node_monitor.py b/cmapi/failover/node_monitor.py index cbcf758de..f1ffdc9f8 100644 --- a/cmapi/failover/node_monitor.py +++ b/cmapi/failover/node_monitor.py @@ -7,6 +7,10 @@ from .config import Config from .heartbeat_history import HBHistory from .agent_comm import AgentComm +from cmapi_server import helpers +from cmapi_server.node_manipulation import set_shared_storage +from cmapi_server.handlers.cluster import ClusterHandler + class NodeMonitor: @@ -79,6 +83,22 @@ class NodeMonitor: time.sleep(1) self._logger.info("node monitor logic exiting normally...") + def check_shared_storage(self): + # need to do it only in one node + result = ClusterHandler.check_shared_storage() + shared_storage_on = result['shared_storage'] + active_nodes_count = int(result['active_nodes_count']) + if active_nodes_count < 2: + logging.debug( + 'Less than 2 nodes in cluster, no need to update SharedStorage ' + 'flag in Columnstore.xml.' + ) + else: + state_changed = set_shared_storage(shared_storage_on) + if state_changed: + helpers.broadcast_new_config() + + def _monitor(self): """ This works like the main loop of a game. diff --git a/cmapi/mcs_node_control/models/node_config.py b/cmapi/mcs_node_control/models/node_config.py index 7dac18bce..91ea69221 100644 --- a/cmapi/mcs_node_control/models/node_config.py +++ b/cmapi/mcs_node_control/models/node_config.py @@ -115,6 +115,10 @@ class NodeConfig: maintenance = etree.SubElement(root, 'Maintenance') maintenance.text = str(False).lower() + # Add SharedStorage tag and set to False + shared_storage = etree.SubElement(root, 'SharedStorage') + shared_storage.text = str(False).lower() + def upgrade_config(self, tree=None, root=None, upgrade=True): """ Add the parts that might be missing after an upgrade from an earlier @@ -381,7 +385,7 @@ class NodeConfig: addrs = ni.addresses.get(fam) if addrs is not None: for addr in addrs: - yield(addr) + yield addr def get_network_addresses_and_names(self): """Retrievs the list of the network addresses, hostnames, and aliases @@ -395,7 +399,7 @@ class NodeConfig: addrs = ni.addresses.get(fam) if addrs is not None: for addr in addrs: - yield(addr) + yield addr try: (host, aliases, _) = socket.gethostbyaddr(addr) except: