1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

feat(cmapi): MCOL-5133: Stage3 stand alone cli tool.

MAJOR: Some logic inside node remove changed significantly using active
    nodes list from Columnstore.xml to broadcast config after remove.

 [fix] TransactionManager passsing  extra, remove and optional nodes arguments to start_transaction function
 [fix] commit and rollback methods of TransactionManager adding nodes argument
 [fix] TransactionManager using success_txn_nodes inside
 [fix] remove node logic to use Transaction manager
 [fix] cluster set api key call using totp on a top level cli call
 [add] missed docstrings
 [fix] cluster shutdown timeout for next release
This commit is contained in:
mariadb-AlanMologorsky
2025-01-31 15:43:35 +03:00
committed by Leonid Fedorov
parent c40e4ba00d
commit 27e3b8d808
8 changed files with 183 additions and 124 deletions

View File

@ -1,7 +1,13 @@
import requests
from typing import Any, Dict, Optional, Union
import pyotp
from cmapi_server.controllers.dispatcher import _version
from cmapi_server.constants import CURRENT_NODE_CMAPI_URL
from cmapi_server.constants import (
CMAPI_CONF_PATH, CURRENT_NODE_CMAPI_URL, SECRET_KEY,
)
from cmapi_server.helpers import get_config_parser, get_current_key
class ClusterControllerClient:
@ -11,6 +17,10 @@ class ClusterControllerClient:
):
"""Initialize the ClusterControllerClient with the base URL.
WARNING: This class only handles the API requests, it does not
handle the transaction management. So it should be started
at level above using TransactionManager (decorator or context manager).
:param base_url: The base URL for the API endpoints,
defaults to CURRENT_NODE_CMAPI_URL
:type base_url: str, optional
@ -59,14 +69,14 @@ class ClusterControllerClient:
return self._request('PUT', 'node', {**node_info, **extra})
def remove_node(
self, node_id: str, extra: Dict[str, Any] = dict()
self, node: str, extra: Dict[str, Any] = dict()
) -> Union[Dict[str, Any], Dict[str, str]]:
"""Remove a node from the cluster.
:param node_id: The ID of the node to remove.
:param node: node IP, name or FQDN.
:return: The response from the API.
"""
return self._request('DELETE', 'node', {'node_id': node_id})
return self._request('DELETE', 'node', {'node': node, **extra})
def get_status(self) -> Union[Dict[str, Any], Dict[str, str]]:
"""Get the status of the cluster.
@ -83,7 +93,12 @@ class ClusterControllerClient:
:param api_key: The API key to set.
:return: The response from the API.
"""
return self._request('put', 'apikey-set', {'api_key': api_key})
totp = pyotp.TOTP(SECRET_KEY)
payload = {
'api_key': api_key,
'verification_key': totp.now()
}
return self._request('put', 'apikey-set', payload)
def set_log_level(
self, log_level: str
@ -117,9 +132,16 @@ class ClusterControllerClient:
:return: The response from the API.
"""
url = f'{self.base_url}/cmapi/{_version}/cluster/{endpoint}'
cmapi_cfg_parser = get_config_parser(CMAPI_CONF_PATH)
key = get_current_key(cmapi_cfg_parser)
headers = {'x-api-key': key}
if method in ['PUT', 'POST', 'DELETE']:
headers['Content-Type'] = 'application/json'
data = {'in_transaction': True, **(data or {})}
try:
response = requests.request(
method, url, json=data, timeout=self.request_timeout
method, url, headers=headers, json=data,
timeout=self.request_timeout, verify=False
)
response.raise_for_status()
return response.json()

View File

