From af10cb15262d9fa0970f78cddd16ffdf3eba65cb Mon Sep 17 00:00:00 2001 From: mariadb-AlanMologorsky Date: Fri, 24 Jan 2025 15:54:59 +0300 Subject: [PATCH] feat(cmapi): MCOL-5133: Stage2 stand alone cli tool. [fix] client class methods [fix] Transaction management at endpoint handling level [fix] add_node and set_mode methods to use TransactionManager --- cmapi/cmapi_server/controllers/api_clients.py | 20 ++-- cmapi/cmapi_server/controllers/endpoints.py | 32 +++++-- cmapi/cmapi_server/handlers/cluster.py | 93 ++----------------- cmapi/mcs_cluster_tool/cluster_app.py | 13 ++- 4 files changed, 58 insertions(+), 100 deletions(-) diff --git a/cmapi/cmapi_server/controllers/api_clients.py b/cmapi/cmapi_server/controllers/api_clients.py index 424d9d99c..c03d0b263 100644 --- a/cmapi/cmapi_server/controllers/api_clients.py +++ b/cmapi/cmapi_server/controllers/api_clients.py @@ -21,43 +21,45 @@ class ClusterControllerClient: self.request_timeout = request_timeout def start_cluster( - self, data: Optional[Dict[str, Any]] = None + self, extra: Dict[str, Any] = dict() ) -> Union[Dict[str, Any], Dict[str, str]]: """Start the cluster. :return: The response from the API. """ - return self._request('PUT', 'start', data) + return self._request('PUT', 'start', extra) def shutdown_cluster( - self, data: Optional[Dict[str, Any]] = None + self, extra: Dict[str, Any] = dict() ) -> Union[Dict[str, Any], Dict[str, str]]: """Shutdown the cluster. :return: The response from the API. """ - return self._request('PUT', 'shutdown', data) + return self._request('PUT', 'shutdown', extra) - def set_mode(self, mode: str) -> Union[Dict[str, Any], Dict[str, str]]: + def set_mode( + self, mode: str, extra: Dict[str, Any] = dict() + ) -> Union[Dict[str, Any], Dict[str, str]]: """Set the cluster mode. :param mode: The mode to set. :return: The response from the API. """ - return self._request('PUT', 'mode-set', {'mode': mode}) + return self._request('PUT', 'mode-set', {'mode': mode, **extra}) def add_node( - self, node_info: Dict[str, Any] + self, node_info: Dict[str, Any], extra: Dict[str, Any] = dict() ) -> Union[Dict[str, Any], Dict[str, str]]: """Add a node to the cluster. :param node_info: Information about the node to add. :return: The response from the API. """ - return self._request('PUT', 'node', node_info) + return self._request('PUT', 'node', {**node_info, **extra}) def remove_node( - self, node_id: str + self, node_id: str, extra: Dict[str, Any] = dict() ) -> Union[Dict[str, Any], Dict[str, str]]: """Remove a node from the cluster. diff --git a/cmapi/cmapi_server/controllers/endpoints.py b/cmapi/cmapi_server/controllers/endpoints.py index 6f48d8f3d..53adcfc9c 100644 --- a/cmapi/cmapi_server/controllers/endpoints.py +++ b/cmapi/cmapi_server/controllers/endpoints.py @@ -25,8 +25,9 @@ from cmapi_server.helpers import ( system_ready, save_cmapi_conf_file, dequote, in_maintenance_state, ) from cmapi_server.logging_management import change_loggers_level -from cmapi_server.managers.process import MCSProcessManager from cmapi_server.managers.application import AppManager +from cmapi_server.managers.process import MCSProcessManager +from cmapi_server.managers.transaction import TransactionManager from cmapi_server.node_manipulation import is_master, switch_node_maintenance from mcs_node_control.models.dbrm import set_cluster_mode from mcs_node_control.models.node_config import NodeConfig @@ -800,7 +801,11 @@ class ClusterController: in_transaction = request_body.get('in_transaction', False) try: - response = ClusterHandler.start(config, in_transaction) + if not in_transaction: + with TransactionManager(): + response = ClusterHandler.start(config) + else: + response = ClusterHandler.start(config) except CMAPIBasicError as err: raise_422_error(module_logger, func_name, err.message) @@ -821,7 +826,11 @@ class ClusterController: in_transaction = request_body.get('in_transaction', False) try: - response = ClusterHandler.shutdown(config, in_transaction) + if not in_transaction: + with TransactionManager(): + response = ClusterHandler.shutdown(config) + else: + response = ClusterHandler.shutdown(config) except CMAPIBasicError as err: raise_422_error(module_logger, func_name, err.message) @@ -840,9 +849,14 @@ class ClusterController: request_body = request.json mode = request_body.get('mode', 'readonly') config = request_body.get('config', DEFAULT_MCS_CONF_PATH) + in_transaction = request_body.get('in_transaction', False) try: - response = ClusterHandler.set_mode(mode, config=config) + if not in_transaction: + with TransactionManager(): + response = ClusterHandler.set_mode(mode, config=config) + else: + response = ClusterHandler.set_mode(mode, config=config) except CMAPIBasicError as err: raise_422_error(module_logger, func_name, err.message) @@ -861,12 +875,17 @@ class ClusterController: request_body = request.json node = request_body.get('node', None) config = request_body.get('config', DEFAULT_MCS_CONF_PATH) + in_transaction = request_body.get('in_transaction', False) if node is None: raise_422_error(module_logger, func_name, 'missing node argument') try: - response = ClusterHandler.add_node(node, config) + if not in_transaction: + with TransactionManager(): + response = ClusterHandler.add_node(node, config) + else: + response = ClusterHandler.add_node(node, config) except CMAPIBasicError as err: raise_422_error(module_logger, func_name, err.message) @@ -884,7 +903,8 @@ class ClusterController: request_body = request.json node = request_body.get('node', None) config = request_body.get('config', DEFAULT_MCS_CONF_PATH) - response = {'timestamp': str(datetime.now())} + #TODO: for next release + in_transaction = request_body.get('in_transaction', False) #TODO: add arguments verification decorator if node is None: diff --git a/cmapi/cmapi_server/handlers/cluster.py b/cmapi/cmapi_server/handlers/cluster.py index 1e8192a36..88c381d07 100644 --- a/cmapi/cmapi_server/handlers/cluster.py +++ b/cmapi/cmapi_server/handlers/cluster.py @@ -108,18 +108,12 @@ class ClusterHandler(): return response @staticmethod - def start( - config: str = DEFAULT_MCS_CONF_PATH, in_transaction: bool = False - ) -> dict: + def start(config: str = DEFAULT_MCS_CONF_PATH) -> dict: """Method to start MCS Cluster. :param config: columnstore xml config file path, defaults to DEFAULT_MCS_CONF_PATH :type config: str, optional - :param in_transaction: is function called in existing transaction or no - If we started transaction in cli tool than we - don't need to handle it here again - :type in_transaction: bool :raises CMAPIBasicError: if no nodes in the cluster :return: start timestamp :rtype: dict @@ -127,27 +121,19 @@ class ClusterHandler(): logger: logging.Logger = logging.getLogger('cmapi_server') logger.info('Cluster start command called. Starting the cluster.') operation_start_time = str(datetime.now()) - if not in_transaction: - with TransactionManager(): - toggle_cluster_state(ClusterAction.START, config) - else: - toggle_cluster_state(ClusterAction.START, config) - + toggle_cluster_state(ClusterAction.START, config) logger.info('Successfully finished cluster start.') return {'timestamp': operation_start_time} @staticmethod def shutdown( - config: str = DEFAULT_MCS_CONF_PATH, in_transaction: bool = False, - timeout: int = 15 + config: str = DEFAULT_MCS_CONF_PATH, timeout: int = 15 ) -> dict: """Method to stop the MCS Cluster. :param config: columnstore xml config file path, defaults to DEFAULT_MCS_CONF_PATH :type config: str, optional - :param in_transaction: is function called in existing transaction or no - :type in_transaction: bool :param timeout: timeout in seconds to gracefully stop DMLProc TODO: for next releases :type timeout: int @@ -160,20 +146,12 @@ class ClusterHandler(): 'Cluster shutdown command called. Shutting down the cluster.' ) operation_start_time = str(datetime.now()) - if not in_transaction: - with TransactionManager(): - toggle_cluster_state(ClusterAction.STOP, config) - else: - toggle_cluster_state(ClusterAction.STOP, config) - + toggle_cluster_state(ClusterAction.STOP, config) logger.debug('Successfully finished shutting down the cluster.') return {'timestamp': operation_start_time} @staticmethod - def add_node( - node: str, config: str = DEFAULT_MCS_CONF_PATH, - logger: logging.Logger = logging.getLogger('cmapi_server') - ) -> dict: + def add_node(node: str, config: str = DEFAULT_MCS_CONF_PATH) -> dict: """Method to add node to MCS CLuster. :param node: node IP or name or FQDN @@ -181,8 +159,6 @@ class ClusterHandler(): :param config: columnstore xml config file path, defaults to DEFAULT_MCS_CONF_PATH :type config: str, optional - :param logger: logger, defaults to logging.getLogger('cmapi_server') - :type logger: logging.Logger, optional :raises CMAPIBasicError: on exception while starting transaction :raises CMAPIBasicError: if transaction start isn't successful :raises CMAPIBasicError: on exception while adding node @@ -192,24 +168,10 @@ class ClusterHandler(): :return: result of adding node :rtype: dict """ + logger: logging.Logger = logging.getLogger('cmapi_server') logger.debug(f'Cluster add node command called. Adding node {node}.') response = {'timestamp': str(datetime.now())} - transaction_id = get_id() - - try: - suceeded, transaction_id, successes = start_transaction( - cs_config_filename=config, extra_nodes=[node], - txn_id=transaction_id - ) - except Exception as err: - rollback_transaction(transaction_id, cs_config_filename=config) - raise CMAPIBasicError( - 'Error while starting the transaction.' - ) from err - if not suceeded: - rollback_transaction(transaction_id, cs_config_filename=config) - raise CMAPIBasicError('Starting transaction isn\'t successful.') try: add_node( @@ -222,7 +184,6 @@ class ClusterHandler(): output_config_filename=config ) except Exception as err: - rollback_transaction(transaction_id, cs_config_filename=config) raise CMAPIBasicError('Error while adding node.') from err response['node_id'] = node @@ -233,31 +194,18 @@ class ClusterHandler(): try: broadcast_successful = broadcast_new_config(config) except Exception as err: - rollback_transaction(transaction_id, cs_config_filename=config) raise CMAPIBasicError( 'Error while distributing config file.' ) from err if not broadcast_successful: - rollback_transaction(transaction_id, cs_config_filename=config) raise CMAPIBasicError('Config distribution isn\'t successful.') - try: - commit_transaction(transaction_id, cs_config_filename=config) - except Exception as err: - rollback_transaction(transaction_id, cs_config_filename=config) - raise CMAPIBasicError( - 'Error while committing transaction.' - ) from err - logger.debug(f'Successfully finished adding node {node}.') return response @staticmethod - def remove_node( - node: str, config: str = DEFAULT_MCS_CONF_PATH, - logger: logging.Logger = logging.getLogger('cmapi_server') - ) -> dict: + def remove_node(node: str, config: str = DEFAULT_MCS_CONF_PATH) -> dict: """Method to remove node from MCS CLuster. :param node: node IP or name or FQDN @@ -265,8 +213,6 @@ class ClusterHandler(): :param config: columnstore xml config file path, defaults to DEFAULT_MCS_CONF_PATH :type config: str, optional - :param logger: logger, defaults to logging.getLogger('cmapi_server') - :type logger: logging.Logger, optional :raises CMAPIBasicError: on exception while starting transaction :raises CMAPIBasicError: if transaction start isn't successful :raises CMAPIBasicError: on exception while removing node @@ -276,6 +222,9 @@ class ClusterHandler(): :return: result of node removing :rtype: dict """ + #TODO: This method will be moved to transaction manager in next release + # Due to specific use of txn_nodes inside. + logger: logging.Logger = logging.getLogger('cmapi_server') logger.debug( f'Cluster remove node command called. Removing node {node}.' ) @@ -387,19 +336,6 @@ class ClusterHandler(): payload = {'cluster_mode': mode} url = f'https://{master}:8640/cmapi/{get_version()}/node/config' - try: - suceeded, transaction_id, successes = start_transaction( - cs_config_filename=config, txn_id=transaction_id - ) - except Exception as err: - rollback_transaction(transaction_id, cs_config_filename=config) - raise CMAPIBasicError( - 'Error while starting the transaction.' - ) from err - if not suceeded: - rollback_transaction(transaction_id, cs_config_filename=config) - raise CMAPIBasicError('Starting transaction isn\'t successful.') - nc = NodeConfig() root = nc.get_current_config_root(config_filename=config) payload['manager'] = root.find('./ClusterManager').text @@ -412,19 +348,10 @@ class ClusterHandler(): r.raise_for_status() response['cluster-mode'] = mode except Exception as err: - rollback_transaction(transaction_id, cs_config_filename=config) raise CMAPIBasicError( f'Error while setting cluster mode to {mode}' ) from err - try: - commit_transaction(transaction_id, cs_config_filename=config) - except Exception as err: - rollback_transaction(transaction_id, cs_config_filename=config) - raise CMAPIBasicError( - 'Error while committing transaction.' - ) from err - logger.debug(f'Successfully set cluster mode to {mode}.') return response diff --git a/cmapi/mcs_cluster_tool/cluster_app.py b/cmapi/mcs_cluster_tool/cluster_app.py index f911057f2..0b23ff021 100644 --- a/cmapi/mcs_cluster_tool/cluster_app.py +++ b/cmapi/mcs_cluster_tool/cluster_app.py @@ -176,18 +176,24 @@ def start(): @app.command(rich_help_panel='cluster and single node commands') @handle_output +@TransactionManager( + timeout=timedelta(days=1).total_seconds(), handle_signals=True +) def restart(): """Restart the Columnstore cluster.""" - stop_result = client.shutdown_cluster() + stop_result = client.shutdown_cluster({'in_transaction': True}) if 'error' in stop_result: return stop_result - result = client.start_cluster() + result = client.start_cluster({'in_transaction': True}) result['stop_timestamp'] = stop_result['timestamp'] return result @node_app.command(rich_help_panel='cluster node commands') @handle_output +@TransactionManager( + timeout=timedelta(days=1).total_seconds(), handle_signals=True +) def add( nodes: Optional[List[str]] = typer.Option( ..., @@ -225,6 +231,9 @@ def remove(nodes: Optional[List[str]] = typer.Option( @set_app.command() @handle_output +@TransactionManager( + timeout=timedelta(days=1).total_seconds(), handle_signals=True +) def mode(cluster_mode: str = typer.Option( ..., '--mode',