diff --git a/cmapi/cmapi_server/controllers/dispatcher.py b/cmapi/cmapi_server/controllers/dispatcher.py index 76da33f2b..3d1aaa0d9 100644 --- a/cmapi/cmapi_server/controllers/dispatcher.py +++ b/cmapi/cmapi_server/controllers/dispatcher.py @@ -6,7 +6,7 @@ from cmapi_server.controllers.endpoints import ( StatusController, ConfigController, BeginController, CommitController, RollbackController, StartController, ShutdownController, ExtentMapController, ClusterController, ApiKeyController, - LoggingConfigController, AppController + LoggingConfigController, AppController, NodeProcessController ) from cmapi_server.controllers.s3dataload import S3DataLoadController @@ -241,6 +241,26 @@ dispatcher.connect( ) +# /_version/node/stop_dmlproc/ (PUT) +dispatcher.connect( + name = 'stop_dmlproc', + route = f'/cmapi/{_version}/node/stop_dmlproc', + action = 'put_stop_dmlproc', + controller = NodeProcessController(), + conditions = {'method': ['PUT']} +) + + +# /_version/node/is_process_running/ (PUT) +dispatcher.connect( + name = 'is_process_running', + route = f'/cmapi/{_version}/node/is_process_running', + action = 'get_process_running', + controller = NodeProcessController(), + conditions = {'method': ['GET']} +) + + def jsonify_error(status, message, traceback, version): \ # pylint: disable=unused-argument """JSONify all CherryPy error responses (created by raising the diff --git a/cmapi/cmapi_server/controllers/endpoints.py b/cmapi/cmapi_server/controllers/endpoints.py index 0f190ba9f..bf3b009c1 100644 --- a/cmapi/cmapi_server/controllers/endpoints.py +++ b/cmapi/cmapi_server/controllers/endpoints.py @@ -63,6 +63,23 @@ def raise_422_error( raise APIError(422, err_msg) +# TODO: Move somwhere else, eg. to helpers +def get_use_sudo(app_config: dict) -> bool: + """Get value about using superuser or not from app config. + + :param app_config: CherryPy application config + :type app_config: dict + :return: use_sudo config value + :rtype: bool + """ + privileges_section = app_config.get('Privileges', None) + if privileges_section is not None: + use_sudo = privileges_section.get('use_sudo', False) + else: + use_sudo = False + return use_sudo + + @cherrypy.tools.register('before_handler', priority=80) def validate_api_key(): """Validate API key. @@ -513,6 +530,7 @@ IP address.") module_logger.debug(f'{func_name} returns {str(begin_response)}') return begin_response + class CommitController: @cherrypy.tools.timeit() @cherrypy.tools.json_in() @@ -601,15 +619,6 @@ class RollbackController: return rollback_response -def get_use_sudo(app_config): - privileges_section = app_config.get('Privileges', None) - if privileges_section is not None: - use_sudo = privileges_section.get('use_sudo', False) - else: - use_sudo = False - return use_sudo - - class StartController: @cherrypy.tools.timeit() @cherrypy.tools.json_out() @@ -1137,3 +1146,59 @@ class AppController(): return {'started': True} else: raise APIError(503, 'CMAPI not ready to handle requests.') + + +class NodeProcessController(): + + @cherrypy.tools.timeit() + @cherrypy.tools.json_in() + @cherrypy.tools.json_out() + @cherrypy.tools.validate_api_key() # pylint: disable=no-member + def put_stop_dmlproc(self): + """Handler for /node/stop_dmlproc (PUT) endpoint.""" + # TODO: make it works only from cli tool like set_api_key made + func_name = 'put_stop_dmlproc' + log_begin(module_logger, func_name) + + request = cherrypy.request + request_body = request.json + timeout = request_body.get('timeout', 10) + force = request_body.get('force', False) + + if force: + module_logger.debug( + f'Calling DMLproc to force stop after timeout={timeout}.' + ) + MCSProcessManager.stop( + name='DMLProc', is_primary=True, use_sudo=True, timeout=timeout + ) + else: + module_logger.debug('Callling stop DMLproc gracefully.') + try: + MCSProcessManager.gracefully_stop_dmlproc() + except (ConnectionRefusedError, RuntimeError): + raise_422_error( + logger=module_logger, func_name=func_name, + err_msg='Couldn\'t stop DMlproc gracefully' + ) + response = {'timestamp': str(datetime.now())} + module_logger.debug(f'{func_name} returns {str(response)}') + return response + + @cherrypy.tools.timeit() + @cherrypy.tools.json_out() + @cherrypy.tools.validate_api_key() # pylint: disable=no-member + def get_process_running(self, process_name): + """Handler for /node/is_process_running (GET) endpoint.""" + func_name = 'get_process_running' + log_begin(module_logger, func_name) + + process_running = MCSProcessManager.is_service_running(process_name) + + response = { + 'timestamp': str(datetime.now()), + 'process_name': process_name, + 'running': process_running + } + module_logger.debug(f'{func_name} returns {str(response)}') + return response diff --git a/cmapi/cmapi_server/handlers/cluster.py b/cmapi/cmapi_server/handlers/cluster.py index f8988ad9a..628c3da6b 100644 --- a/cmapi/cmapi_server/handlers/cluster.py +++ b/cmapi/cmapi_server/handlers/cluster.py @@ -1,6 +1,7 @@ """Module contains Cluster business logic functions.""" import logging from datetime import datetime +from typing import Optional import requests @@ -13,6 +14,7 @@ from cmapi_server.helpers import ( get_config_parser, get_current_key, get_id, get_version, start_transaction, rollback_transaction, update_revision_and_manager, ) +from cmapi_server.managers.transaction import TransactionManager from cmapi_server.node_manipulation import ( add_node, add_dbroot, remove_node, switch_node_maintenance, ) @@ -96,7 +98,7 @@ class ClusterHandler(): try: suceeded, transaction_id, successes = start_transaction( - cs_config_filename=config, id=transaction_id + cs_config_filename=config, txn_id=transaction_id ) except Exception as err: rollback_transaction(transaction_id, cs_config_filename=config) @@ -141,7 +143,9 @@ class ClusterHandler(): @staticmethod def shutdown( config: str = DEFAULT_MCS_CONF_PATH, - logger: logging.Logger = logging.getLogger('cmapi_server') + logger: logging.Logger = logging.getLogger('cmapi_server'), + in_transaction: bool = False, + timeout: int = 15 ) -> dict: """Method to stop the MCS Cluster. @@ -150,6 +154,11 @@ class ClusterHandler(): :type config: str, optional :param logger: logger, defaults to logging.getLogger('cmapi_server') :type logger: logging.Logger, optional + :param in_transaction: is function called in existing transaction or no + :type in_transaction: bool + :param timeout: timeout in seconds to gracefully stop DMLProc + TODO: for next releases + :type timeout: int :raises CMAPIBasicError: if no nodes in the cluster :return: start timestamp :rtype: dict @@ -158,49 +167,28 @@ class ClusterHandler(): 'Cluster shutdown command called. Shutting down the cluster.' ) + def process_shutdown(): + """Raw node shutdown processing.""" + switch_node_maintenance(True) + update_revision_and_manager() + + # TODO: move this from multiple places to one, eg to helpers + try: + broadcast_successful = broadcast_new_config(config) + except Exception as err: + raise CMAPIBasicError( + 'Error while distributing config file.' + ) from err + + if not broadcast_successful: + raise CMAPIBasicError('Config distribution isn\'t successful.') + start_time = str(datetime.now()) - transaction_id = get_id() - - try: - suceeded, transaction_id, successes = start_transaction( - cs_config_filename=config, id=transaction_id - ) - except Exception as err: - rollback_transaction(transaction_id, cs_config_filename=config) - raise CMAPIBasicError( - 'Error while starting the transaction.' - ) from err - if not suceeded: - rollback_transaction(transaction_id, cs_config_filename=config) - raise CMAPIBasicError('Starting transaction isn\'t successful.') - - if suceeded and len(successes) == 0: - rollback_transaction(transaction_id, cs_config_filename=config) - raise CMAPIBasicError('There are no nodes in the cluster.') - - switch_node_maintenance(True) - update_revision_and_manager() - - # TODO: move this from multiple places to one, eg to helpers - try: - broadcast_successful = broadcast_new_config(config) - except Exception as err: - rollback_transaction(transaction_id, cs_config_filename=config) - raise CMAPIBasicError( - 'Error while distributing config file.' - ) from err - - if not broadcast_successful: - rollback_transaction(transaction_id, cs_config_filename=config) - raise CMAPIBasicError('Config distribution isn\'t successful.') - - try: - commit_transaction(transaction_id, cs_config_filename=config) - except Exception as err: - rollback_transaction(transaction_id, cs_config_filename=config) - raise CMAPIBasicError( - 'Error while committing transaction.' - ) from err + if not in_transaction: + with TransactionManager(): + process_shutdown() + else: + process_shutdown() logger.debug('Successfully finished shutting down the cluster.') return {'timestamp': start_time} @@ -236,7 +224,7 @@ class ClusterHandler(): try: suceeded, transaction_id, successes = start_transaction( cs_config_filename=config, extra_nodes=[node], - id=transaction_id + txn_id=transaction_id ) except Exception as err: rollback_transaction(transaction_id, cs_config_filename=config) @@ -321,7 +309,7 @@ class ClusterHandler(): try: suceeded, transaction_id, txn_nodes = start_transaction( cs_config_filename=config, remove_nodes=[node], - id=transaction_id + txn_id=transaction_id ) except Exception as err: rollback_transaction(transaction_id, cs_config_filename=config) @@ -425,7 +413,7 @@ class ClusterHandler(): try: suceeded, transaction_id, successes = start_transaction( - cs_config_filename=config, id=transaction_id + cs_config_filename=config, txn_id=transaction_id ) except Exception as err: rollback_transaction(transaction_id, cs_config_filename=config) diff --git a/cmapi/cmapi_server/helpers.py b/cmapi/cmapi_server/helpers.py index 91045ca73..ea2b61726 100644 --- a/cmapi/cmapi_server/helpers.py +++ b/cmapi/cmapi_server/helpers.py @@ -11,10 +11,12 @@ import logging import os import socket import time +from collections import namedtuple from functools import partial from random import random from shutil import copyfile from typing import Tuple, Optional +from urllib.parse import urlencode, urlunparse import lxml.objectify import requests @@ -32,17 +34,25 @@ from cmapi_server.managers.process import MCSProcessManager from mcs_node_control.models.node_config import NodeConfig -def get_id(): +def get_id() -> int: + """Generate pseudo random id for transaction. + + :return: id for internal transaction + :rtype: int + + ..TODO: need to change transaction id format and generation method? + """ return int(random() * 1000000) def start_transaction( - config_filename=CMAPI_CONF_PATH, - cs_config_filename=DEFAULT_MCS_CONF_PATH, - extra_nodes=None, - remove_nodes=None, - optional_nodes=None, - id=get_id() + config_filename: str = CMAPI_CONF_PATH, + cs_config_filename: str = DEFAULT_MCS_CONF_PATH, + extra_nodes: Optional[list] = None, + remove_nodes: Optional[list] = None, + optional_nodes: Optional[list] = None, + txn_id: Optional[int] = None, + timeout: float = 300.0 ): """Start internal CMAPI transaction. @@ -53,19 +63,26 @@ def start_transaction( :param config_filename: cmapi config filepath, defaults to CMAPI_CONF_PATH - :type config_filename: str + :type config_filename: str, optional :param cs_config_filename: columnstore xml config filepath, defaults to DEFAULT_MCS_CONF_PATH :type cs_config_filename: str, optional :param extra_nodes: extra nodes, defaults to None - :type extra_nodes: list, optional + :type extra_nodes: Optional[list], optional :param remove_nodes: remove nodes, defaults to None - :type remove_nodes: list, optional + :type remove_nodes: Optional[list], optional :param optional_nodes: optional nodes, defaults to None - :type optional_nodes: list, optional - :return: (success, txnid, nodes) - :rtype: tuple + :type optional_nodes: Optional[list], optional + :param txn_id: id for transaction to start, defaults to None + :type txn_id: Optional[int], optional + :param timeout: time in seconds for cmapi transaction lock before it ends + automatically, defaults to 300 + :type timeout: float, optional + :return: (success, txn_id, nodes) + :rtype: tuple[bool, int, list[str]] """ + if txn_id is None: + txn_id = get_id() # TODO: Somehow change that logic for eg using several input types # (str\list\set) and detect which one we got. extra_nodes = extra_nodes or [] @@ -78,8 +95,8 @@ def start_transaction( version = get_version() headers = {'x-api-key': api_key} - body = {'id' : id} - final_time = datetime.datetime.now() + datetime.timedelta(seconds=300) + body = {'id' : txn_id} + final_time = datetime.datetime.now() + datetime.timedelta(seconds=timeout) success = False while datetime.datetime.now() < final_time and not success: @@ -180,7 +197,7 @@ def start_transaction( time.sleep(1) if not node_success and node not in optional_nodes: - rollback_txn_attempt(api_key, version, id, successes) + rollback_txn_attempt(api_key, version, txn_id, successes) # wait up to 5 secs and try the whole thing again time.sleep(random() * 5) break @@ -192,7 +209,7 @@ def start_transaction( # are up (> 50%). success = (len(successes) == len(real_active_nodes)) - return (success, id, successes) + return (success, txn_id, successes) def rollback_txn_attempt(key, version, txnid, nodes): headers = {'x-api-key': key} @@ -273,6 +290,7 @@ def broadcast_new_config( sm_config_filename: str = DEFAULT_SM_CONF_PATH, test_mode: bool = False, nodes: Optional[list] = None, + timeout: int = 10 ) -> bool: """Send new config to nodes. Now in async way. @@ -289,8 +307,11 @@ def broadcast_new_config( :type test_mode: bool, optional :param nodes: nodes list for config put, defaults to None :type nodes: Optional[list], optional + :param timeout: timeout passing to gracefully stop DMLProc TODO: for next + releases. Could affect all logic of broadcacting new config + :type timeout: int :return: success state - :rtype: _type_ + :rtype: bool """ cfg_parser = get_config_parser(cmapi_config_filename) @@ -326,6 +347,11 @@ def broadcast_new_config( async def update_config(node, success_nodes, failed_nodes, headers, body): url = f'https://{node}:8640/cmapi/{version}/node/config' + # TODO: investigate about hardcoded 120 seconds timeout + # Check e1242eed47b61276ebc86136f124f6d974655515 in cmapi old + # repo to get more info. Patric made it because: + # "Made the timeout for a CS process restart 120s, since + # the container dispatcher waits up to 60s for SM to stop" request_put = partial( requests.put, url, verify=False, headers=headers, json=body, timeout=120 @@ -845,3 +871,44 @@ def get_dispatcher_name_and_path( config_parser.get('Dispatcher', 'path', fallback='') ) return dispatcher_name, dispatcher_path + + +def build_url( + base_url: str, query_params: dict, scheme: str = 'https', + path: str = '', params: str = '', fragment: str = '', + port: Optional[int] = None +) -> str: + """Build url with query params. + + :param base_url: base url address + :type base_url: str + :param query_params: query params + :type query_params: dict + :param scheme: url scheme, defaults to 'https' + :type scheme: str, optional + :param path: url path, defaults to '' + :type path: str, optional + :param params: params, defaults to '' + :type params: str, optional + :param fragment: fragment, defaults to '' + :type fragment: str, optional + :param port: port for base url, defaults to None + :type port: Optional[int], optional + :return: url with query params + :rtype: str + """ + # namedtuple to match the internal signature of urlunparse + Components = namedtuple( + typename='Components', + field_names=['scheme', 'netloc', 'path', 'params', 'query', 'fragment'] + ) + return urlunparse( + Components( + scheme=scheme, + netloc=f'{base_url}:{port}' if port else base_url, + path=path, + params=params, + query=urlencode(query_params), + fragment=fragment + ) + ) diff --git a/cmapi/cmapi_server/managers/process.py b/cmapi/cmapi_server/managers/process.py index ef01f57aa..db4742aa5 100644 --- a/cmapi/cmapi_server/managers/process.py +++ b/cmapi/cmapi_server/managers/process.py @@ -262,6 +262,41 @@ class MCSProcessManager: """No operation. TODO: looks like useless.""" cls.process_dispatcher.noop() + @classmethod + def gracefully_stop_dmlproc(cls) -> None: + """Gracefully stop DMLProc using DBRM commands.""" + logging.info( + 'Trying to gracefully stop DMLProc using DBRM commands.' + ) + try: + with DBRM() as dbrm: + dbrm.set_system_state( + ['SS_ROLLBACK', 'SS_SHUTDOWN_PENDING'] + ) + except (ConnectionRefusedError, RuntimeError): + logging.error( + 'Cannot set SS_ROLLBACK and SS_SHUTDOWN_PENDING via DBRM, ' + 'graceful auto stop of DMLProc failed. ' + 'Try a regular stop method.' + ) + raise + + @classmethod + def is_service_running(cls, name: str, use_sudo: bool = True) -> bool: + """Check if MCS process is running. + + :param name: mcs process name + :type name: str + :param use_sudo: use sudo or not, defaults to True + :type use_sudo: bool, optional + :return: True if mcs process is running, otherwise False + :rtype: bool + """ + return cls.process_dispatcher.is_service_running( + cls._get_prog_name(name), use_sudo + ) + + @classmethod def start(cls, name: str, is_primary: bool, use_sudo: bool) -> bool: """Start mcs process. @@ -299,20 +334,9 @@ class MCSProcessManager: # TODO: do we need here force stop DMLProc as a method argument? if is_primary and name == 'DMLProc': - logging.info( - 'Trying to gracefully stop DMLProc using DBRM commands.' - ) try: - with DBRM() as dbrm: - dbrm.set_system_state( - ['SS_ROLLBACK', 'SS_SHUTDOWN_PENDING'] - ) + cls.gracefully_stop_dmlproc() except (ConnectionRefusedError, RuntimeError): - logging.error( - 'Cannot set SS_ROLLBACK and SS_SHUTDOWN_PENDING ' - 'using DBRM while trying to gracefully auto stop DMLProc.' - 'Continue with a regular stop method.' - ) # stop DMLProc using regular signals or systemd return cls.process_dispatcher.stop( cls._get_prog_name(name), is_primary, use_sudo diff --git a/cmapi/cmapi_server/managers/transaction.py b/cmapi/cmapi_server/managers/transaction.py new file mode 100644 index 000000000..c7b1b7b1f --- /dev/null +++ b/cmapi/cmapi_server/managers/transaction.py @@ -0,0 +1,136 @@ +"""Module related to CMAPI transaction management logic.""" +import logging +from contextlib import ContextDecorator +from signal import ( + SIGINT, SIGTERM, SIGHUP, SIG_DFL, signal, default_int_handler +) +from typing import Optional, Type + +from cmapi_server.constants import DEFAULT_MCS_CONF_PATH +from cmapi_server.exceptions import CMAPIBasicError +from cmapi_server.helpers import ( + get_id, commit_transaction, rollback_transaction, start_transaction +) + + +class TransactionManager(ContextDecorator): + """Context manager and decorator to put any code inside CMAPI transaction. + + :param timeout: time in sec after transaction will be autocommitted, + defaults to 300.0 + + :param timeout: _description_, defaults to 300 + :type timeout: float, optional + :param txn_id: custom transaction id, defaults to None + :type txn_id: Optional[int], optional + :param handle_signals: handle specific signals or not, defaults to False + :type handle_signals: bool, optional + """ + + def __init__( + self, timeout: float = 300, txn_id: Optional[int] = None, + handle_signals: bool = False + ): + self.timeout = timeout + self.txn_id = txn_id or get_id() + self.handle_signals = handle_signals + self.active_transaction = False + + def _handle_exception( + self, exc: Optional[Type[Exception]] = None, + signum: Optional[int] = None + ) -> None: + """Handle raised exceptions. + + We need to rollback transaction in some cases and return back default + signal handlers. + + :param exc: exception passed, defaults to None + :type exc: Optional[Type[Exception]], optional + :param signum: signal if it cause exception, defaults to None + :type signum: Optional[int], optional + :raises exc: raises passed exception + """ + # message = 'Got exception in transaction manager' + if (exc or signum) and self.active_transaction: + self.rollback_transaction() + self.set_default_signals() + raise exc + + def _handle_signal(self, signum, frame) -> None: + """Handler for signals. + + :param signum: signal number + :type signum: int + """ + logging.error(f'Caught signal "{signum}" in transaction manager.') + self._handle_exception(signum=signum) + + def set_custom_signals(self) -> None: + """Set handlers for several signals.""" + # register handler for signals for proper handling them + for sig in SIGINT, SIGTERM, SIGHUP: + signal(sig, self._handle_signal) + + def set_default_signals(self) -> None: + """Return defalt handlers for specific signals.""" + if self.handle_signals: + signal.signal(signal.SIGINT, default_int_handler) + signal.signal(signal.SIGTERM, SIG_DFL) + signal.signal(signal.SIGHUP, SIG_DFL) + + def rollback_transaction(self) -> None: + """Rollback transaction.""" + try: + rollback_transaction(self.txn_id) + self.active_transaction = False + logging.debug(f'Success rollback of transaction "{self.txn_id}".') + except Exception: + logging.error( + f'Error while rollback transaction "{self.txn_id}"', + exc_info=True + ) + + def commit_transaction(self): + """Commit transaction.""" + try: + commit_transaction( + self.txn_id, cs_config_filename=DEFAULT_MCS_CONF_PATH + ) + except Exception: + logging.error(f'Error while committing transaction {self.txn_id}') + self.rollback_transaction() + self.set_default_signals() + raise + + def __enter__(self): + if self.handle_signals: + self.set_custom_signals() + try: + suceeded, _transaction_id, successes = start_transaction( + cs_config_filename=DEFAULT_MCS_CONF_PATH, + txn_id=self.txn_id, timeout=self.timeout + ) + except Exception as exc: + logging.error('Error while starting the transaction.') + self._handle_exception(exc=exc) + if not suceeded: + self._handle_exception( + exc=CMAPIBasicError('Starting transaction isn\'t succesful.') + ) + if suceeded and len(successes) == 0: + self._handle_exception( + exc=CMAPIBasicError('There are no nodes in the cluster.') + ) + self.active_transaction = True + return self + + def __exit__(self, *exc): + if exc[0] and self.active_transaction: + self.rollback_transaction() + self.set_default_signals() + return False + if self.active_transaction: + self.commit_transaction() + self.set_default_signals() + return True diff --git a/cmapi/cmapi_server/process_dispatchers/container.py b/cmapi/cmapi_server/process_dispatchers/container.py index 0aeccae7a..7db927b32 100644 --- a/cmapi/cmapi_server/process_dispatchers/container.py +++ b/cmapi/cmapi_server/process_dispatchers/container.py @@ -107,6 +107,9 @@ class ContainerDispatcher(BaseDispatcher): :type use_sudo: bool, optional :return: True if service is running, otherwise False :rtype: bool + + ..Note: + Not working with multiple services at a time. """ try: cls._get_proc_object(service) diff --git a/cmapi/cmapi_server/process_dispatchers/systemd.py b/cmapi/cmapi_server/process_dispatchers/systemd.py index 7d3e7e305..8b7b2714d 100644 --- a/cmapi/cmapi_server/process_dispatchers/systemd.py +++ b/cmapi/cmapi_server/process_dispatchers/systemd.py @@ -55,7 +55,7 @@ class SystemdDispatcher(BaseDispatcher): """Check if systemd service is running. :param service: service name - :type service: str, optional + :type service: str :param use_sudo: use sudo or not, defaults to True :type use_sudo: bool, optional :return: True if service is running, otherwise False diff --git a/cmapi/mcs_cluster_tool/cluster_app.py b/cmapi/mcs_cluster_tool/cluster_app.py index 27963adf8..07934f31e 100644 --- a/cmapi/mcs_cluster_tool/cluster_app.py +++ b/cmapi/mcs_cluster_tool/cluster_app.py @@ -3,14 +3,26 @@ Formally this module contains all subcommands for "mcs cluster" cli command. """ import logging +import time +from datetime import datetime, timedelta from typing import List, Optional import pyotp +import requests import typer +from typing_extensions import Annotated -from cmapi_server.constants import SECRET_KEY +from cmapi_server.constants import ( + CMAPI_CONF_PATH, DEFAULT_MCS_CONF_PATH, SECRET_KEY +) +from cmapi_server.exceptions import CMAPIBasicError from cmapi_server.handlers.cluster import ClusterHandler +from cmapi_server.helpers import ( + get_config_parser, get_current_key, get_version, build_url +) +from cmapi_server.managers.transaction import TransactionManager from mcs_cluster_tool.decorators import handle_output +from mcs_node_control.models.node_config import NodeConfig logger = logging.getLogger('mcs_cli') @@ -32,9 +44,121 @@ def status(): @app.command() @handle_output -def stop(): +@TransactionManager( + timeout=timedelta(days=1).total_seconds(), handle_signals=True +) +def stop( + interactive: Annotated[ + bool, + typer.Option( + '--interactive/--no-interactive', '-i/-no-i', + help=( + 'Use this option on active cluster as interactive stop ' + 'waits for current writes to complete in DMLProc before ' + 'shutting down. Ensuring consistency, preventing data loss ' + 'of active writes.' + ), + ) + ] = False, + timeout: Annotated[ + int, + typer.Option( + '-t', '--timeout', + help=( + 'Time in seconds to wait for DMLproc to gracefully stop.' + 'Warning: Low wait timeout values could result in data loss ' + 'if the cluster is very active.' + 'In interactive mode means delay time between promts.' + ) + ) + ] = 15, + force: Annotated[ + bool, + typer.Option( + '--force/--no-force', '-f/-no-f', + help=( + 'Force stops Columnstore.' + 'Warning: This could cause data corruption and/or data loss.' + ), + #TODO: hide from help till not investigated in decreased timeout + # affect + hidden=True + ) + ] = False +): """Stop the Columnstore cluster.""" - return ClusterHandler.shutdown(logger=logger) + + start_time = str(datetime.now()) + if interactive: + # TODO: for standalone cli tool need to change primary detection + # method. Partially move logic below to ClusterController + nc = NodeConfig() + root = nc.get_current_config_root( + config_filename=DEFAULT_MCS_CONF_PATH + ) + primary_node = root.find("./PrimaryNode").text + cfg_parser = get_config_parser(CMAPI_CONF_PATH) + api_key = get_current_key(cfg_parser) + version = get_version() + + headers = {'x-api-key': api_key} + body = {'force': False, 'timeout': timeout} + url = f'https://{primary_node}:8640/cmapi/{version}/node/stop_dmlproc' + try: + resp = requests.put( + url, verify=False, headers=headers, json=body, + timeout=timeout+1 + ) + resp.raise_for_status() + except Exception as err: + raise CMAPIBasicError( + f'Error while stopping DMLProc on primary node.' + ) from err + + force = True + while True: + time.sleep(timeout) + url = build_url( + base_url=primary_node, port=8640, + query_params={'process_name': 'DMLProc'}, + path=f'cmapi/{version}/node/is_process_running', + ) + try: + resp = requests.get( + url, verify=False, headers=headers, timeout=timeout + ) + resp.raise_for_status() + except Exception as err: + raise CMAPIBasicError( + f'Error while getting mcs DMLProc status.' + ) from err + + # check DMLPRoc state + # if ended, show message and break + dmlproc_running = resp.json()['running'] + if not dmlproc_running: + logging.info( + 'DMLProc stopped gracefully. ' + 'Continue stopping other processes.' + ) + break + else: + force = typer.confirm( + 'DMLProc is still running. ' + 'Do you want to force stop? ' + 'WARNING: Could cause data loss and/or broken cluster.', + prompt_suffix=' ' + ) + if force: + break + else: + continue + if force: + # TODO: investigate more on how changing the hardcoded timeout + # could affect put_config (helpers.py broadcast_config) operation + timeout = 0 + _ = ClusterHandler.shutdown(logger=logger, in_transaction=True) + return {'timestamp': start_time} @app.command() diff --git a/cmapi/mcs_node_control/models/node_config.py b/cmapi/mcs_node_control/models/node_config.py index 78a16d78b..91930b0bf 100644 --- a/cmapi/mcs_node_control/models/node_config.py +++ b/cmapi/mcs_node_control/models/node_config.py @@ -115,7 +115,6 @@ class NodeConfig: maintenance = etree.SubElement(root, 'Maintenance') maintenance.text = str(False).lower() - def upgrade_config(self, tree=None, root=None, upgrade=True): """ Add the parts that might be missing after an upgrade from an earlier @@ -290,7 +289,6 @@ class NodeConfig: return pm_num raise Exception("Did not find my IP addresses or names in the SystemModuleConfig section") - def rollback_config(self, config_filename: str = DEFAULT_MCS_CONF_PATH): """Rollback the configuration. @@ -307,7 +305,6 @@ class NodeConfig: if config_file_copy.exists(): replace(backup_path, config_file) # atomic replacement - def get_current_config(self, config_filename: str = DEFAULT_MCS_CONF_PATH): """Retrievs current configuration. @@ -325,7 +322,6 @@ class NodeConfig: tree.getroot(), pretty_print=True, encoding='unicode' ) - def get_current_sm_config( self, config_filename: str = DEFAULT_SM_CONF_PATH ) -> str: @@ -343,7 +339,6 @@ class NodeConfig: module_logger.error(f"{func_name} SM config {config_filename} not found.") return '' - def s3_enabled(self, config_filename: str = DEFAULT_SM_CONF_PATH) -> bool: """Checks if SM is enabled