@ -14,15 +14,16 @@ import requests
from cmapi_server.exceptions import CMAPIBasicError
from cmapi_server.constants import (
DEFAULT_SM_CONF_PATH, EM_PATH_SUFFIX, DEFAULT_MCS_CONF_PATH, MCS_EM_PATH,
MCS_BRM_CURRENT_PATH, S3_BRM_CURRENT_PATH, CMAPI_CONF_PATH, SECRET_KEY,
DEFAULT_MCS_CONF_PATH, DEFAULT_SM_CONF_PATH, EM_PATH_SUFFIX,
MCS_BRM_CURRENT_PATH, MCS_EM_PATH, S3_BRM_CURRENT_PATH, SECRET_KEY,
)
from cmapi_server.controllers.error import APIError
from cmapi_server.handlers.cej import CEJError
from cmapi_server.handlers.cluster import ClusterHandler
from cmapi_server.helpers import (
cmapi_config_check, get_config_parser, get_current_key, get_dbroots,
system_ready, save_cmapi_conf_file, dequote, in_maintenance_state,
cmapi_config_check, dequote, get_active_nodes, get_config_parser,
get_current_key, get_dbroots, in_maintenance_state, save_cmapi_conf_file,
system_ready,
)
from cmapi_server.logging_management import change_loggers_level
from cmapi_server.managers.application import AppManager
@ -60,6 +61,9 @@ def raise_422_error(
:type exc_info: bool
:raises APIError: everytime with custom error message
"""
# TODO: change:
# - func name to inspect.stack(0)[1][3]
# - make something to logger, seems passing here is useless
logger.error(f'{func_name} {err_msg}', exc_info=exc_info)
raise APIError(422, err_msg)
@ -146,7 +150,21 @@ def active_operation():
if txn_section is not None:
txn_manager_address = app.config['txn'].get('manager_address', None)
if txn_manager_address is not None and len(txn_manager_address) > 0:
raise APIError(422, "There is an active operation.")
raise_422_error(
module_logger, 'active_operation', 'There is an active operation.'
)
@cherrypy.tools.register('before_handler', priority=82)
def has_active_nodes():
"""Check if there are any active nodes in the cluster."""
active_nodes = get_active_nodes()
if len(active_nodes) == 0:
raise_422_error(
module_logger, 'has_active_nodes',
'No active nodes in the cluster.'
)
class TimingTool(cherrypy.Tool):
@ -816,19 +834,22 @@ class ClusterController:
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@cherrypy.tools.validate_api_key() # pylint: disable=no-member
@cherrypy.tools.has_active_nodes() # pylint: disable=no-member
def put_shutdown(self):
func_name = 'put_shutdown'
log_begin(module_logger, func_name)
request = cherrypy.request
request_body = request.json
timeout = request_body.get('timeout', None)
force = request_body.get('force', False)
config = request_body.get('config', DEFAULT_MCS_CONF_PATH)
in_transaction = request_body.get('in_transaction', False)
try:
if not in_transaction:
with TransactionManager():
response = ClusterHandler.shutdown(config)
response = ClusterHandler.shutdown(config, timeout)
else:
response = ClusterHandler.shutdown(config)
except CMAPIBasicError as err:
@ -882,7 +903,7 @@ class ClusterController:
try:
if not in_transaction:
with TransactionManager():
with TransactionManager(extra_nodes=[node]):
response = ClusterHandler.add_node(node, config)
else:
response = ClusterHandler.add_node(node, config)
@ -903,7 +924,6 @@ class ClusterController:
request_body = request.json
node = request_body.get('node', None)
config = request_body.get('config', DEFAULT_MCS_CONF_PATH)
#TODO: for next release
in_transaction = request_body.get('in_transaction', False)
#TODO: add arguments verification decorator
@ -911,7 +931,11 @@ class ClusterController:
raise_422_error(module_logger, func_name, 'missing node argument')
try:
response = ClusterHandler.remove_node(node, config)
if not in_transaction:
with TransactionManager(remove_nodes=[node]):
response = ClusterHandler.remove_node(node, config)
else:
response = ClusterHandler.remove_node(node, config)
except CMAPIBasicError as err:
raise_422_error(module_logger, func_name, err.message)
@ -1021,7 +1045,7 @@ class ClusterController:
if not totp_key or not new_api_key:
# not show which arguments in error message because endpoint for
# internal usage only
# cli tool or internal usage only
raise_422_error(
module_logger, func_name, 'Missing required arguments.'
)