1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00

feat(cmapi): MCOL-5133: Stage1 to stand alone cli tool. (#3378)

[add] cluster api client class
[fix] cli cluster_app using cluster api client
[add] ClusterAction Enum
[add] toggle_cluster_state function to reduce code duplication
This commit is contained in:
Alan Mologorsky 2025-03-12 14:21:32 +03:00 committed by GitHub
parent 36a412962d
commit dafe35ef49
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 226 additions and 109 deletions

View File

@ -82,3 +82,10 @@ MCS_INSTALL_BIN = '/usr/bin'
IFLAG = os.path.join(MCS_ETC_PATH, 'container-initialized')
LIBJEMALLOC_DEFAULT_PATH = os.path.join(MCS_DATA_PATH, 'libjemalloc.so.2')
MCS_LOG_PATH = '/var/log/mariadb/columnstore'
# client constants
CMAPI_PORT = 8640 #TODO: use it in all places
CURRENT_NODE_CMAPI_URL = f'https://localhost:{CMAPI_PORT}'
REQUEST_TIMEOUT: float = 30.0
TRANSACTION_TIMEOUT: float = 300.0 # 5 minutes

View File

@ -0,0 +1,126 @@
import requests
from typing import Any, Dict, Optional, Union
from cmapi_server.controllers.dispatcher import _version
from cmapi_server.constants import CURRENT_NODE_CMAPI_URL
class ClusterControllerClient:
def __init__(
self, base_url: str = CURRENT_NODE_CMAPI_URL,
request_timeout: Optional[float] = None
):
"""Initialize the ClusterControllerClient with the base URL.
:param base_url: The base URL for the API endpoints,
defaults to CURRENT_NODE_CMAPI_URL
:type base_url: str, optional
:param request_timeout: request timeout, defaults to None
:type request_timeout: Optional[float], optional
"""
self.base_url = base_url
self.request_timeout = request_timeout
def start_cluster(
self, data: Optional[Dict[str, Any]] = None
) -> Union[Dict[str, Any], Dict[str, str]]:
"""Start the cluster.
:return: The response from the API.
"""
return self._request('PUT', 'start', data)
def shutdown_cluster(
self, data: Optional[Dict[str, Any]] = None
) -> Union[Dict[str, Any], Dict[str, str]]:
"""Shutdown the cluster.
:return: The response from the API.
"""
return self._request('PUT', 'shutdown', data)
def set_mode(self, mode: str) -> Union[Dict[str, Any], Dict[str, str]]:
"""Set the cluster mode.
:param mode: The mode to set.
:return: The response from the API.
"""
return self._request('PUT', 'mode-set', {'mode': mode})
def add_node(
self, node_info: Dict[str, Any]
) -> Union[Dict[str, Any], Dict[str, str]]:
"""Add a node to the cluster.
:param node_info: Information about the node to add.
:return: The response from the API.
"""
return self._request('PUT', 'node', node_info)
def remove_node(
self, node_id: str
) -> Union[Dict[str, Any], Dict[str, str]]:
"""Remove a node from the cluster.
:param node_id: The ID of the node to remove.
:return: The response from the API.
"""
return self._request('DELETE', 'node', {'node_id': node_id})
def get_status(self) -> Union[Dict[str, Any], Dict[str, str]]:
"""Get the status of the cluster.
:return: The response from the API.
"""
return self._request('GET', 'status')
def set_api_key(
self, api_key: str
) -> Union[Dict[str, Any], Dict[str, str]]:
"""Set the API key for the cluster.
:param api_key: The API key to set.
:return: The response from the API.
"""
return self._request('put', 'apikey-set', {'api_key': api_key})
def set_log_level(
self, log_level: str
) -> Union[Dict[str, Any], Dict[str, str]]:
"""Set the log level for the cluster.
:param log_level: The log level to set.
:return: The response from the API.
"""
return self._request('put', 'log-level', {'log_level': log_level})
def load_s3data(
self, s3data_info: Dict[str, Any]
) -> Union[Dict[str, Any], Dict[str, str]]:
"""Load S3 data into the cluster.
:param s3data_info: Information about the S3 data to load.
:return: The response from the API.
"""
return self._request('put', 'load_s3data', s3data_info)
def _request(
self, method: str, endpoint: str,
data: Optional[Dict[str, Any]] = None
) -> Union[Dict[str, Any], Dict[str, str]]:
"""Make a request to the API.
:param method: The HTTP method to use.
:param endpoint: The API endpoint to call.
:param data: The data to send with the request.
:return: The response from the API.
"""
url = f'{self.base_url}/cmapi/{_version}/cluster/{endpoint}'
try:
response = requests.request(
method, url, json=data, timeout=self.request_timeout
)
response.raise_for_status()
return response.json()
# TODO: different handler for timeout exception?
except requests.exceptions.RequestException as e:
return {'error': str(e)}

View File

@ -797,9 +797,10 @@ class ClusterController:
request = cherrypy.request
request_body = request.json
config = request_body.get('config', DEFAULT_MCS_CONF_PATH)
in_transaction = request_body.get('in_transaction', False)
try:
response = ClusterHandler.start(config)
response = ClusterHandler.start(config, in_transaction)
except CMAPIBasicError as err:
raise_422_error(module_logger, func_name, err.message)
@ -817,9 +818,10 @@ class ClusterController:
request = cherrypy.request
request_body = request.json
config = request_body.get('config', DEFAULT_MCS_CONF_PATH)
in_transaction = request_body.get('in_transaction', False)
try:
response = ClusterHandler.shutdown(config)
response = ClusterHandler.shutdown(config, in_transaction)
except CMAPIBasicError as err:
raise_422_error(module_logger, func_name, err.message)

View File

@ -1,6 +1,7 @@
"""Module contains Cluster business logic functions."""
import logging
from datetime import datetime
from enum import Enum
from typing import Optional
import requests
@ -22,26 +23,61 @@ from mcs_node_control.models.misc import get_dbrm_master
from mcs_node_control.models.node_config import NodeConfig
class ClusterAction(Enum):
START = 'start'
STOP = 'stop'
def toggle_cluster_state(action: ClusterAction, config: str) -> dict:
"""
Toggle the state of the cluster (start or stop).
:param action: The cluster action to perform.
(ClusterAction.START or ClusterAction.STOP).
:type action: ClusterAction
:param config: The path to the MariaDB Columnstore configuration file.
:type config: str
"""
if action == ClusterAction.START:
maintainance_flag = True
elif action == ClusterAction.STOP:
maintainance_flag = True
else:
raise ValueError(
'Invalid action. Use ClusterAction.START or ClusterAction.STOP.'
)
switch_node_maintenance(maintainance_flag)
update_revision_and_manager()
# TODO: move this from multiple places to one
try:
broadcast_successful = broadcast_new_config(config)
except Exception as err:
raise CMAPIBasicError(
'Error while distributing config file.'
) from err
if not broadcast_successful:
raise CMAPIBasicError('Config distribution isn\'t successful.')
class ClusterHandler():
"""Class for handling MCS Cluster operations."""
@staticmethod
def status(
config: str = DEFAULT_MCS_CONF_PATH,
logger: logging.Logger = logging.getLogger('cmapi_server')
) -> dict:
def status(config: str = DEFAULT_MCS_CONF_PATH) -> dict:
"""Method to get MCS Cluster status information
:param config: columnstore xml config file path,
defaults to DEFAULT_MCS_CONF_PATH
:type config: str, optional
:param logger: logger, defaults to logging.getLogger('cmapi_server')
:type logger: logging.Logger, optional
:raises CMAPIBasicError: if catch some exception while getting status
from each node separately
:return: status result
:rtype: dict
"""
logger: logging.Logger = logging.getLogger('cmapi_server')
logger.debug('Cluster status command called. Getting status.')
response = {'timestamp': str(datetime.now())}
@ -73,78 +109,36 @@ class ClusterHandler():
@staticmethod
def start(
config: str = DEFAULT_MCS_CONF_PATH,
logger: logging.Logger = logging.getLogger('cmapi_server')
config: str = DEFAULT_MCS_CONF_PATH, in_transaction: bool = False
) -> dict:
"""Method to start MCS Cluster.
:param config: columnstore xml config file path,
defaults to DEFAULT_MCS_CONF_PATH
:type config: str, optional
:param logger: logger, defaults to logging.getLogger('cmapi_server')
:type logger: logging.Logger, optional
:raises CMAPIBasicError: on exception while starting transaction
:raises CMAPIBasicError: if transaction start isn't successful
:param in_transaction: is function called in existing transaction or no
If we started transaction in cli tool than we
don't need to handle it here again
:type in_transaction: bool
:raises CMAPIBasicError: if no nodes in the cluster
:raises CMAPIBasicError: on exception while distributing new config
:raises CMAPIBasicError: on unsuccessful distibuting config file
:raises CMAPIBasicError: on exception while committing transaction
:return: start timestamp
:rtype: dict
"""
logger.debug('Cluster start command called. Starting the cluster.')
start_time = str(datetime.now())
transaction_id = get_id()
logger: logging.Logger = logging.getLogger('cmapi_server')
logger.info('Cluster start command called. Starting the cluster.')
operation_start_time = str(datetime.now())
if not in_transaction:
with TransactionManager():
toggle_cluster_state(ClusterAction.START, config)
else:
toggle_cluster_state(ClusterAction.START, config)
try:
suceeded, transaction_id, successes = start_transaction(
cs_config_filename=config, txn_id=transaction_id
)
except Exception as err:
rollback_transaction(transaction_id, cs_config_filename=config)
raise CMAPIBasicError(
'Error while starting the transaction.'
) from err
if not suceeded:
rollback_transaction(transaction_id, cs_config_filename=config)
raise CMAPIBasicError('Starting transaction isn\'t successful.')
if suceeded and len(successes) == 0:
rollback_transaction(transaction_id, cs_config_filename=config)
raise CMAPIBasicError('There are no nodes in the cluster.')
switch_node_maintenance(False)
update_revision_and_manager()
# TODO: move this from multiple places to one, eg to helpers
try:
broadcast_successful = broadcast_new_config(config)
except Exception as err:
rollback_transaction(transaction_id, cs_config_filename=config)
raise CMAPIBasicError(
'Error while distributing config file.'
) from err
if not broadcast_successful:
rollback_transaction(transaction_id, cs_config_filename=config)
raise CMAPIBasicError('Config distribution isn\'t successful.')
try:
commit_transaction(transaction_id, cs_config_filename=config)
except Exception as err:
rollback_transaction(transaction_id, cs_config_filename=config)
raise CMAPIBasicError(
'Error while committing transaction.'
) from err
logger.debug('Successfully finished cluster start.')
return {'timestamp': start_time}
logger.info('Successfully finished cluster start.')
return {'timestamp': operation_start_time}
@staticmethod
def shutdown(
config: str = DEFAULT_MCS_CONF_PATH,
logger: logging.Logger = logging.getLogger('cmapi_server'),
in_transaction: bool = False,
config: str = DEFAULT_MCS_CONF_PATH, in_transaction: bool = False,
timeout: int = 15
) -> dict:
"""Method to stop the MCS Cluster.
@ -152,8 +146,6 @@ class ClusterHandler():
:param config: columnstore xml config file path,
defaults to DEFAULT_MCS_CONF_PATH
:type config: str, optional
:param logger: logger, defaults to logging.getLogger('cmapi_server')
:type logger: logging.Logger, optional
:param in_transaction: is function called in existing transaction or no
:type in_transaction: bool
:param timeout: timeout in seconds to gracefully stop DMLProc
@ -163,35 +155,19 @@ class ClusterHandler():
:return: start timestamp
:rtype: dict
"""
logger: logging.Logger = logging.getLogger('cmapi_server')
logger.debug(
'Cluster shutdown command called. Shutting down the cluster.'
)
def process_shutdown():
"""Raw node shutdown processing."""
switch_node_maintenance(True)
update_revision_and_manager()
# TODO: move this from multiple places to one, eg to helpers
try:
broadcast_successful = broadcast_new_config(config)
except Exception as err:
raise CMAPIBasicError(
'Error while distributing config file.'
) from err
if not broadcast_successful:
raise CMAPIBasicError('Config distribution isn\'t successful.')
start_time = str(datetime.now())
operation_start_time = str(datetime.now())
if not in_transaction:
with TransactionManager():
process_shutdown()
toggle_cluster_state(ClusterAction.STOP, config)
else:
process_shutdown()
toggle_cluster_state(ClusterAction.STOP, config)
logger.debug('Successfully finished shutting down the cluster.')
return {'timestamp': start_time}
return {'timestamp': operation_start_time}
@staticmethod
def add_node(

View File

@ -314,6 +314,7 @@ def broadcast_new_config(
:rtype: bool
"""
# TODO: move this from multiple places to one, eg to helpers
cfg_parser = get_config_parser(cmapi_config_filename)
key = get_current_key(cfg_parser)
version = get_version()

View File

@ -6,7 +6,7 @@ from signal import (
)
from typing import Optional, Type
from cmapi_server.constants import DEFAULT_MCS_CONF_PATH
from cmapi_server.constants import DEFAULT_MCS_CONF_PATH, TRANSACTION_TIMEOUT
from cmapi_server.exceptions import CMAPIBasicError
from cmapi_server.helpers import (
get_id, commit_transaction, rollback_transaction, start_transaction
@ -17,7 +17,7 @@ class TransactionManager(ContextDecorator):
"""Context manager and decorator to put any code inside CMAPI transaction.
:param timeout: time in sec after transaction will be autocommitted,
defaults to 300.0
defaults to 300.0 (TRANSACTION_TIMEOUT)
:param timeout: _description_, defaults to 300
:type timeout: float, optional
@ -28,8 +28,8 @@ class TransactionManager(ContextDecorator):
"""
def __init__(
self, timeout: float = 300, txn_id: Optional[int] = None,
handle_signals: bool = False
self, timeout: float = TRANSACTION_TIMEOUT,
txn_id: Optional[int] = None, handle_signals: bool = False
):
self.timeout = timeout
self.txn_id = txn_id or get_id()

View File

@ -13,16 +13,16 @@ import typer
from typing_extensions import Annotated
from cmapi_server.constants import (
CMAPI_CONF_PATH, DEFAULT_MCS_CONF_PATH, SECRET_KEY
CMAPI_CONF_PATH, DEFAULT_MCS_CONF_PATH, REQUEST_TIMEOUT
)
from cmapi_server.exceptions import CMAPIBasicError
from cmapi_server.handlers.cluster import ClusterHandler
from cmapi_server.helpers import (
get_config_parser, get_current_key, get_version, build_url
)
from cmapi_server.managers.transaction import TransactionManager
from mcs_cluster_tool.decorators import handle_output
from mcs_node_control.models.node_config import NodeConfig
from cmapi.cmapi_server.controllers.api_clients import ClusterControllerClient
logger = logging.getLogger('mcs_cli')
@ -33,13 +33,15 @@ node_app = typer.Typer(help='Cluster nodes management.')
app.add_typer(node_app, name='node')
set_app = typer.Typer(help='Set cluster parameters.')
app.add_typer(set_app, name='set')
client = ClusterControllerClient()
@app.command(rich_help_panel='cluster and single node commands')
@handle_output
def status():
"""Get status information."""
return ClusterHandler.status(logger=logger)
client.request_timeout = REQUEST_TIMEOUT
return client.get_status()
@app.command(rich_help_panel='cluster and single node commands')
@ -157,25 +159,29 @@ def stop(
# TODO: investigate more on how changing the hardcoded timeout
# could affect put_config (helpers.py broadcast_config) operation
timeout = 0
_ = ClusterHandler.shutdown(logger=logger, in_transaction=True)
resp = client.shutdown_cluster({'in_transaction': True})
return {'timestamp': start_time}
@app.command(rich_help_panel='cluster and single node commands')
@handle_output
@TransactionManager(
timeout=timedelta(days=1).total_seconds(), handle_signals=True
)
def start():
"""Start the Columnstore cluster."""
return ClusterHandler.start(logger=logger)
return client.start_cluster({'in_transaction': True})
@app.command(rich_help_panel='cluster and single node commands')
@handle_output
def restart():
"""Restart the Columnstore cluster."""
stop_result = ClusterHandler.shutdown(logger=logger)
stop_result = client.shutdown_cluster()
if 'error' in stop_result:
return stop_result
result = ClusterHandler.start(logger=logger)
result = client.start_cluster()
result['stop_timestamp'] = stop_result['timestamp']
return result
@ -195,7 +201,7 @@ def add(
"""Add nodes to the Columnstore cluster."""
result = []
for node in nodes:
result.append(ClusterHandler.add_node(node, logger=logger))
result.append(client.add_node({'node': node}))
return result
@ -213,7 +219,7 @@ def remove(nodes: Optional[List[str]] = typer.Option(
"""Remove nodes from the Columnstore cluster."""
result = []
for node in nodes:
result.append(ClusterHandler.remove_node(node, logger=logger))
result.append(client.remove_node(node))
return result
@ -233,7 +239,8 @@ def mode(cluster_mode: str = typer.Option(
raise typer.BadParameter(
'"readonly" or "readwrite" are the only acceptable modes now.'
)
return ClusterHandler.set_mode(cluster_mode, logger=logger)
client.request_timeout = REQUEST_TIMEOUT
return client.set_mode(cluster_mode)
@set_app.command()
@ -245,10 +252,8 @@ def api_key(key: str = typer.Option(..., help='API key to set.')):
"""
if not key:
raise typer.BadParameter('Empty API key not allowed.')
totp = pyotp.TOTP(SECRET_KEY)
return ClusterHandler.set_api_key(key, totp.now(), logger=logger)
client.request_timeout = REQUEST_TIMEOUT
return client.set_api_key(key)
@set_app.command()
@ -260,5 +265,5 @@ def log_level(level: str = typer.Option(..., help='Logging level to set.')):
"""
if not level:
raise typer.BadParameter('Empty log level not allowed.')
return ClusterHandler.set_log_level(level, logger=logger)
client.request_timeout = REQUEST_TIMEOUT
return client.set_log_level(level)