1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00

MCOL-5594: Interactive "mcs cluster stop" command for CMAPI. (#3024)

* MCOL-5594: Interactive "mcs cluster stop" command for CMAPI.

[add] NodeProcessController class to handle Node operations
[add] two endpoints: stop_dmlproc (PUT) and is_process_running (GET)
[add] NodeProcessController.put_stop_dmlproc method to separately stop DMLProc on primary Node
[add] NodeProcessController.get_process_running method to check if specified process running or not
[add] build_url function to helpers.py. It is needed to build URLs with query params
[add] MCSProcessManager.gracefully_stop_dmlproc method
[add] MCSProcessManager.is_service_running method as a top level wrapper to the same method in dispatcher
[fix] MCSProcessManager.stop by using new gracefully_stop_dmlproc
[add] interactive option and mode to mcs cluster stop command
[fix] requirements.txt: bump typer version to 0.9.0, which supports various features including "Annotated"
[fix] requirements.txt click version (8.1.3 -> 8.1.7) and typing-extensions (4.3.0 -> 4.8.0). These are dependencies of the typer package.
[fix] multiple minor formatting, docstrings and comments

* MCOL-5594: Add new CMAPI transaction manager.

- [add] TransactionManager ContextDecorator to manage transactions in less code and in one place
- [add] TransactionManager to cli cluster stop command and to API cluster shutdown command
- [fix] id -> txn_id in ClusterHandler class
- [fix] ClusterHandler.shutdown method so it can be used inside an existing transaction
- [add] docstrings in multiple places

* MCOL-5594: Review fixes.
This commit is contained in:
Alan Mologorsky 2024-02-23 21:40:50 +03:00 committed by GitHub
parent ed9ec93358
commit dec8350f0e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 518 additions and 96 deletions

View File

@ -6,7 +6,7 @@ from cmapi_server.controllers.endpoints import (
StatusController, ConfigController, BeginController, CommitController, StatusController, ConfigController, BeginController, CommitController,
RollbackController, StartController, ShutdownController, RollbackController, StartController, ShutdownController,
ExtentMapController, ClusterController, ApiKeyController, ExtentMapController, ClusterController, ApiKeyController,
LoggingConfigController, AppController LoggingConfigController, AppController, NodeProcessController
) )
from cmapi_server.controllers.s3dataload import S3DataLoadController from cmapi_server.controllers.s3dataload import S3DataLoadController
@ -241,6 +241,26 @@ dispatcher.connect(
) )
# /_version/node/stop_dmlproc/ (PUT)
# Routes PUT requests to NodeProcessController.put_stop_dmlproc.
dispatcher.connect(
name = 'stop_dmlproc',
route = f'/cmapi/{_version}/node/stop_dmlproc',
action = 'put_stop_dmlproc',
controller = NodeProcessController(),
conditions = {'method': ['PUT']}
)
# /_version/node/is_process_running/ (GET)
# Routes GET requests to NodeProcessController.get_process_running.
dispatcher.connect(
name = 'is_process_running',
route = f'/cmapi/{_version}/node/is_process_running',
action = 'get_process_running',
controller = NodeProcessController(),
conditions = {'method': ['GET']}
)
def jsonify_error(status, message, traceback, version): \ def jsonify_error(status, message, traceback, version): \
# pylint: disable=unused-argument # pylint: disable=unused-argument
"""JSONify all CherryPy error responses (created by raising the """JSONify all CherryPy error responses (created by raising the

View File

@ -63,6 +63,23 @@ def raise_422_error(
raise APIError(422, err_msg) raise APIError(422, err_msg)
# TODO: Move somewhere else, e.g. to helpers
def get_use_sudo(app_config: dict) -> bool:
    """Get value about using superuser or not from app config.

    :param app_config: CherryPy application config
    :type app_config: dict
    :return: value of the ``use_sudo`` option from the ``Privileges``
        section, False if the section or the option is missing
    :rtype: bool
    """
    privileges_section = app_config.get('Privileges')
    # No 'Privileges' section at all means sudo is not requested.
    if privileges_section is None:
        return False
    return privileges_section.get('use_sudo', False)
@cherrypy.tools.register('before_handler', priority=80) @cherrypy.tools.register('before_handler', priority=80)
def validate_api_key(): def validate_api_key():
"""Validate API key. """Validate API key.
@ -513,6 +530,7 @@ IP address.")
module_logger.debug(f'{func_name} returns {str(begin_response)}') module_logger.debug(f'{func_name} returns {str(begin_response)}')
return begin_response return begin_response
class CommitController: class CommitController:
@cherrypy.tools.timeit() @cherrypy.tools.timeit()
@cherrypy.tools.json_in() @cherrypy.tools.json_in()
@ -601,15 +619,6 @@ class RollbackController:
return rollback_response return rollback_response
def get_use_sudo(app_config: dict) -> bool:
    """Get value about using superuser or not from app config.

    :param app_config: CherryPy application config
    :type app_config: dict
    :return: value of the ``use_sudo`` option from the ``Privileges``
        section, False if the section or the option is missing
    :rtype: bool
    """
    privileges_section = app_config.get('Privileges')
    # No 'Privileges' section at all means sudo is not requested.
    if privileges_section is None:
        return False
    return privileges_section.get('use_sudo', False)
class StartController: class StartController:
@cherrypy.tools.timeit() @cherrypy.tools.timeit()
@cherrypy.tools.json_out() @cherrypy.tools.json_out()
@ -1137,3 +1146,59 @@ class AppController():
return {'started': True} return {'started': True}
else: else:
raise APIError(503, 'CMAPI not ready to handle requests.') raise APIError(503, 'CMAPI not ready to handle requests.')
class NodeProcessController:
    """Endpoints operating on a single MCS process of the node."""

    @cherrypy.tools.timeit()
    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    @cherrypy.tools.validate_api_key()  # pylint: disable=no-member
    def put_stop_dmlproc(self):
        """Handler for /node/stop_dmlproc (PUT) endpoint.

        Reads ``timeout`` (default 10) and ``force`` (default False) from
        the JSON request body. With ``force`` set, DMLProc is stopped via
        ``MCSProcessManager.stop``; otherwise a graceful stop is requested
        and a failure of that request is reported through
        ``raise_422_error``.

        :return: dict with the ``timestamp`` of the response
        :rtype: dict
        """
        # TODO: make it works only from cli tool like set_api_key made
        func_name = 'put_stop_dmlproc'
        log_begin(module_logger, func_name)

        payload = cherrypy.request.json
        timeout = payload.get('timeout', 10)
        force = payload.get('force', False)

        if not force:
            module_logger.debug('Callling stop DMLproc gracefully.')
            try:
                MCSProcessManager.gracefully_stop_dmlproc()
            except (ConnectionRefusedError, RuntimeError):
                raise_422_error(
                    logger=module_logger, func_name=func_name,
                    err_msg='Couldn\'t stop DMlproc gracefully'
                )
        else:
            module_logger.debug(
                f'Calling DMLproc to force stop after timeout={timeout}.'
            )
            MCSProcessManager.stop(
                name='DMLProc', is_primary=True, use_sudo=True,
                timeout=timeout
            )

        response = {'timestamp': str(datetime.now())}
        module_logger.debug(f'{func_name} returns {response!s}')
        return response

    @cherrypy.tools.timeit()
    @cherrypy.tools.json_out()
    @cherrypy.tools.validate_api_key()  # pylint: disable=no-member
    def get_process_running(self, process_name):
        """Handler for /node/is_process_running (GET) endpoint.

        :param process_name: name of the process to check, passed to
            ``MCSProcessManager.is_service_running``
        :return: dict with ``timestamp``, ``process_name`` and ``running``
        :rtype: dict
        """
        func_name = 'get_process_running'
        log_begin(module_logger, func_name)

        is_running = MCSProcessManager.is_service_running(process_name)
        response = {
            'timestamp': str(datetime.now()),
            'process_name': process_name,
            'running': is_running
        }
        module_logger.debug(f'{func_name} returns {response!s}')
        return response

View File

@ -1,6 +1,7 @@
"""Module contains Cluster business logic functions.""" """Module contains Cluster business logic functions."""
import logging import logging
from datetime import datetime from datetime import datetime
from typing import Optional
import requests import requests
@ -13,6 +14,7 @@ from cmapi_server.helpers import (
get_config_parser, get_current_key, get_id, get_version, start_transaction, get_config_parser, get_current_key, get_id, get_version, start_transaction,
rollback_transaction, update_revision_and_manager, rollback_transaction, update_revision_and_manager,
) )
from cmapi_server.managers.transaction import TransactionManager
from cmapi_server.node_manipulation import ( from cmapi_server.node_manipulation import (
add_node, add_dbroot, remove_node, switch_node_maintenance, add_node, add_dbroot, remove_node, switch_node_maintenance,
) )
@ -96,7 +98,7 @@ class ClusterHandler():
try: try:
suceeded, transaction_id, successes = start_transaction( suceeded, transaction_id, successes = start_transaction(
cs_config_filename=config, id=transaction_id cs_config_filename=config, txn_id=transaction_id
) )
except Exception as err: except Exception as err:
rollback_transaction(transaction_id, cs_config_filename=config) rollback_transaction(transaction_id, cs_config_filename=config)
@ -141,7 +143,9 @@ class ClusterHandler():
@staticmethod @staticmethod
def shutdown( def shutdown(
config: str = DEFAULT_MCS_CONF_PATH, config: str = DEFAULT_MCS_CONF_PATH,
logger: logging.Logger = logging.getLogger('cmapi_server') logger: logging.Logger = logging.getLogger('cmapi_server'),
in_transaction: bool = False,
timeout: int = 15
) -> dict: ) -> dict:
"""Method to stop the MCS Cluster. """Method to stop the MCS Cluster.
@ -150,6 +154,11 @@ class ClusterHandler():
:type config: str, optional :type config: str, optional
:param logger: logger, defaults to logging.getLogger('cmapi_server') :param logger: logger, defaults to logging.getLogger('cmapi_server')
:type logger: logging.Logger, optional :type logger: logging.Logger, optional
:param in_transaction: is function called in existing transaction or no
:type in_transaction: bool
:param timeout: timeout in seconds to gracefully stop DMLProc
TODO: for next releases
:type timeout: int
:raises CMAPIBasicError: if no nodes in the cluster :raises CMAPIBasicError: if no nodes in the cluster
:return: start timestamp :return: start timestamp
:rtype: dict :rtype: dict
@ -158,49 +167,28 @@ class ClusterHandler():
'Cluster shutdown command called. Shutting down the cluster.' 'Cluster shutdown command called. Shutting down the cluster.'
) )
def process_shutdown():
"""Raw node shutdown processing."""
switch_node_maintenance(True)
update_revision_and_manager()
# TODO: move this from multiple places to one, eg to helpers
try:
broadcast_successful = broadcast_new_config(config)
except Exception as err:
raise CMAPIBasicError(
'Error while distributing config file.'
) from err
if not broadcast_successful:
raise CMAPIBasicError('Config distribution isn\'t successful.')
start_time = str(datetime.now()) start_time = str(datetime.now())
transaction_id = get_id() if not in_transaction:
with TransactionManager():
try: process_shutdown()
suceeded, transaction_id, successes = start_transaction( else:
cs_config_filename=config, id=transaction_id process_shutdown()
)
except Exception as err:
rollback_transaction(transaction_id, cs_config_filename=config)
raise CMAPIBasicError(
'Error while starting the transaction.'
) from err
if not suceeded:
rollback_transaction(transaction_id, cs_config_filename=config)
raise CMAPIBasicError('Starting transaction isn\'t successful.')
if suceeded and len(successes) == 0:
rollback_transaction(transaction_id, cs_config_filename=config)
raise CMAPIBasicError('There are no nodes in the cluster.')
switch_node_maintenance(True)
update_revision_and_manager()
# TODO: move this from multiple places to one, eg to helpers
try:
broadcast_successful = broadcast_new_config(config)
except Exception as err:
rollback_transaction(transaction_id, cs_config_filename=config)
raise CMAPIBasicError(
'Error while distributing config file.'
) from err
if not broadcast_successful:
rollback_transaction(transaction_id, cs_config_filename=config)
raise CMAPIBasicError('Config distribution isn\'t successful.')
try:
commit_transaction(transaction_id, cs_config_filename=config)
except Exception as err:
rollback_transaction(transaction_id, cs_config_filename=config)
raise CMAPIBasicError(
'Error while committing transaction.'
) from err
logger.debug('Successfully finished shutting down the cluster.') logger.debug('Successfully finished shutting down the cluster.')
return {'timestamp': start_time} return {'timestamp': start_time}
@ -236,7 +224,7 @@ class ClusterHandler():
try: try:
suceeded, transaction_id, successes = start_transaction( suceeded, transaction_id, successes = start_transaction(
cs_config_filename=config, extra_nodes=[node], cs_config_filename=config, extra_nodes=[node],
id=transaction_id txn_id=transaction_id
) )
except Exception as err: except Exception as err:
rollback_transaction(transaction_id, cs_config_filename=config) rollback_transaction(transaction_id, cs_config_filename=config)
@ -321,7 +309,7 @@ class ClusterHandler():
try: try:
suceeded, transaction_id, txn_nodes = start_transaction( suceeded, transaction_id, txn_nodes = start_transaction(
cs_config_filename=config, remove_nodes=[node], cs_config_filename=config, remove_nodes=[node],
id=transaction_id txn_id=transaction_id
) )
except Exception as err: except Exception as err:
rollback_transaction(transaction_id, cs_config_filename=config) rollback_transaction(transaction_id, cs_config_filename=config)
@ -425,7 +413,7 @@ class ClusterHandler():
try: try:
suceeded, transaction_id, successes = start_transaction( suceeded, transaction_id, successes = start_transaction(
cs_config_filename=config, id=transaction_id cs_config_filename=config, txn_id=transaction_id
) )
except Exception as err: except Exception as err:
rollback_transaction(transaction_id, cs_config_filename=config) rollback_transaction(transaction_id, cs_config_filename=config)

