1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

feat(cmapi): MCOL-5133: Stage1 to stand alone cli tool. (#3378)

[add] cluster api client class
[fix] cli cluster_app using cluster api client
[add] ClusterAction Enum
[add] toggle_cluster_state function to reduce code duplication
This commit is contained in:
Alan Mologorsky
2025-03-12 14:21:32 +03:00
committed by Leonid Fedorov
parent a00711cc16
commit a60a5288d8
7 changed files with 226 additions and 109 deletions

View File

@ -82,3 +82,10 @@ MCS_INSTALL_BIN = '/usr/bin'
IFLAG = os.path.join(MCS_ETC_PATH, 'container-initialized') IFLAG = os.path.join(MCS_ETC_PATH, 'container-initialized')
LIBJEMALLOC_DEFAULT_PATH = os.path.join(MCS_DATA_PATH, 'libjemalloc.so.2') LIBJEMALLOC_DEFAULT_PATH = os.path.join(MCS_DATA_PATH, 'libjemalloc.so.2')
MCS_LOG_PATH = '/var/log/mariadb/columnstore' MCS_LOG_PATH = '/var/log/mariadb/columnstore'
# client constants
CMAPI_PORT = 8640 #TODO: use it in all places
CURRENT_NODE_CMAPI_URL = f'https://localhost:{CMAPI_PORT}'
REQUEST_TIMEOUT: float = 30.0
TRANSACTION_TIMEOUT: float = 300.0 # 5 minutes

View File

@ -0,0 +1,126 @@
import requests
from typing import Any, Dict, Optional, Union
from cmapi_server.controllers.dispatcher import _version
from cmapi_server.constants import CURRENT_NODE_CMAPI_URL
class ClusterControllerClient:
def __init__(
self, base_url: str = CURRENT_NODE_CMAPI_URL,
request_timeout: Optional[float] = None
):
"""Initialize the ClusterControllerClient with the base URL.
:param base_url: The base URL for the API endpoints,
defaults to CURRENT_NODE_CMAPI_URL
:type base_url: str, optional
:param request_timeout: request timeout, defaults to None
:type request_timeout: Optional[float], optional
"""
self.base_url = base_url
self.request_timeout = request_timeout
def start_cluster(
self, data: Optional[Dict[str, Any]] = None
) -> Union[Dict[str, Any], Dict[str, str]]:
"""Start the cluster.
:return: The response from the API.
"""
return self._request('PUT', 'start', data)
def shutdown_cluster(
self, data: Optional[Dict[str, Any]] = None
) -> Union[Dict[str, Any], Dict[str, str]]:
"""Shutdown the cluster.
:return: The response from the API.
"""
return self._request('PUT', 'shutdown', data)
def set_mode(self, mode: str) -> Union[Dict[str, Any], Dict[str, str]]:
"""Set the cluster mode.
:param mode: The mode to set.
:return: The response from the API.
"""
return self._request('PUT', 'mode-set', {'mode': mode})
def add_node(
self, node_info: Dict[str, Any]
) -> Union[Dict[str, Any], Dict[str, str]]:
"""Add a node to the cluster.
:param node_info: Information about the node to add.
:return: The response from the API.
"""
return self._request('PUT', 'node', node_info)
def remove_node(
self, node_id: str
) -> Union[Dict[str, Any], Dict[str, str]]:
"""Remove a node from the cluster.
:param node_id: The ID of the node to remove.
:return: The response from the API.
"""
return self._request('DELETE', 'node', {'node_id': node_id})
def get_status(self) -> Union[Dict[str, Any], Dict[str, str]]:
"""Get the status of the cluster.
:return: The response from the API.
"""
return self._request('GET', 'status')
def set_api_key(
self, api_key: str
) -> Union[Dict[str, Any], Dict[str, str]]:
"""Set the API key for the cluster.
:param api_key: The API key to set.
:return: The response from the API.
"""
return self._request('put', 'apikey-set', {'api_key': api_key})
def set_log_level(
self, log_level: str
) -> Union[Dict[str, Any], Dict[str, str]]:
"""Set the log level for the cluster.
:param log_level: The log level to set.
:return: The response from the API.
"""
return self._request('put', 'log-level', {'log_level': log_level})
def load_s3data(
self, s3data_info: Dict[str, Any]
) -> Union[Dict[str, Any], Dict[str, str]]:
"""Load S3 data into the cluster.
:param s3data_info: Information about the S3 data to load.
:return: The response from the API.
"""
return self._request('put', 'load_s3data', s3data_info)
def _request(
self, method: str, endpoint: str,
data: Optional[Dict[str, Any]] = None
) -> Union[Dict[str, Any], Dict[str, str]]:
"""Make a request to the API.
:param method: The HTTP method to use.
:param endpoint: The API endpoint to call.
:param data: The data to send with the request.
:return: The response from the API.
"""
url = f'{self.base_url}/cmapi/{_version}/cluster/{endpoint}'
try:
response = requests.request(
method, url, json=data, timeout=self.request_timeout
)
response.raise_for_status()
return response.json()
# TODO: different handler for timeout exception?
except requests.exceptions.RequestException as e:
return {'error': str(e)}

View File

@ -797,9 +797,10 @@ class ClusterController:
request = cherrypy.request request = cherrypy.request
request_body = request.json request_body = request.json
config = request_body.get('config', DEFAULT_MCS_CONF_PATH) config = request_body.get('config', DEFAULT_MCS_CONF_PATH)
in_transaction = request_body.get('in_transaction', False)
try: try:
response = ClusterHandler.start(config) response = ClusterHandler.start(config, in_transaction)
except CMAPIBasicError as err: except CMAPIBasicError as err:
raise_422_error(module_logger, func_name, err.message) raise_422_error(module_logger, func_name, err.message)
@ -817,9 +818,10 @@ class ClusterController:
request = cherrypy.request request = cherrypy.request
request_body = request.json request_body = request.json
config = request_body.get('config', DEFAULT_MCS_CONF_PATH) config = request_body.get('config', DEFAULT_MCS_CONF_PATH)
in_transaction = request_body.get('in_transaction', False)
try: try:
response = ClusterHandler.shutdown(config) response = ClusterHandler.shutdown(config, in_transaction)
except CMAPIBasicError as err: except CMAPIBasicError as err:
raise_422_error(module_logger, func_name, err.message) raise_422_error(module_logger, func_name, err.message)

View File

@ -1,6 +1,7 @@
"""Module contains Cluster business logic functions.""" """Module contains Cluster business logic functions."""
import logging import logging
from datetime import datetime from datetime import datetime
from enum import Enum
from typing import Optional from typing import Optional
import requests import requests
@ -22,26 +23,61 @@ from mcs_node_control.models.misc import get_dbrm_master
from mcs_node_control.models.node_config import NodeConfig from mcs_node_control.models.node_config import NodeConfig
class ClusterAction(Enum):
START = 'start'
STOP = 'stop'
def toggle_cluster_state(action: ClusterAction, config: str) -> dict:
"""
Toggle the state of the cluster (start or stop).
:param action: The cluster action to perform.
(ClusterAction.START or ClusterAction.STOP).
:type action: ClusterAction
:param config: The path to the MariaDB Columnstore configuration file.
:type config: str
"""
if action == ClusterAction.START:
maintainance_flag = True
elif action == ClusterAction.STOP:
maintainance_flag = True
else:
raise ValueError(
'Invalid action. Use ClusterAction.START or ClusterAction.STOP.'
)
switch_node_maintenance(maintainance_flag)
update_revision_and_manager()
# TODO: move this from multiple places to one
try:
broadcast_successful = broadcast_new_config(config)
except Exception as err:
raise CMAPIBasicError(
'Error while distributing config file.'
) from err
if not broadcast_successful:
raise CMAPIBasicError('Config distribution isn\'t successful.')
class ClusterHandler(): class ClusterHandler():
"""Class for handling MCS Cluster operations.""" """Class for handling MCS Cluster operations."""
@staticmethod @staticmethod
def status( def status(config: str = DEFAULT_MCS_CONF_PATH) -> dict:
config: str = DEFAULT_MCS_CONF_PATH,
logger: logging.Logger = logging.getLogger('cmapi_server')
) -> dict:
"""Method to get MCS Cluster status information """Method to get MCS Cluster status information
:param config: columnstore xml config file path, :param config: columnstore xml config file path,
defaults to DEFAULT_MCS_CONF_PATH defaults to DEFAULT_MCS_CONF_PATH
:type config: str, optional :type config: str, optional
:param logger: logger, defaults to logging.getLogger('cmapi_server')
:type logger: logging.Logger, optional
:raises CMAPIBasicError: if catch some exception while getting status :raises CMAPIBasicError: if catch some exception while getting status
from each node separately from each node separately
:return: status result :return: status result
:rtype: dict :rtype: dict
""" """
logger: logging.Logger = logging.getLogger('cmapi_server')
logger.debug('Cluster status command called. Getting status.') logger.debug('Cluster status command called. Getting status.')
response = {'timestamp': str(datetime.now())} response = {'timestamp': str(datetime.now())}
@ -73,78 +109,36 @@ class ClusterHandler():
@staticmethod @staticmethod
def start( def start(
config: str = DEFAULT_MCS_CONF_PATH, config: str = DEFAULT_MCS_CONF_PATH, in_transaction: bool = False
logger: logging.Logger = logging.getLogger('cmapi_server')
) -> dict: ) -> dict:
"""Method to start MCS Cluster. """Method to start MCS Cluster.
:param config: columnstore xml config file path, :param config: columnstore xml config file path,
defaults to DEFAULT_MCS_CONF_PATH defaults to DEFAULT_MCS_CONF_PATH
:type config: str, optional :type config: str, optional
:param logger: logger, defaults to logging.getLogger('cmapi_server') :param in_transaction: is function called in existing transaction or no
:type logger: logging.Logger, optional If we started transaction in cli tool than we
:raises CMAPIBasicError: on exception while starting transaction don't need to handle it here again
:raises CMAPIBasicError: if transaction start isn't successful :type in_transaction: bool
:raises CMAPIBasicError: if no nodes in the cluster :raises CMAPIBasicError: if no nodes in the cluster
:raises CMAPIBasicError: on exception while distributing new config
:raises CMAPIBasicError: on unsuccessful distibuting config file
:raises CMAPIBasicError: on exception while committing transaction
:return: start timestamp :return: start timestamp
:rtype: dict :rtype: dict
""" """
logger.debug('Cluster start command called. Starting the cluster.') logger: logging.Logger = logging.getLogger('cmapi_server')
start_time = str(datetime.now()) logger.info('Cluster start command called. Starting the cluster.')
transaction_id = get_id() operation_start_time = str(datetime.now())
if not in_transaction:
with TransactionManager():
toggle_cluster_state(ClusterAction.START, config)
else:
toggle_cluster_state(ClusterAction.START, config)
try: logger.info('Successfully finished cluster start.')
suceeded, transaction_id, successes = start_transaction( return {'timestamp': operation_start_time}
cs_config_filename=config, txn_id=transaction_id
)
except Exception as err:
rollback_transaction(transaction_id, cs_config_filename=config)
raise CMAPIBasicError(
'Error while starting the transaction.'
) from err
if not suceeded:
rollback_transaction(transaction_id, cs_config_filename=config)
raise CMAPIBasicError('Starting transaction isn\'t successful.')
if suceeded and len(successes) == 0:
rollback_transaction(transaction_id, cs_config_filename=config)
raise CMAPIBasicError('There are no nodes in the cluster.')
switch_node_maintenance(False)
update_revision_and_manager()
# TODO: move this from multiple places to one, eg to helpers
try:
broadcast_successful = broadcast_new_config(config)
except Exception as err:
rollback_transaction(transaction_id, cs_config_filename=config)
raise CMAPIBasicError(
'Error while distributing config file.'
) from err
if not broadcast_successful:
rollback_transaction(transaction_id, cs_config_filename=config)
raise CMAPIBasicError('Config distribution isn\'t successful.')
try:
commit_transaction(transaction_id, cs_config_filename=config)
except Exception as err:
rollback_transaction(transaction_id, cs_config_filename=config)
raise CMAPIBasicError(
'Error while committing transaction.'
) from err
logger.debug('Successfully finished cluster start.')
return {'timestamp': start_time}
@staticmethod @staticmethod
def shutdown( def shutdown(
config: str = DEFAULT_MCS_CONF_PATH, config: str = DEFAULT_MCS_CONF_PATH, in_transaction: bool = False,
logger: logging.Logger = logging.getLogger('cmapi_server'),
in_transaction: bool = False,
timeout: int = 15 timeout: int = 15
) -> dict: ) -> dict:
"""Method to stop the MCS Cluster. """Method to stop the MCS Cluster.
@ -152,8 +146,6 @@ class ClusterHandler():
:param config: columnstore xml config file path, :param config: columnstore xml config file path,
defaults to DEFAULT_MCS_CONF_PATH defaults to DEFAULT_MCS_CONF_PATH
:type config: str, optional :type config: str, optional
:param logger: logger, defaults to logging.getLogger('cmapi_server')
:type logger: logging.Logger, optional
:param in_transaction: is function called in existing transaction or no :param in_transaction: is function called in existing transaction or no
:type in_transaction: bool :type in_transaction: bool
:param timeout: timeout in seconds to gracefully stop DMLProc :param timeout: timeout in seconds to gracefully stop DMLProc
@ -163,35 +155,19 @@ class ClusterHandler():
:return: start timestamp :return: start timestamp
:rtype: dict :rtype: dict
""" """
logger: logging.Logger = logging.getLogger('cmapi_server')
logger.debug( logger.debug(
'Cluster shutdown command called. Shutting down the cluster.' 'Cluster shutdown command called. Shutting down the cluster.'
) )
operation_start_time = str(datetime.now())
def process_shutdown():
"""Raw node shutdown processing."""
switch_node_maintenance(True)
update_revision_and_manager()
# TODO: move this from multiple places to one, eg to helpers
try:
broadcast_successful = broadcast_new_config(config)
except Exception as err:
raise CMAPIBasicError(
'Error while distributing config file.'
) from err
if not broadcast_successful:
raise CMAPIBasicError('Config distribution isn\'t successful.')
start_time = str(datetime.now())
if not in_transaction: if not in_transaction:
with TransactionManager(): with TransactionManager():
process_shutdown() toggle_cluster_state(ClusterAction.STOP, config)
else: else:
process_shutdown() toggle_cluster_state(ClusterAction.STOP, config)
logger.debug('Successfully finished shutting down the cluster.') logger.debug('Successfully finished shutting down the cluster.')
return {'timestamp': start_time} return {'timestamp': operation_start_time}
@staticmethod @staticmethod
def add_node( def add_node(

View File

@ -314,6 +314,7 @@ def broadcast_new_config(
:rtype: bool :rtype: bool
""" """
# TODO: move this from multiple places to one, eg to helpers
cfg_parser = get_config_parser(cmapi_config_filename) cfg_parser = get_config_parser(cmapi_config_filename)
key = get_current_key(cfg_parser) key = get_current_key(cfg_parser)
version = get_version() version = get_version()

View File

@ -6,7 +6,7 @@ from signal import (
) )
from typing import Optional, Type from typing import Optional, Type
from cmapi_server.constants import DEFAULT_MCS_CONF_PATH from cmapi_server.constants import DEFAULT_MCS_CONF_PATH, TRANSACTION_TIMEOUT
from cmapi_server.exceptions import CMAPIBasicError from cmapi_server.exceptions import CMAPIBasicError
from cmapi_server.helpers import ( from cmapi_server.helpers import (
get_id, commit_transaction, rollback_transaction, start_transaction get_id, commit_transaction, rollback_transaction, start_transaction
@ -17,7 +17,7 @@ class TransactionManager(ContextDecorator):
"""Context manager and decorator to put any code inside CMAPI transaction. """Context manager and decorator to put any code inside CMAPI transaction.
:param timeout: time in sec after transaction will be autocommitted, :param timeout: time in sec after transaction will be autocommitted,
defaults to 300.0 defaults to 300.0 (TRANSACTION_TIMEOUT)
:param timeout: _description_, defaults to 300 :param timeout: _description_, defaults to 300
:type timeout: float, optional :type timeout: float, optional
@ -28,8 +28,8 @@ class TransactionManager(ContextDecorator):
""" """
def __init__( def __init__(
self, timeout: float = 300, txn_id: Optional[int] = None, self, timeout: float = TRANSACTION_TIMEOUT,
handle_signals: bool = False txn_id: Optional[int] = None, handle_signals: bool = False
): ):
self.timeout = timeout self.timeout = timeout
self.txn_id = txn_id or get_id() self.txn_id = txn_id or get_id()

View File

@ -13,16 +13,16 @@ import typer
from typing_extensions import Annotated from typing_extensions import Annotated
from cmapi_server.constants import ( from cmapi_server.constants import (
CMAPI_CONF_PATH, DEFAULT_MCS_CONF_PATH, SECRET_KEY CMAPI_CONF_PATH, DEFAULT_MCS_CONF_PATH, REQUEST_TIMEOUT
) )
from cmapi_server.exceptions import CMAPIBasicError from cmapi_server.exceptions import CMAPIBasicError
from cmapi_server.handlers.cluster import ClusterHandler
from cmapi_server.helpers import ( from cmapi_server.helpers import (
get_config_parser, get_current_key, get_version, build_url get_config_parser, get_current_key, get_version, build_url
) )
from cmapi_server.managers.transaction import TransactionManager from cmapi_server.managers.transaction import TransactionManager
from mcs_cluster_tool.decorators import handle_output from mcs_cluster_tool.decorators import handle_output
from mcs_node_control.models.node_config import NodeConfig from mcs_node_control.models.node_config import NodeConfig
from cmapi.cmapi_server.controllers.api_clients import ClusterControllerClient
logger = logging.getLogger('mcs_cli') logger = logging.getLogger('mcs_cli')
@ -33,13 +33,15 @@ node_app = typer.Typer(help='Cluster nodes management.')
app.add_typer(node_app, name='node') app.add_typer(node_app, name='node')
set_app = typer.Typer(help='Set cluster parameters.') set_app = typer.Typer(help='Set cluster parameters.')
app.add_typer(set_app, name='set') app.add_typer(set_app, name='set')
client = ClusterControllerClient()
@app.command(rich_help_panel='cluster and single node commands') @app.command(rich_help_panel='cluster and single node commands')
@handle_output @handle_output
def status(): def status():
"""Get status information.""" """Get status information."""
return ClusterHandler.status(logger=logger) client.request_timeout = REQUEST_TIMEOUT
return client.get_status()
@app.command(rich_help_panel='cluster and single node commands') @app.command(rich_help_panel='cluster and single node commands')
@ -157,25 +159,29 @@ def stop(
# TODO: investigate more on how changing the hardcoded timeout # TODO: investigate more on how changing the hardcoded timeout
# could affect put_config (helpers.py broadcast_config) operation # could affect put_config (helpers.py broadcast_config) operation
timeout = 0 timeout = 0
_ = ClusterHandler.shutdown(logger=logger, in_transaction=True)
resp = client.shutdown_cluster({'in_transaction': True})
return {'timestamp': start_time} return {'timestamp': start_time}
@app.command(rich_help_panel='cluster and single node commands') @app.command(rich_help_panel='cluster and single node commands')
@handle_output @handle_output
@TransactionManager(
timeout=timedelta(days=1).total_seconds(), handle_signals=True
)
def start(): def start():
"""Start the Columnstore cluster.""" """Start the Columnstore cluster."""
return ClusterHandler.start(logger=logger) return client.start_cluster({'in_transaction': True})
@app.command(rich_help_panel='cluster and single node commands') @app.command(rich_help_panel='cluster and single node commands')
@handle_output @handle_output
def restart(): def restart():
"""Restart the Columnstore cluster.""" """Restart the Columnstore cluster."""
stop_result = ClusterHandler.shutdown(logger=logger) stop_result = client.shutdown_cluster()
if 'error' in stop_result: if 'error' in stop_result:
return stop_result return stop_result
result = ClusterHandler.start(logger=logger) result = client.start_cluster()
result['stop_timestamp'] = stop_result['timestamp'] result['stop_timestamp'] = stop_result['timestamp']
return result return result
@ -195,7 +201,7 @@ def add(
"""Add nodes to the Columnstore cluster.""" """Add nodes to the Columnstore cluster."""
result = [] result = []
for node in nodes: for node in nodes:
result.append(ClusterHandler.add_node(node, logger=logger)) result.append(client.add_node({'node': node}))
return result return result
@ -213,7 +219,7 @@ def remove(nodes: Optional[List[str]] = typer.Option(
"""Remove nodes from the Columnstore cluster.""" """Remove nodes from the Columnstore cluster."""
result = [] result = []
for node in nodes: for node in nodes:
result.append(ClusterHandler.remove_node(node, logger=logger)) result.append(client.remove_node(node))
return result return result
@ -233,7 +239,8 @@ def mode(cluster_mode: str = typer.Option(
raise typer.BadParameter( raise typer.BadParameter(
'"readonly" or "readwrite" are the only acceptable modes now.' '"readonly" or "readwrite" are the only acceptable modes now.'
) )
return ClusterHandler.set_mode(cluster_mode, logger=logger) client.request_timeout = REQUEST_TIMEOUT
return client.set_mode(cluster_mode)
@set_app.command() @set_app.command()
@ -245,10 +252,8 @@ def api_key(key: str = typer.Option(..., help='API key to set.')):
""" """
if not key: if not key:
raise typer.BadParameter('Empty API key not allowed.') raise typer.BadParameter('Empty API key not allowed.')
client.request_timeout = REQUEST_TIMEOUT
totp = pyotp.TOTP(SECRET_KEY) return client.set_api_key(key)
return ClusterHandler.set_api_key(key, totp.now(), logger=logger)
@set_app.command() @set_app.command()
@ -260,5 +265,5 @@ def log_level(level: str = typer.Option(..., help='Logging level to set.')):
""" """
if not level: if not level:
raise typer.BadParameter('Empty log level not allowed.') raise typer.BadParameter('Empty log level not allowed.')
client.request_timeout = REQUEST_TIMEOUT
return ClusterHandler.set_log_level(level, logger=logger) return client.set_log_level(level)