From a60a5288d8c2f8ba226e1a65adbe119b04a9200d Mon Sep 17 00:00:00 2001 From: Alan Mologorsky <89034356+mariadb-AlanMologorsky@users.noreply.github.com> Date: Wed, 12 Mar 2025 14:21:32 +0300 Subject: [PATCH] feat(cmapi): MCOL-5133: Stage1 to stand alone cli tool. (#3378) [add] cluster api client class [fix] cli cluster_app using cluster api client [add] ClusterAction Enum [add] toggle_cluster_state function to reduce code duplication --- cmapi/cmapi_server/constants.py | 7 + cmapi/cmapi_server/controllers/api_clients.py | 126 +++++++++++++++ cmapi/cmapi_server/controllers/endpoints.py | 6 +- cmapi/cmapi_server/handlers/cluster.py | 150 ++++++++---------- cmapi/cmapi_server/helpers.py | 1 + cmapi/cmapi_server/managers/transaction.py | 8 +- cmapi/mcs_cluster_tool/cluster_app.py | 37 +++-- 7 files changed, 226 insertions(+), 109 deletions(-) create mode 100644 cmapi/cmapi_server/controllers/api_clients.py diff --git a/cmapi/cmapi_server/constants.py b/cmapi/cmapi_server/constants.py index a1e4142b9..9fba94305 100644 --- a/cmapi/cmapi_server/constants.py +++ b/cmapi/cmapi_server/constants.py @@ -82,3 +82,10 @@ MCS_INSTALL_BIN = '/usr/bin' IFLAG = os.path.join(MCS_ETC_PATH, 'container-initialized') LIBJEMALLOC_DEFAULT_PATH = os.path.join(MCS_DATA_PATH, 'libjemalloc.so.2') MCS_LOG_PATH = '/var/log/mariadb/columnstore' + + +# client constants +CMAPI_PORT = 8640 #TODO: use it in all places +CURRENT_NODE_CMAPI_URL = f'https://localhost:{CMAPI_PORT}' +REQUEST_TIMEOUT: float = 30.0 +TRANSACTION_TIMEOUT: float = 300.0 # 5 minutes \ No newline at end of file diff --git a/cmapi/cmapi_server/controllers/api_clients.py b/cmapi/cmapi_server/controllers/api_clients.py new file mode 100644 index 000000000..424d9d99c --- /dev/null +++ b/cmapi/cmapi_server/controllers/api_clients.py @@ -0,0 +1,126 @@ +import requests +from typing import Any, Dict, Optional, Union +from cmapi_server.controllers.dispatcher import _version +from cmapi_server.constants import CURRENT_NODE_CMAPI_URL + + 
+class ClusterControllerClient: + def __init__( + self, base_url: str = CURRENT_NODE_CMAPI_URL, + request_timeout: Optional[float] = None + ): + """Initialize the ClusterControllerClient with the base URL. + + :param base_url: The base URL for the API endpoints, + defaults to CURRENT_NODE_CMAPI_URL + :type base_url: str, optional + :param request_timeout: request timeout, defaults to None + :type request_timeout: Optional[float], optional + """ + self.base_url = base_url + self.request_timeout = request_timeout + + def start_cluster( + self, data: Optional[Dict[str, Any]] = None + ) -> Union[Dict[str, Any], Dict[str, str]]: + """Start the cluster. + + :return: The response from the API. + """ + return self._request('PUT', 'start', data) + + def shutdown_cluster( + self, data: Optional[Dict[str, Any]] = None + ) -> Union[Dict[str, Any], Dict[str, str]]: + """Shutdown the cluster. + + :return: The response from the API. + """ + return self._request('PUT', 'shutdown', data) + + def set_mode(self, mode: str) -> Union[Dict[str, Any], Dict[str, str]]: + """Set the cluster mode. + + :param mode: The mode to set. + :return: The response from the API. + """ + return self._request('PUT', 'mode-set', {'mode': mode}) + + def add_node( + self, node_info: Dict[str, Any] + ) -> Union[Dict[str, Any], Dict[str, str]]: + """Add a node to the cluster. + + :param node_info: Information about the node to add. + :return: The response from the API. + """ + return self._request('PUT', 'node', node_info) + + def remove_node( + self, node_id: str + ) -> Union[Dict[str, Any], Dict[str, str]]: + """Remove a node from the cluster. + + :param node_id: The ID of the node to remove. + :return: The response from the API. + """ + return self._request('DELETE', 'node', {'node_id': node_id}) + + def get_status(self) -> Union[Dict[str, Any], Dict[str, str]]: + """Get the status of the cluster. + + :return: The response from the API. 
+ """ + return self._request('GET', 'status') + + def set_api_key( + self, api_key: str + ) -> Union[Dict[str, Any], Dict[str, str]]: + """Set the API key for the cluster. + + :param api_key: The API key to set. + :return: The response from the API. + """ + return self._request('put', 'apikey-set', {'api_key': api_key}) + + def set_log_level( + self, log_level: str + ) -> Union[Dict[str, Any], Dict[str, str]]: + """Set the log level for the cluster. + + :param log_level: The log level to set. + :return: The response from the API. + """ + return self._request('put', 'log-level', {'log_level': log_level}) + + def load_s3data( + self, s3data_info: Dict[str, Any] + ) -> Union[Dict[str, Any], Dict[str, str]]: + """Load S3 data into the cluster. + + :param s3data_info: Information about the S3 data to load. + :return: The response from the API. + """ + return self._request('put', 'load_s3data', s3data_info) + + def _request( + self, method: str, endpoint: str, + data: Optional[Dict[str, Any]] = None + ) -> Union[Dict[str, Any], Dict[str, str]]: + """Make a request to the API. + + :param method: The HTTP method to use. + :param endpoint: The API endpoint to call. + :param data: The data to send with the request. + :return: The response from the API. + """ + url = f'{self.base_url}/cmapi/{_version}/cluster/{endpoint}' + try: + response = requests.request( + method, url, json=data, timeout=self.request_timeout + ) + response.raise_for_status() + return response.json() + # TODO: different handler for timeout exception? 
+ except requests.exceptions.RequestException as e: + return {'error': str(e)} diff --git a/cmapi/cmapi_server/controllers/endpoints.py b/cmapi/cmapi_server/controllers/endpoints.py index a1b8792cb..755a1e0a6 100644 --- a/cmapi/cmapi_server/controllers/endpoints.py +++ b/cmapi/cmapi_server/controllers/endpoints.py @@ -797,9 +797,10 @@ class ClusterController: request = cherrypy.request request_body = request.json config = request_body.get('config', DEFAULT_MCS_CONF_PATH) + in_transaction = request_body.get('in_transaction', False) try: - response = ClusterHandler.start(config) + response = ClusterHandler.start(config, in_transaction) except CMAPIBasicError as err: raise_422_error(module_logger, func_name, err.message) @@ -817,9 +818,10 @@ class ClusterController: request = cherrypy.request request_body = request.json config = request_body.get('config', DEFAULT_MCS_CONF_PATH) + in_transaction = request_body.get('in_transaction', False) try: - response = ClusterHandler.shutdown(config) + response = ClusterHandler.shutdown(config, in_transaction) except CMAPIBasicError as err: raise_422_error(module_logger, func_name, err.message) diff --git a/cmapi/cmapi_server/handlers/cluster.py b/cmapi/cmapi_server/handlers/cluster.py index 628c3da6b..1e8192a36 100644 --- a/cmapi/cmapi_server/handlers/cluster.py +++ b/cmapi/cmapi_server/handlers/cluster.py @@ -1,6 +1,7 @@ """Module contains Cluster business logic functions.""" import logging from datetime import datetime +from enum import Enum from typing import Optional import requests @@ -22,26 +23,61 @@ from mcs_node_control.models.misc import get_dbrm_master from mcs_node_control.models.node_config import NodeConfig +class ClusterAction(Enum): + START = 'start' + STOP = 'stop' + + +def toggle_cluster_state(action: ClusterAction, config: str) -> dict: + """ + Toggle the state of the cluster (start or stop). + + :param action: The cluster action to perform. + (ClusterAction.START or ClusterAction.STOP). 
+ :type action: ClusterAction + :param config: The path to the MariaDB Columnstore configuration file. + :type config: str + """ + if action == ClusterAction.START: + maintainance_flag = False # start: maintenance off + elif action == ClusterAction.STOP: + maintainance_flag = True # stop: maintenance on + else: + raise ValueError( + 'Invalid action. Use ClusterAction.START or ClusterAction.STOP.' + ) + + switch_node_maintenance(maintainance_flag) + update_revision_and_manager() + + # TODO: move this from multiple places to one + try: + broadcast_successful = broadcast_new_config(config) + except Exception as err: + raise CMAPIBasicError( + 'Error while distributing config file.' + ) from err + + if not broadcast_successful: + raise CMAPIBasicError('Config distribution isn\'t successful.') + + class ClusterHandler(): """Class for handling MCS Cluster operations.""" @staticmethod - def status( - config: str = DEFAULT_MCS_CONF_PATH, - logger: logging.Logger = logging.getLogger('cmapi_server') - ) -> dict: + def status(config: str = DEFAULT_MCS_CONF_PATH) -> dict: """Method to get MCS Cluster status information :param config: columnstore xml config file path, defaults to DEFAULT_MCS_CONF_PATH :type config: str, optional - :param logger: logger, defaults to logging.getLogger('cmapi_server') - :type logger: logging.Logger, optional :raises CMAPIBasicError: if catch some exception while getting status from each node separately :return: status result :rtype: dict """ + logger: logging.Logger = logging.getLogger('cmapi_server') logger.debug('Cluster status command called. Getting status.') response = {'timestamp': str(datetime.now())} @@ -73,78 +109,36 @@ class ClusterHandler(): @staticmethod def start( - config: str = DEFAULT_MCS_CONF_PATH, - logger: logging.Logger = logging.getLogger('cmapi_server') + config: str = DEFAULT_MCS_CONF_PATH, in_transaction: bool = False ) -> dict: """Method to start MCS Cluster. 
:param config: columnstore xml config file path, defaults to DEFAULT_MCS_CONF_PATH :type config: str, optional - :param logger: logger, defaults to logging.getLogger('cmapi_server') - :type logger: logging.Logger, optional - :raises CMAPIBasicError: on exception while starting transaction - :raises CMAPIBasicError: if transaction start isn't successful + :param in_transaction: is function called in existing transaction or no + If we started transaction in cli tool than we + don't need to handle it here again + :type in_transaction: bool :raises CMAPIBasicError: if no nodes in the cluster - :raises CMAPIBasicError: on exception while distributing new config - :raises CMAPIBasicError: on unsuccessful distibuting config file - :raises CMAPIBasicError: on exception while committing transaction :return: start timestamp :rtype: dict """ - logger.debug('Cluster start command called. Starting the cluster.') - start_time = str(datetime.now()) - transaction_id = get_id() + logger: logging.Logger = logging.getLogger('cmapi_server') + logger.info('Cluster start command called. Starting the cluster.') + operation_start_time = str(datetime.now()) + if not in_transaction: + with TransactionManager(): + toggle_cluster_state(ClusterAction.START, config) + else: + toggle_cluster_state(ClusterAction.START, config) - try: - suceeded, transaction_id, successes = start_transaction( - cs_config_filename=config, txn_id=transaction_id - ) - except Exception as err: - rollback_transaction(transaction_id, cs_config_filename=config) - raise CMAPIBasicError( - 'Error while starting the transaction.' 
- ) from err - if not suceeded: - rollback_transaction(transaction_id, cs_config_filename=config) - raise CMAPIBasicError('Starting transaction isn\'t successful.') - - if suceeded and len(successes) == 0: - rollback_transaction(transaction_id, cs_config_filename=config) - raise CMAPIBasicError('There are no nodes in the cluster.') - - switch_node_maintenance(False) - update_revision_and_manager() - - # TODO: move this from multiple places to one, eg to helpers - try: - broadcast_successful = broadcast_new_config(config) - except Exception as err: - rollback_transaction(transaction_id, cs_config_filename=config) - raise CMAPIBasicError( - 'Error while distributing config file.' - ) from err - - if not broadcast_successful: - rollback_transaction(transaction_id, cs_config_filename=config) - raise CMAPIBasicError('Config distribution isn\'t successful.') - - try: - commit_transaction(transaction_id, cs_config_filename=config) - except Exception as err: - rollback_transaction(transaction_id, cs_config_filename=config) - raise CMAPIBasicError( - 'Error while committing transaction.' - ) from err - - logger.debug('Successfully finished cluster start.') - return {'timestamp': start_time} + logger.info('Successfully finished cluster start.') + return {'timestamp': operation_start_time} @staticmethod def shutdown( - config: str = DEFAULT_MCS_CONF_PATH, - logger: logging.Logger = logging.getLogger('cmapi_server'), - in_transaction: bool = False, + config: str = DEFAULT_MCS_CONF_PATH, in_transaction: bool = False, timeout: int = 15 ) -> dict: """Method to stop the MCS Cluster. 
@@ -152,8 +146,6 @@ class ClusterHandler(): :param config: columnstore xml config file path, defaults to DEFAULT_MCS_CONF_PATH :type config: str, optional - :param logger: logger, defaults to logging.getLogger('cmapi_server') - :type logger: logging.Logger, optional :param in_transaction: is function called in existing transaction or no :type in_transaction: bool :param timeout: timeout in seconds to gracefully stop DMLProc @@ -163,35 +155,19 @@ class ClusterHandler(): :return: start timestamp :rtype: dict """ + logger: logging.Logger = logging.getLogger('cmapi_server') logger.debug( 'Cluster shutdown command called. Shutting down the cluster.' ) - - def process_shutdown(): - """Raw node shutdown processing.""" - switch_node_maintenance(True) - update_revision_and_manager() - - # TODO: move this from multiple places to one, eg to helpers - try: - broadcast_successful = broadcast_new_config(config) - except Exception as err: - raise CMAPIBasicError( - 'Error while distributing config file.' 
- ) from err - - if not broadcast_successful: - raise CMAPIBasicError('Config distribution isn\'t successful.') - - start_time = str(datetime.now()) + operation_start_time = str(datetime.now()) if not in_transaction: with TransactionManager(): - process_shutdown() + toggle_cluster_state(ClusterAction.STOP, config) else: - process_shutdown() + toggle_cluster_state(ClusterAction.STOP, config) logger.debug('Successfully finished shutting down the cluster.') - return {'timestamp': start_time} + return {'timestamp': operation_start_time} @staticmethod def add_node( diff --git a/cmapi/cmapi_server/helpers.py b/cmapi/cmapi_server/helpers.py index 1cf38ef42..4b9def2c4 100644 --- a/cmapi/cmapi_server/helpers.py +++ b/cmapi/cmapi_server/helpers.py @@ -314,6 +314,7 @@ def broadcast_new_config( :rtype: bool """ + # TODO: move this from multiple places to one, eg to helpers cfg_parser = get_config_parser(cmapi_config_filename) key = get_current_key(cfg_parser) version = get_version() diff --git a/cmapi/cmapi_server/managers/transaction.py b/cmapi/cmapi_server/managers/transaction.py index 10db998df..cffb71386 100644 --- a/cmapi/cmapi_server/managers/transaction.py +++ b/cmapi/cmapi_server/managers/transaction.py @@ -6,7 +6,7 @@ from signal import ( ) from typing import Optional, Type -from cmapi_server.constants import DEFAULT_MCS_CONF_PATH +from cmapi_server.constants import DEFAULT_MCS_CONF_PATH, TRANSACTION_TIMEOUT from cmapi_server.exceptions import CMAPIBasicError from cmapi_server.helpers import ( get_id, commit_transaction, rollback_transaction, start_transaction @@ -17,7 +17,7 @@ class TransactionManager(ContextDecorator): """Context manager and decorator to put any code inside CMAPI transaction. 
:param timeout: time in sec after transaction will be autocommitted, - defaults to 300.0 + defaults to 300.0 (TRANSACTION_TIMEOUT) :param timeout: _description_, defaults to 300 :type timeout: float, optional @@ -28,8 +28,8 @@ class TransactionManager(ContextDecorator): """ def __init__( - self, timeout: float = 300, txn_id: Optional[int] = None, - handle_signals: bool = False + self, timeout: float = TRANSACTION_TIMEOUT, + txn_id: Optional[int] = None, handle_signals: bool = False ): self.timeout = timeout self.txn_id = txn_id or get_id() diff --git a/cmapi/mcs_cluster_tool/cluster_app.py b/cmapi/mcs_cluster_tool/cluster_app.py index 67602abe9..f911057f2 100644 --- a/cmapi/mcs_cluster_tool/cluster_app.py +++ b/cmapi/mcs_cluster_tool/cluster_app.py @@ -13,16 +13,16 @@ import typer from typing_extensions import Annotated from cmapi_server.constants import ( - CMAPI_CONF_PATH, DEFAULT_MCS_CONF_PATH, SECRET_KEY + CMAPI_CONF_PATH, DEFAULT_MCS_CONF_PATH, REQUEST_TIMEOUT ) from cmapi_server.exceptions import CMAPIBasicError -from cmapi_server.handlers.cluster import ClusterHandler from cmapi_server.helpers import ( get_config_parser, get_current_key, get_version, build_url ) from cmapi_server.managers.transaction import TransactionManager from mcs_cluster_tool.decorators import handle_output from mcs_node_control.models.node_config import NodeConfig +from cmapi_server.controllers.api_clients import ClusterControllerClient logger = logging.getLogger('mcs_cli') @@ -33,13 +33,15 @@ node_app = typer.Typer(help='Cluster nodes management.') app.add_typer(node_app, name='node') set_app = typer.Typer(help='Set cluster parameters.') app.add_typer(set_app, name='set') +client = ClusterControllerClient() @app.command(rich_help_panel='cluster and single node commands') @handle_output def status(): """Get status information.""" - return ClusterHandler.status(logger=logger) + client.request_timeout = REQUEST_TIMEOUT + return client.get_status() 
@app.command(rich_help_panel='cluster and single node commands') @@ -157,25 +159,29 @@ def stop( # TODO: investigate more on how changing the hardcoded timeout # could affect put_config (helpers.py broadcast_config) operation timeout = 0 - _ = ClusterHandler.shutdown(logger=logger, in_transaction=True) + + resp = client.shutdown_cluster({'in_transaction': True}) return {'timestamp': start_time} @app.command(rich_help_panel='cluster and single node commands') @handle_output +@TransactionManager( + timeout=timedelta(days=1).total_seconds(), handle_signals=True +) def start(): """Start the Columnstore cluster.""" - return ClusterHandler.start(logger=logger) + return client.start_cluster({'in_transaction': True}) @app.command(rich_help_panel='cluster and single node commands') @handle_output def restart(): """Restart the Columnstore cluster.""" - stop_result = ClusterHandler.shutdown(logger=logger) + stop_result = client.shutdown_cluster() if 'error' in stop_result: return stop_result - result = ClusterHandler.start(logger=logger) + result = client.start_cluster() result['stop_timestamp'] = stop_result['timestamp'] return result @@ -195,7 +201,7 @@ def add( """Add nodes to the Columnstore cluster.""" result = [] for node in nodes: - result.append(ClusterHandler.add_node(node, logger=logger)) + result.append(client.add_node({'node': node})) return result @@ -213,7 +219,7 @@ def remove(nodes: Optional[List[str]] = typer.Option( """Remove nodes from the Columnstore cluster.""" result = [] for node in nodes: - result.append(ClusterHandler.remove_node(node, logger=logger)) + result.append(client.remove_node(node)) return result @@ -233,7 +239,8 @@ def mode(cluster_mode: str = typer.Option( raise typer.BadParameter( '"readonly" or "readwrite" are the only acceptable modes now.' 
) - return ClusterHandler.set_mode(cluster_mode, logger=logger) + client.request_timeout = REQUEST_TIMEOUT + return client.set_mode(cluster_mode) @set_app.command() @@ -245,10 +252,8 @@ def api_key(key: str = typer.Option(..., help='API key to set.')): """ if not key: raise typer.BadParameter('Empty API key not allowed.') - - totp = pyotp.TOTP(SECRET_KEY) - - return ClusterHandler.set_api_key(key, totp.now(), logger=logger) + client.request_timeout = REQUEST_TIMEOUT + return client.set_api_key(key) @set_app.command() @@ -260,5 +265,5 @@ def log_level(level: str = typer.Option(..., help='Logging level to set.')): """ if not level: raise typer.BadParameter('Empty log level not allowed.') - - return ClusterHandler.set_log_level(level, logger=logger) + client.request_timeout = REQUEST_TIMEOUT + return client.set_log_level(level)