View File

@ -11,10 +11,12 @@ import logging
import os import os
import socket import socket
import time import time
from collections import namedtuple
from functools import partial from functools import partial
from random import random from random import random
from shutil import copyfile from shutil import copyfile
from typing import Tuple, Optional from typing import Tuple, Optional
from urllib.parse import urlencode, urlunparse
import lxml.objectify import lxml.objectify
import requests import requests
@ -32,17 +34,25 @@ from cmapi_server.managers.process import MCSProcessManager
from mcs_node_control.models.node_config import NodeConfig from mcs_node_control.models.node_config import NodeConfig
def get_id() -> int:
    """Generate pseudo random id for transaction.

    :return: id for internal transaction, an integer in [0, 1000000)
    :rtype: int

    .. TODO: need to change transaction id format and generation method?
    """
    return int(random() * 1000000)
def start_transaction( def start_transaction(
config_filename=CMAPI_CONF_PATH, config_filename: str = CMAPI_CONF_PATH,
cs_config_filename=DEFAULT_MCS_CONF_PATH, cs_config_filename: str = DEFAULT_MCS_CONF_PATH,
extra_nodes=None, extra_nodes: Optional[list] = None,
remove_nodes=None, remove_nodes: Optional[list] = None,
optional_nodes=None, optional_nodes: Optional[list] = None,
id=get_id() txn_id: Optional[int] = None,
timeout: float = 300.0
): ):
"""Start internal CMAPI transaction. """Start internal CMAPI transaction.
@ -53,19 +63,26 @@ def start_transaction(
:param config_filename: cmapi config filepath, :param config_filename: cmapi config filepath,
defaults to CMAPI_CONF_PATH defaults to CMAPI_CONF_PATH
:type config_filename: str :type config_filename: str, optional
:param cs_config_filename: columnstore xml config filepath, :param cs_config_filename: columnstore xml config filepath,
defaults to DEFAULT_MCS_CONF_PATH defaults to DEFAULT_MCS_CONF_PATH
:type cs_config_filename: str, optional :type cs_config_filename: str, optional
:param extra_nodes: extra nodes, defaults to None :param extra_nodes: extra nodes, defaults to None
:type extra_nodes: list, optional :type extra_nodes: Optional[list], optional
:param remove_nodes: remove nodes, defaults to None :param remove_nodes: remove nodes, defaults to None
:type remove_nodes: list, optional :type remove_nodes: Optional[list], optional
:param optional_nodes: optional nodes, defaults to None :param optional_nodes: optional nodes, defaults to None
:type optional_nodes: list, optional :type optional_nodes: Optional[list], optional
:return: (success, txnid, nodes) :param txn_id: id for transaction to start, defaults to None
:rtype: tuple :type txn_id: Optional[int], optional
:param timeout: time in seconds for cmapi transaction lock before it ends
automatically, defaults to 300
:type timeout: float, optional
:return: (success, txn_id, nodes)
:rtype: tuple[bool, int, list[str]]
""" """
if txn_id is None:
txn_id = get_id()
# TODO: Somehow change that logic for eg using several input types # TODO: Somehow change that logic for eg using several input types
# (str\list\set) and detect which one we got. # (str\list\set) and detect which one we got.
extra_nodes = extra_nodes or [] extra_nodes = extra_nodes or []
@ -78,8 +95,8 @@ def start_transaction(
version = get_version() version = get_version()
headers = {'x-api-key': api_key} headers = {'x-api-key': api_key}
body = {'id' : id} body = {'id' : txn_id}
final_time = datetime.datetime.now() + datetime.timedelta(seconds=300) final_time = datetime.datetime.now() + datetime.timedelta(seconds=timeout)
success = False success = False
while datetime.datetime.now() < final_time and not success: while datetime.datetime.now() < final_time and not success:
@ -180,7 +197,7 @@ def start_transaction(
time.sleep(1) time.sleep(1)
if not node_success and node not in optional_nodes: if not node_success and node not in optional_nodes:
rollback_txn_attempt(api_key, version, id, successes) rollback_txn_attempt(api_key, version, txn_id, successes)
# wait up to 5 secs and try the whole thing again # wait up to 5 secs and try the whole thing again
time.sleep(random() * 5) time.sleep(random() * 5)
break break
@ -192,7 +209,7 @@ def start_transaction(
# are up (> 50%). # are up (> 50%).
success = (len(successes) == len(real_active_nodes)) success = (len(successes) == len(real_active_nodes))
return (success, id, successes) return (success, txn_id, successes)
def rollback_txn_attempt(key, version, txnid, nodes): def rollback_txn_attempt(key, version, txnid, nodes):
headers = {'x-api-key': key} headers = {'x-api-key': key}
@ -273,6 +290,7 @@ def broadcast_new_config(
sm_config_filename: str = DEFAULT_SM_CONF_PATH, sm_config_filename: str = DEFAULT_SM_CONF_PATH,
test_mode: bool = False, test_mode: bool = False,
nodes: Optional[list] = None, nodes: Optional[list] = None,
timeout: int = 10
) -> bool: ) -> bool:
"""Send new config to nodes. Now in async way. """Send new config to nodes. Now in async way.
@ -289,8 +307,11 @@ def broadcast_new_config(
:type test_mode: bool, optional :type test_mode: bool, optional
:param nodes: nodes list for config put, defaults to None :param nodes: nodes list for config put, defaults to None
:type nodes: Optional[list], optional :type nodes: Optional[list], optional
:param timeout: timeout passed to gracefully stop DMLProc. TODO: for next
releases. Could affect all logic of broadcasting new config
:type timeout: int
:return: success state :return: success state
:rtype: _type_ :rtype: bool
""" """
cfg_parser = get_config_parser(cmapi_config_filename) cfg_parser = get_config_parser(cmapi_config_filename)
@ -326,6 +347,11 @@ def broadcast_new_config(
async def update_config(node, success_nodes, failed_nodes, headers, body): async def update_config(node, success_nodes, failed_nodes, headers, body):
url = f'https://{node}:8640/cmapi/{version}/node/config' url = f'https://{node}:8640/cmapi/{version}/node/config'
# TODO: investigate about hardcoded 120 seconds timeout
# Check e1242eed47b61276ebc86136f124f6d974655515 in cmapi old
# repo to get more info. Patric made it because:
# "Made the timeout for a CS process restart 120s, since
# the container dispatcher waits up to 60s for SM to stop"
request_put = partial( request_put = partial(
requests.put, url, verify=False, headers=headers, json=body, requests.put, url, verify=False, headers=headers, json=body,
timeout=120 timeout=120
@ -845,3 +871,44 @@ def get_dispatcher_name_and_path(
config_parser.get('Dispatcher', 'path', fallback='') config_parser.get('Dispatcher', 'path', fallback='')
) )
return dispatcher_name, dispatcher_path return dispatcher_name, dispatcher_path
def build_url(
    base_url: str, query_params: Optional[dict] = None,
    scheme: str = 'https', path: str = '', params: str = '',
    fragment: str = '', port: Optional[int] = None
) -> str:
    """Build url with query params.

    :param base_url: base url address (host part without scheme)
    :type base_url: str
    :param query_params: query params, no query string is added if None or
        empty, defaults to None
    :type query_params: Optional[dict], optional
    :param scheme: url scheme, defaults to 'https'
    :type scheme: str, optional
    :param path: url path, defaults to ''
    :type path: str, optional
    :param params: params, defaults to ''
    :type params: str, optional
    :param fragment: fragment, defaults to ''
    :type fragment: str, optional
    :param port: port for base url, omitted if None or 0, defaults to None
    :type port: Optional[int], optional
    :return: url with query params
    :rtype: str
    """
    netloc = f'{base_url}:{port}' if port else base_url
    query = urlencode(query_params or {})
    # urlunparse takes a 6-item sequence in exactly this order:
    # (scheme, netloc, path, params, query, fragment)
    return urlunparse((scheme, netloc, path, params, query, fragment))

View File

@ -262,6 +262,41 @@ class MCSProcessManager:
"""No operation. TODO: looks like useless.""" """No operation. TODO: looks like useless."""
cls.process_dispatcher.noop() cls.process_dispatcher.noop()
@classmethod
def gracefully_stop_dmlproc(cls) -> None:
    """Ask DMLProc to stop gracefully by setting system states via DBRM.

    :raises ConnectionRefusedError: re-raised after logging if setting the
        system state fails with it
    :raises RuntimeError: re-raised after logging if setting the system
        state fails with it
    """
    logging.info(
        'Trying to gracefully stop DMLProc using DBRM commands.'
    )
    shutdown_states = ['SS_ROLLBACK', 'SS_SHUTDOWN_PENDING']
    try:
        with DBRM() as dbrm:
            dbrm.set_system_state(shutdown_states)
    except (ConnectionRefusedError, RuntimeError):
        # Log here, but let the caller decide how to proceed.
        logging.error(
            'Cannot set SS_ROLLBACK and SS_SHUTDOWN_PENDING via DBRM, '
            'graceful auto stop of DMLProc failed. '
            'Try a regular stop method.'
        )
        raise
@classmethod
def is_service_running(cls, name: str, use_sudo: bool = True) -> bool:
    """Check if MCS process is running.

    Top level wrapper around ``is_service_running`` of the configured
    process dispatcher.

    :param name: mcs process name
    :type name: str
    :param use_sudo: use sudo or not, defaults to True
    :type use_sudo: bool, optional
    :return: True if mcs process is running, otherwise False
    :rtype: bool
    """
    dispatcher = cls.process_dispatcher
    prog_name = cls._get_prog_name(name)
    return dispatcher.is_service_running(prog_name, use_sudo)
@classmethod @classmethod
def start(cls, name: str, is_primary: bool, use_sudo: bool) -> bool: def start(cls, name: str, is_primary: bool, use_sudo: bool) -> bool:
"""Start mcs process. """Start mcs process.
@ -299,20 +334,9 @@ class MCSProcessManager:
# TODO: do we need here force stop DMLProc as a method argument? # TODO: do we need here force stop DMLProc as a method argument?
if is_primary and name == 'DMLProc': if is_primary and name == 'DMLProc':
logging.info(
'Trying to gracefully stop DMLProc using DBRM commands.'
)
try: try:
with DBRM() as dbrm: cls.gracefully_stop_dmlproc()
dbrm.set_system_state(
['SS_ROLLBACK', 'SS_SHUTDOWN_PENDING']
)
except (ConnectionRefusedError, RuntimeError): except (ConnectionRefusedError, RuntimeError):
logging.error(
'Cannot set SS_ROLLBACK and SS_SHUTDOWN_PENDING '
'using DBRM while trying to gracefully auto stop DMLProc.'
'Continue with a regular stop method.'
)
# stop DMLProc using regular signals or systemd # stop DMLProc using regular signals or systemd
return cls.process_dispatcher.stop( return cls.process_dispatcher.stop(
cls._get_prog_name(name), is_primary, use_sudo cls._get_prog_name(name), is_primary, use_sudo

View File

@ -0,0 +1,136 @@
"""Module related to CMAPI transaction management logic."""
import logging
from contextlib import ContextDecorator
from signal import (
SIGINT, SIGTERM, SIGHUP, SIG_DFL, signal, default_int_handler
)
from typing import Optional, Type
from cmapi_server.constants import DEFAULT_MCS_CONF_PATH
from cmapi_server.exceptions import CMAPIBasicError
from cmapi_server.helpers import (
get_id, commit_transaction, rollback_transaction, start_transaction
)
class TransactionManager(ContextDecorator):
    """Context manager and decorator to put any code inside CMAPI transaction.

    The transaction is started on enter, committed on a clean exit and
    rolled back when the wrapped code raises. With ``handle_signals`` set,
    SIGINT, SIGTERM and SIGHUP received while the manager is in use also
    trigger a rollback of the active transaction.

    :param timeout: time in seconds passed to ``start_transaction`` as the
        transaction timeout, defaults to 300
    :type timeout: float, optional
    :param txn_id: custom transaction id, generated with ``get_id`` if None,
        defaults to None
    :type txn_id: Optional[int], optional
    :param handle_signals: handle specific signals or not, defaults to False
    :type handle_signals: bool, optional
    """

    def __init__(
        self, timeout: float = 300, txn_id: Optional[int] = None,
        handle_signals: bool = False
    ):
        self.timeout = timeout
        # Compare with None explicitly so an explicit id of 0 is respected.
        self.txn_id = get_id() if txn_id is None else txn_id
        self.handle_signals = handle_signals
        self.active_transaction = False

    def _handle_exception(
        self, exc: Optional[BaseException] = None,
        signum: Optional[int] = None
    ) -> None:
        """Handle raised exceptions and caught signals.

        Rolls back the active transaction (if any), returns back default
        signal handlers and then interrupts the current flow.

        :param exc: exception passed, defaults to None
        :type exc: Optional[BaseException], optional
        :param signum: signal if it cause exception, defaults to None
        :type signum: Optional[int], optional
        :raises exc: raises passed exception
        :raises KeyboardInterrupt: if no exception passed and signum is
            SIGINT
        :raises SystemExit: with code ``128 + signum`` if no exception
            passed and signum is any other signal
        """
        if (exc is not None or signum is not None) and self.active_transaction:
            self.rollback_transaction()
        self.set_default_signals()
        if exc is not None:
            raise exc
        # Signal path: there is no exception object to re-raise, so raise
        # the closest builtin equivalent instead of ``raise None``.
        if signum == SIGINT:
            raise KeyboardInterrupt
        if signum is not None:
            raise SystemExit(128 + signum)

    def _handle_signal(self, signum, frame) -> None:
        """Handler for signals.

        :param signum: signal number
        :type signum: int
        :param frame: current stack frame, unused
        """
        logging.error(f'Caught signal "{signum}" in transaction manager.')
        self._handle_exception(signum=signum)

    def set_custom_signals(self) -> None:
        """Set handlers for several signals."""
        # register handler for signals for proper handling them
        for sig in (SIGINT, SIGTERM, SIGHUP):
            signal(sig, self._handle_signal)

    def set_default_signals(self) -> None:
        """Return default handlers for specific signals.

        Does nothing if the manager was created without ``handle_signals``.
        """
        if self.handle_signals:
            # ``signal`` here is the function imported from the signal
            # module, not the module itself.
            signal(SIGINT, default_int_handler)
            signal(SIGTERM, SIG_DFL)
            signal(SIGHUP, SIG_DFL)

    def rollback_transaction(self) -> None:
        """Rollback transaction.

        Errors are logged and not propagated; the transaction stays marked
        as active if the rollback failed.
        """
        try:
            rollback_transaction(self.txn_id)
            self.active_transaction = False
            logging.debug(f'Success rollback of transaction "{self.txn_id}".')
        except Exception:
            logging.error(
                f'Error while rollback transaction "{self.txn_id}"',
                exc_info=True
            )

    def commit_transaction(self):
        """Commit transaction.

        On failure the transaction is rolled back, default signal handlers
        are returned back and the original error is re-raised.
        """
        try:
            commit_transaction(
                self.txn_id, cs_config_filename=DEFAULT_MCS_CONF_PATH
            )
        except Exception:
            logging.error(f'Error while committing transaction {self.txn_id}')
            self.rollback_transaction()
            self.set_default_signals()
            raise
        # Committed: nothing is left to roll back from now on.
        self.active_transaction = False

    def __enter__(self):
        if self.handle_signals:
            self.set_custom_signals()
        try:
            suceeded, _transaction_id, successes = start_transaction(
                cs_config_filename=DEFAULT_MCS_CONF_PATH,
                txn_id=self.txn_id, timeout=self.timeout
            )
        except Exception as exc:
            logging.error('Error while starting the transaction.')
            # Always re-raises ``exc``.
            self._handle_exception(exc=exc)
        if not suceeded:
            self._handle_exception(
                exc=CMAPIBasicError('Starting transaction isn\'t succesful.')
            )
        if not successes:
            self._handle_exception(
                exc=CMAPIBasicError('There are no nodes in the cluster.')
            )
        self.active_transaction = True
        return self

    def __exit__(self, *exc):
        if self.active_transaction:
            if exc[0] is not None:
                self.rollback_transaction()
            else:
                self.commit_transaction()
        self.set_default_signals()
        # Never suppress an exception raised by the wrapped code.
        return False

View File

@ -107,6 +107,9 @@ class ContainerDispatcher(BaseDispatcher):
:type use_sudo: bool, optional :type use_sudo: bool, optional
:return: True if service is running, otherwise False :return: True if service is running, otherwise False
:rtype: bool :rtype: bool
..Note:
Not working with multiple services at a time.
""" """
try: try:
cls._get_proc_object(service) cls._get_proc_object(service)

View File

@ -55,7 +55,7 @@ class SystemdDispatcher(BaseDispatcher):
"""Check if systemd service is running. """Check if systemd service is running.
:param service: service name :param service: service name
:type service: str, optional :type service: str
:param use_sudo: use sudo or not, defaults to True :param use_sudo: use sudo or not, defaults to True
:type use_sudo: bool, optional :type use_sudo: bool, optional
:return: True if service is running, otherwise False :return: True if service is running, otherwise False

View File

@ -3,14 +3,26 @@
Formally this module contains all subcommands for "mcs cluster" cli command. Formally this module contains all subcommands for "mcs cluster" cli command.
""" """
import logging import logging
import time
from datetime import datetime, timedelta
from typing import List, Optional from typing import List, Optional
import pyotp import pyotp
import requests
import typer import typer
from typing_extensions import Annotated
from cmapi_server.constants import SECRET_KEY from cmapi_server.constants import (
CMAPI_CONF_PATH, DEFAULT_MCS_CONF_PATH, SECRET_KEY
)
from cmapi_server.exceptions import CMAPIBasicError
from cmapi_server.handlers.cluster import ClusterHandler from cmapi_server.handlers.cluster import ClusterHandler
from cmapi_server.helpers import (
get_config_parser, get_current_key, get_version, build_url
)
from cmapi_server.managers.transaction import TransactionManager
from mcs_cluster_tool.decorators import handle_output from mcs_cluster_tool.decorators import handle_output
from mcs_node_control.models.node_config import NodeConfig
logger = logging.getLogger('mcs_cli') logger = logging.getLogger('mcs_cli')
@ -32,9 +44,121 @@ def status():
@app.command()
@handle_output
@TransactionManager(
    timeout=timedelta(days=1).total_seconds(), handle_signals=True
)
def stop(
    interactive: Annotated[
        bool,
        typer.Option(
            '--interactive/--no-interactive', '-i/-no-i',
            help=(
                'Use this option on active cluster as interactive stop '
                'waits for current writes to complete in DMLProc before '
                'shutting down. Ensuring consistency, preventing data loss '
                'of active writes.'
            ),
        )
    ] = False,
    timeout: Annotated[
        int,
        typer.Option(
            '-t', '--timeout',
            help=(
                'Time in seconds to wait for DMLproc to gracefully stop. '
                'Warning: Low wait timeout values could result in data loss '
                'if the cluster is very active. '
                'In interactive mode means delay time between promts.'
            )
        )
    ] = 15,
    force: Annotated[
        bool,
        typer.Option(
            '--force/--no-force', '-f/-no-f',
            help=(
                'Force stops Columnstore. '
                'Warning: This could cause data corruption and/or data loss.'
            ),
            # TODO: hide from help till not investigated in decreased
            #       timeout affect
            hidden=True
        )
    ] = False
):
    """Stop the Columnstore cluster."""
    # NOTE: the docstring above is kept short on purpose, details live in
    #       comments. The whole command runs inside a CMAPI transaction
    #       opened by the TransactionManager decorator, that is why
    #       ClusterHandler.shutdown is called with in_transaction=True.
    start_time = str(datetime.now())
    if interactive:
        # TODO: for standalone cli tool need to change primary detection
        #       method. Partially move logic below to ClusterController
        nc = NodeConfig()
        root = nc.get_current_config_root(
            config_filename=DEFAULT_MCS_CONF_PATH
        )
        primary_node = root.find("./PrimaryNode").text
        cfg_parser = get_config_parser(CMAPI_CONF_PATH)
        api_key = get_current_key(cfg_parser)
        version = get_version()

        headers = {'x-api-key': api_key}
        body = {'force': False, 'timeout': timeout}
        url = f'https://{primary_node}:8640/cmapi/{version}/node/stop_dmlproc'
        # First ask the primary node to stop DMLProc gracefully.
        try:
            resp = requests.put(
                url, verify=False, headers=headers, json=body,
                timeout=timeout+1
            )
            resp.raise_for_status()
        except Exception as err:
            raise CMAPIBasicError(
                'Error while stopping DMLProc on primary node.'
            ) from err

        force = True
        # The status url does not change between polls, build it once.
        status_url = build_url(
            base_url=primary_node, port=8640,
            query_params={'process_name': 'DMLProc'},
            path=f'cmapi/{version}/node/is_process_running',
        )
        while True:
            time.sleep(timeout)
            try:
                # timeout+1 (same as for the PUT above) keeps the request
                # timeout positive even when the user passes --timeout 0.
                resp = requests.get(
                    status_url, verify=False, headers=headers,
                    timeout=timeout+1
                )
                resp.raise_for_status()
            except Exception as err:
                raise CMAPIBasicError(
                    'Error while getting mcs DMLProc status.'
                ) from err

            # check DMLPRoc state
            # if ended, show message and break
            dmlproc_running = resp.json()['running']
            if not dmlproc_running:
                logger.info(
                    'DMLProc stopped gracefully. '
                    'Continue stopping other processes.'
                )
                break
            force = typer.confirm(
                'DMLProc is still running. '
                'Do you want to force stop? '
                'WARNING: Could cause data loss and/or broken cluster.',
                prompt_suffix=' '
            )
            if force:
                break

    if force:
        # TODO: investigate more on how changing the hardcoded timeout
        #       could affect put_config (helpers.py broadcast_config) operation
        # NOTE: this value is not passed anywhere below yet.
        timeout = 0
    _ = ClusterHandler.shutdown(logger=logger, in_transaction=True)
    return {'timestamp': start_time}
@app.command() @app.command()

View File

@ -115,7 +115,6 @@ class NodeConfig:
maintenance = etree.SubElement(root, 'Maintenance') maintenance = etree.SubElement(root, 'Maintenance')
maintenance.text = str(False).lower() maintenance.text = str(False).lower()
def upgrade_config(self, tree=None, root=None, upgrade=True): def upgrade_config(self, tree=None, root=None, upgrade=True):
""" """
Add the parts that might be missing after an upgrade from an earlier Add the parts that might be missing after an upgrade from an earlier
@ -290,7 +289,6 @@ class NodeConfig:
return pm_num return pm_num
raise Exception("Did not find my IP addresses or names in the SystemModuleConfig section") raise Exception("Did not find my IP addresses or names in the SystemModuleConfig section")
def rollback_config(self, config_filename: str = DEFAULT_MCS_CONF_PATH): def rollback_config(self, config_filename: str = DEFAULT_MCS_CONF_PATH):
"""Rollback the configuration. """Rollback the configuration.
@ -307,7 +305,6 @@ class NodeConfig:
if config_file_copy.exists(): if config_file_copy.exists():
replace(backup_path, config_file) # atomic replacement replace(backup_path, config_file) # atomic replacement
def get_current_config(self, config_filename: str = DEFAULT_MCS_CONF_PATH): def get_current_config(self, config_filename: str = DEFAULT_MCS_CONF_PATH):
"""Retrievs current configuration. """Retrievs current configuration.
@ -325,7 +322,6 @@ class NodeConfig:
tree.getroot(), pretty_print=True, encoding='unicode' tree.getroot(), pretty_print=True, encoding='unicode'
) )
def get_current_sm_config( def get_current_sm_config(
self, config_filename: str = DEFAULT_SM_CONF_PATH self, config_filename: str = DEFAULT_SM_CONF_PATH
) -> str: ) -> str:
@ -343,7 +339,6 @@ class NodeConfig:
module_logger.error(f"{func_name} SM config {config_filename} not found.") module_logger.error(f"{func_name} SM config {config_filename} not found.")
return '' return ''
def s3_enabled(self, config_filename: str = DEFAULT_SM_CONF_PATH) -> bool: def s3_enabled(self, config_filename: str = DEFAULT_SM_CONF_PATH) -> bool:
"""Checks if SM is enabled """Checks if SM is enabled