You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
feat(cmapi): MCOL-5133: Stage3 stand alone cli tool.
MAJOR: Some logic inside node remove changed significantly using active nodes list from Columnstore.xml to broadcast config after remove. [fix] TransactionManager passsing extra, remove and optional nodes arguments to start_transaction function [fix] commit and rollback methods of TransactionManager adding nodes argument [fix] TransactionManager using success_txn_nodes inside [fix] remove node logic to use Transaction manager [fix] cluster set api key call using totp on a top level cli call [add] missed docstrings [fix] cluster shutdown timeout for next release
This commit is contained in:
committed by
Alan Mologorsky
parent
6a6db672db
commit
75ed571f09
@ -1,7 +1,13 @@
|
|||||||
import requests
|
import requests
|
||||||
from typing import Any, Dict, Optional, Union
|
from typing import Any, Dict, Optional, Union
|
||||||
|
|
||||||
|
import pyotp
|
||||||
|
|
||||||
from cmapi_server.controllers.dispatcher import _version
|
from cmapi_server.controllers.dispatcher import _version
|
||||||
from cmapi_server.constants import CURRENT_NODE_CMAPI_URL
|
from cmapi_server.constants import (
|
||||||
|
CMAPI_CONF_PATH, CURRENT_NODE_CMAPI_URL, SECRET_KEY,
|
||||||
|
)
|
||||||
|
from cmapi_server.helpers import get_config_parser, get_current_key
|
||||||
|
|
||||||
|
|
||||||
class ClusterControllerClient:
|
class ClusterControllerClient:
|
||||||
@ -11,6 +17,10 @@ class ClusterControllerClient:
|
|||||||
):
|
):
|
||||||
"""Initialize the ClusterControllerClient with the base URL.
|
"""Initialize the ClusterControllerClient with the base URL.
|
||||||
|
|
||||||
|
WARNING: This class only handles the API requests, it does not
|
||||||
|
handle the transaction management. So it should be started
|
||||||
|
at level above using TransactionManager (decorator or context manager).
|
||||||
|
|
||||||
:param base_url: The base URL for the API endpoints,
|
:param base_url: The base URL for the API endpoints,
|
||||||
defaults to CURRENT_NODE_CMAPI_URL
|
defaults to CURRENT_NODE_CMAPI_URL
|
||||||
:type base_url: str, optional
|
:type base_url: str, optional
|
||||||
@ -59,14 +69,14 @@ class ClusterControllerClient:
|
|||||||
return self._request('PUT', 'node', {**node_info, **extra})
|
return self._request('PUT', 'node', {**node_info, **extra})
|
||||||
|
|
||||||
def remove_node(
|
def remove_node(
|
||||||
self, node_id: str, extra: Dict[str, Any] = dict()
|
self, node: str, extra: Dict[str, Any] = dict()
|
||||||
) -> Union[Dict[str, Any], Dict[str, str]]:
|
) -> Union[Dict[str, Any], Dict[str, str]]:
|
||||||
"""Remove a node from the cluster.
|
"""Remove a node from the cluster.
|
||||||
|
|
||||||
:param node_id: The ID of the node to remove.
|
:param node: node IP, name or FQDN.
|
||||||
:return: The response from the API.
|
:return: The response from the API.
|
||||||
"""
|
"""
|
||||||
return self._request('DELETE', 'node', {'node_id': node_id})
|
return self._request('DELETE', 'node', {'node': node, **extra})
|
||||||
|
|
||||||
def get_status(self) -> Union[Dict[str, Any], Dict[str, str]]:
|
def get_status(self) -> Union[Dict[str, Any], Dict[str, str]]:
|
||||||
"""Get the status of the cluster.
|
"""Get the status of the cluster.
|
||||||
@ -83,7 +93,12 @@ class ClusterControllerClient:
|
|||||||
:param api_key: The API key to set.
|
:param api_key: The API key to set.
|
||||||
:return: The response from the API.
|
:return: The response from the API.
|
||||||
"""
|
"""
|
||||||
return self._request('put', 'apikey-set', {'api_key': api_key})
|
totp = pyotp.TOTP(SECRET_KEY)
|
||||||
|
payload = {
|
||||||
|
'api_key': api_key,
|
||||||
|
'verification_key': totp.now()
|
||||||
|
}
|
||||||
|
return self._request('put', 'apikey-set', payload)
|
||||||
|
|
||||||
def set_log_level(
|
def set_log_level(
|
||||||
self, log_level: str
|
self, log_level: str
|
||||||
@ -117,9 +132,16 @@ class ClusterControllerClient:
|
|||||||
:return: The response from the API.
|
:return: The response from the API.
|
||||||
"""
|
"""
|
||||||
url = f'{self.base_url}/cmapi/{_version}/cluster/{endpoint}'
|
url = f'{self.base_url}/cmapi/{_version}/cluster/{endpoint}'
|
||||||
|
cmapi_cfg_parser = get_config_parser(CMAPI_CONF_PATH)
|
||||||
|
key = get_current_key(cmapi_cfg_parser)
|
||||||
|
headers = {'x-api-key': key}
|
||||||
|
if method in ['PUT', 'POST', 'DELETE']:
|
||||||
|
headers['Content-Type'] = 'application/json'
|
||||||
|
data = {'in_transaction': True, **(data or {})}
|
||||||
try:
|
try:
|
||||||
response = requests.request(
|
response = requests.request(
|
||||||
method, url, json=data, timeout=self.request_timeout
|
method, url, headers=headers, json=data,
|
||||||
|
timeout=self.request_timeout, verify=False
|
||||||
)
|
)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
return response.json()
|
return response.json()
|
||||||
|
@ -14,15 +14,16 @@ import requests
|
|||||||
|
|
||||||
from cmapi_server.exceptions import CMAPIBasicError
|
from cmapi_server.exceptions import CMAPIBasicError
|
||||||
from cmapi_server.constants import (
|
from cmapi_server.constants import (
|
||||||
DEFAULT_SM_CONF_PATH, EM_PATH_SUFFIX, DEFAULT_MCS_CONF_PATH, MCS_EM_PATH,
|
DEFAULT_MCS_CONF_PATH, DEFAULT_SM_CONF_PATH, EM_PATH_SUFFIX,
|
||||||
MCS_BRM_CURRENT_PATH, S3_BRM_CURRENT_PATH, CMAPI_CONF_PATH, SECRET_KEY,
|
MCS_BRM_CURRENT_PATH, MCS_EM_PATH, S3_BRM_CURRENT_PATH, SECRET_KEY,
|
||||||
)
|
)
|
||||||
from cmapi_server.controllers.error import APIError
|
from cmapi_server.controllers.error import APIError
|
||||||
from cmapi_server.handlers.cej import CEJError
|
from cmapi_server.handlers.cej import CEJError
|
||||||
from cmapi_server.handlers.cluster import ClusterHandler
|
from cmapi_server.handlers.cluster import ClusterHandler
|
||||||
from cmapi_server.helpers import (
|
from cmapi_server.helpers import (
|
||||||
cmapi_config_check, get_config_parser, get_current_key, get_dbroots,
|
cmapi_config_check, dequote, get_active_nodes, get_config_parser,
|
||||||
system_ready, save_cmapi_conf_file, dequote, in_maintenance_state,
|
get_current_key, get_dbroots, in_maintenance_state, save_cmapi_conf_file,
|
||||||
|
system_ready,
|
||||||
)
|
)
|
||||||
from cmapi_server.logging_management import change_loggers_level
|
from cmapi_server.logging_management import change_loggers_level
|
||||||
from cmapi_server.managers.application import AppManager
|
from cmapi_server.managers.application import AppManager
|
||||||
@ -60,6 +61,9 @@ def raise_422_error(
|
|||||||
:type exc_info: bool
|
:type exc_info: bool
|
||||||
:raises APIError: everytime with custom error message
|
:raises APIError: everytime with custom error message
|
||||||
"""
|
"""
|
||||||
|
# TODO: change:
|
||||||
|
# - func name to inspect.stack(0)[1][3]
|
||||||
|
# - make something to logger, seems passing here is useless
|
||||||
logger.error(f'{func_name} {err_msg}', exc_info=exc_info)
|
logger.error(f'{func_name} {err_msg}', exc_info=exc_info)
|
||||||
raise APIError(422, err_msg)
|
raise APIError(422, err_msg)
|
||||||
|
|
||||||
@ -146,7 +150,21 @@ def active_operation():
|
|||||||
if txn_section is not None:
|
if txn_section is not None:
|
||||||
txn_manager_address = app.config['txn'].get('manager_address', None)
|
txn_manager_address = app.config['txn'].get('manager_address', None)
|
||||||
if txn_manager_address is not None and len(txn_manager_address) > 0:
|
if txn_manager_address is not None and len(txn_manager_address) > 0:
|
||||||
raise APIError(422, "There is an active operation.")
|
raise_422_error(
|
||||||
|
module_logger, 'active_operation', 'There is an active operation.'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@cherrypy.tools.register('before_handler', priority=82)
|
||||||
|
def has_active_nodes():
|
||||||
|
"""Check if there are any active nodes in the cluster."""
|
||||||
|
active_nodes = get_active_nodes()
|
||||||
|
|
||||||
|
if len(active_nodes) == 0:
|
||||||
|
raise_422_error(
|
||||||
|
module_logger, 'has_active_nodes',
|
||||||
|
'No active nodes in the cluster.'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class TimingTool(cherrypy.Tool):
|
class TimingTool(cherrypy.Tool):
|
||||||
@ -816,19 +834,22 @@ class ClusterController:
|
|||||||
@cherrypy.tools.json_in()
|
@cherrypy.tools.json_in()
|
||||||
@cherrypy.tools.json_out()
|
@cherrypy.tools.json_out()
|
||||||
@cherrypy.tools.validate_api_key() # pylint: disable=no-member
|
@cherrypy.tools.validate_api_key() # pylint: disable=no-member
|
||||||
|
@cherrypy.tools.has_active_nodes() # pylint: disable=no-member
|
||||||
def put_shutdown(self):
|
def put_shutdown(self):
|
||||||
func_name = 'put_shutdown'
|
func_name = 'put_shutdown'
|
||||||
log_begin(module_logger, func_name)
|
log_begin(module_logger, func_name)
|
||||||
|
|
||||||
request = cherrypy.request
|
request = cherrypy.request
|
||||||
request_body = request.json
|
request_body = request.json
|
||||||
|
timeout = request_body.get('timeout', None)
|
||||||
|
force = request_body.get('force', False)
|
||||||
config = request_body.get('config', DEFAULT_MCS_CONF_PATH)
|
config = request_body.get('config', DEFAULT_MCS_CONF_PATH)
|
||||||
in_transaction = request_body.get('in_transaction', False)
|
in_transaction = request_body.get('in_transaction', False)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if not in_transaction:
|
if not in_transaction:
|
||||||
with TransactionManager():
|
with TransactionManager():
|
||||||
response = ClusterHandler.shutdown(config)
|
response = ClusterHandler.shutdown(config, timeout)
|
||||||
else:
|
else:
|
||||||
response = ClusterHandler.shutdown(config)
|
response = ClusterHandler.shutdown(config)
|
||||||
except CMAPIBasicError as err:
|
except CMAPIBasicError as err:
|
||||||
@ -882,7 +903,7 @@ class ClusterController:
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
if not in_transaction:
|
if not in_transaction:
|
||||||
with TransactionManager():
|
with TransactionManager(extra_nodes=[node]):
|
||||||
response = ClusterHandler.add_node(node, config)
|
response = ClusterHandler.add_node(node, config)
|
||||||
else:
|
else:
|
||||||
response = ClusterHandler.add_node(node, config)
|
response = ClusterHandler.add_node(node, config)
|
||||||
@ -903,7 +924,6 @@ class ClusterController:
|
|||||||
request_body = request.json
|
request_body = request.json
|
||||||
node = request_body.get('node', None)
|
node = request_body.get('node', None)
|
||||||
config = request_body.get('config', DEFAULT_MCS_CONF_PATH)
|
config = request_body.get('config', DEFAULT_MCS_CONF_PATH)
|
||||||
#TODO: for next release
|
|
||||||
in_transaction = request_body.get('in_transaction', False)
|
in_transaction = request_body.get('in_transaction', False)
|
||||||
|
|
||||||
#TODO: add arguments verification decorator
|
#TODO: add arguments verification decorator
|
||||||
@ -911,7 +931,11 @@ class ClusterController:
|
|||||||
raise_422_error(module_logger, func_name, 'missing node argument')
|
raise_422_error(module_logger, func_name, 'missing node argument')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
response = ClusterHandler.remove_node(node, config)
|
if not in_transaction:
|
||||||
|
with TransactionManager(remove_nodes=[node]):
|
||||||
|
response = ClusterHandler.remove_node(node, config)
|
||||||
|
else:
|
||||||
|
response = ClusterHandler.remove_node(node, config)
|
||||||
except CMAPIBasicError as err:
|
except CMAPIBasicError as err:
|
||||||
raise_422_error(module_logger, func_name, err.message)
|
raise_422_error(module_logger, func_name, err.message)
|
||||||
|
|
||||||
@ -1021,7 +1045,7 @@ class ClusterController:
|
|||||||
|
|
||||||
if not totp_key or not new_api_key:
|
if not totp_key or not new_api_key:
|
||||||
# not show which arguments in error message because endpoint for
|
# not show which arguments in error message because endpoint for
|
||||||
# internal usage only
|
# cli tool or internal usage only
|
||||||
raise_422_error(
|
raise_422_error(
|
||||||
module_logger, func_name, 'Missing required arguments.'
|
module_logger, func_name, 'Missing required arguments.'
|
||||||
)
|
)
|
||||||
|
@ -11,11 +11,9 @@ from cmapi_server.constants import (
|
|||||||
)
|
)
|
||||||
from cmapi_server.exceptions import CMAPIBasicError
|
from cmapi_server.exceptions import CMAPIBasicError
|
||||||
from cmapi_server.helpers import (
|
from cmapi_server.helpers import (
|
||||||
broadcast_new_config, commit_transaction, get_active_nodes, get_dbroots,
|
broadcast_new_config, get_active_nodes, get_dbroots, get_config_parser,
|
||||||
get_config_parser, get_current_key, get_id, get_version, start_transaction,
|
get_current_key, get_version, update_revision_and_manager,
|
||||||
rollback_transaction, update_revision_and_manager,
|
|
||||||
)
|
)
|
||||||
from cmapi_server.managers.transaction import TransactionManager
|
|
||||||
from cmapi_server.node_manipulation import (
|
from cmapi_server.node_manipulation import (
|
||||||
add_node, add_dbroot, remove_node, switch_node_maintenance,
|
add_node, add_dbroot, remove_node, switch_node_maintenance,
|
||||||
)
|
)
|
||||||
@ -28,9 +26,9 @@ class ClusterAction(Enum):
|
|||||||
STOP = 'stop'
|
STOP = 'stop'
|
||||||
|
|
||||||
|
|
||||||
def toggle_cluster_state(action: ClusterAction, config: str) -> dict:
|
def toggle_cluster_state(
|
||||||
"""
|
action: ClusterAction, config: str) -> dict:
|
||||||
Toggle the state of the cluster (start or stop).
|
"""Toggle the state of the cluster (start or stop).
|
||||||
|
|
||||||
:param action: The cluster action to perform.
|
:param action: The cluster action to perform.
|
||||||
(ClusterAction.START or ClusterAction.STOP).
|
(ClusterAction.START or ClusterAction.STOP).
|
||||||
@ -127,16 +125,16 @@ class ClusterHandler():
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def shutdown(
|
def shutdown(
|
||||||
config: str = DEFAULT_MCS_CONF_PATH, timeout: int = 15
|
config: str = DEFAULT_MCS_CONF_PATH, timeout: Optional[int] = None
|
||||||
) -> dict:
|
) -> dict:
|
||||||
"""Method to stop the MCS Cluster.
|
"""Method to stop the MCS Cluster.
|
||||||
|
|
||||||
:param config: columnstore xml config file path,
|
:param config: columnstore xml config file path,
|
||||||
defaults to DEFAULT_MCS_CONF_PATH
|
defaults to DEFAULT_MCS_CONF_PATH
|
||||||
:type config: str, optional
|
:type config: str, optional
|
||||||
:param timeout: timeout in seconds to gracefully stop DMLProc
|
:param timeout: timeout in seconds to gracefully stop DMLProc,
|
||||||
TODO: for next releases
|
defaults to None
|
||||||
:type timeout: int
|
:type timeout: Optional[int], optional
|
||||||
:raises CMAPIBasicError: if no nodes in the cluster
|
:raises CMAPIBasicError: if no nodes in the cluster
|
||||||
:return: start timestamp
|
:return: start timestamp
|
||||||
:rtype: dict
|
:rtype: dict
|
||||||
@ -229,21 +227,6 @@ class ClusterHandler():
|
|||||||
f'Cluster remove node command called. Removing node {node}.'
|
f'Cluster remove node command called. Removing node {node}.'
|
||||||
)
|
)
|
||||||
response = {'timestamp': str(datetime.now())}
|
response = {'timestamp': str(datetime.now())}
|
||||||
transaction_id = get_id()
|
|
||||||
|
|
||||||
try:
|
|
||||||
suceeded, transaction_id, txn_nodes = start_transaction(
|
|
||||||
cs_config_filename=config, remove_nodes=[node],
|
|
||||||
txn_id=transaction_id
|
|
||||||
)
|
|
||||||
except Exception as err:
|
|
||||||
rollback_transaction(transaction_id, cs_config_filename=config)
|
|
||||||
raise CMAPIBasicError(
|
|
||||||
'Error while starting the transaction.'
|
|
||||||
) from err
|
|
||||||
if not suceeded:
|
|
||||||
rollback_transaction(transaction_id, cs_config_filename=config)
|
|
||||||
raise CMAPIBasicError('Starting transaction isn\'t successful.')
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
remove_node(
|
remove_node(
|
||||||
@ -251,50 +234,31 @@ class ClusterHandler():
|
|||||||
output_config_filename=config
|
output_config_filename=config
|
||||||
)
|
)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
rollback_transaction(
|
|
||||||
transaction_id, nodes=txn_nodes, cs_config_filename=config
|
|
||||||
)
|
|
||||||
raise CMAPIBasicError('Error while removing node.') from err
|
raise CMAPIBasicError('Error while removing node.') from err
|
||||||
|
|
||||||
response['node_id'] = node
|
response['node_id'] = node
|
||||||
if len(txn_nodes) > 0:
|
active_nodes = get_active_nodes(config)
|
||||||
|
if len(active_nodes) > 0:
|
||||||
update_revision_and_manager(
|
update_revision_and_manager(
|
||||||
input_config_filename=config, output_config_filename=config
|
input_config_filename=config, output_config_filename=config
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
broadcast_successful = broadcast_new_config(
|
broadcast_successful = broadcast_new_config(
|
||||||
config, nodes=txn_nodes
|
config, nodes=active_nodes
|
||||||
)
|
)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
rollback_transaction(
|
|
||||||
transaction_id, nodes=txn_nodes, cs_config_filename=config
|
|
||||||
)
|
|
||||||
raise CMAPIBasicError(
|
raise CMAPIBasicError(
|
||||||
'Error while distributing config file.'
|
'Error while distributing config file.'
|
||||||
) from err
|
) from err
|
||||||
if not broadcast_successful:
|
if not broadcast_successful:
|
||||||
rollback_transaction(
|
|
||||||
transaction_id, nodes=txn_nodes, cs_config_filename=config
|
|
||||||
)
|
|
||||||
raise CMAPIBasicError('Config distribution isn\'t successful.')
|
raise CMAPIBasicError('Config distribution isn\'t successful.')
|
||||||
|
|
||||||
try:
|
|
||||||
commit_transaction(transaction_id, cs_config_filename=config)
|
|
||||||
except Exception as err:
|
|
||||||
rollback_transaction(
|
|
||||||
transaction_id, nodes=txn_nodes, cs_config_filename=config
|
|
||||||
)
|
|
||||||
raise CMAPIBasicError(
|
|
||||||
'Error while committing transaction.'
|
|
||||||
) from err
|
|
||||||
|
|
||||||
logger.debug(f'Successfully finished removing node {node}.')
|
logger.debug(f'Successfully finished removing node {node}.')
|
||||||
return response
|
return response
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def set_mode(
|
def set_mode(
|
||||||
mode: str, timeout:int = 60, config: str = DEFAULT_MCS_CONF_PATH,
|
mode: str, timeout: int = 60, config: str = DEFAULT_MCS_CONF_PATH,
|
||||||
logger: logging.Logger = logging.getLogger('cmapi_server')
|
|
||||||
) -> dict:
|
) -> dict:
|
||||||
"""Method to set MCS CLuster mode.
|
"""Method to set MCS CLuster mode.
|
||||||
|
|
||||||
@ -303,8 +267,6 @@ class ClusterHandler():
|
|||||||
:param config: columnstore xml config file path,
|
:param config: columnstore xml config file path,
|
||||||
defaults to DEFAULT_MCS_CONF_PATH
|
defaults to DEFAULT_MCS_CONF_PATH
|
||||||
:type config: str, optional
|
:type config: str, optional
|
||||||
:param logger: logger, defaults to logging.getLogger('cmapi_server')
|
|
||||||
:type logger: logging.Logger, optional
|
|
||||||
:raises CMAPIBasicError: if no master found in the cluster
|
:raises CMAPIBasicError: if no master found in the cluster
|
||||||
:raises CMAPIBasicError: on exception while starting transaction
|
:raises CMAPIBasicError: on exception while starting transaction
|
||||||
:raises CMAPIBasicError: if transaction start isn't successful
|
:raises CMAPIBasicError: if transaction start isn't successful
|
||||||
@ -315,6 +277,7 @@ class ClusterHandler():
|
|||||||
:return: result of adding node
|
:return: result of adding node
|
||||||
:rtype: dict
|
:rtype: dict
|
||||||
"""
|
"""
|
||||||
|
logger: logging.Logger = logging.getLogger('cmapi_server')
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f'Cluster mode set command called. Setting mode to {mode}.'
|
f'Cluster mode set command called. Setting mode to {mode}.'
|
||||||
)
|
)
|
||||||
@ -323,7 +286,6 @@ class ClusterHandler():
|
|||||||
cmapi_cfg_parser = get_config_parser(CMAPI_CONF_PATH)
|
cmapi_cfg_parser = get_config_parser(CMAPI_CONF_PATH)
|
||||||
api_key = get_current_key(cmapi_cfg_parser)
|
api_key = get_current_key(cmapi_cfg_parser)
|
||||||
headers = {'x-api-key': api_key}
|
headers = {'x-api-key': api_key}
|
||||||
transaction_id = get_id()
|
|
||||||
|
|
||||||
master = None
|
master = None
|
||||||
if len(get_active_nodes(config)) != 0:
|
if len(get_active_nodes(config)) != 0:
|
||||||
@ -359,7 +321,6 @@ class ClusterHandler():
|
|||||||
def set_api_key(
|
def set_api_key(
|
||||||
api_key: str, verification_key: str,
|
api_key: str, verification_key: str,
|
||||||
config: str = DEFAULT_MCS_CONF_PATH,
|
config: str = DEFAULT_MCS_CONF_PATH,
|
||||||
logger: logging.Logger = logging.getLogger('cmapi_server')
|
|
||||||
) -> dict:
|
) -> dict:
|
||||||
"""Method to set API key for each CMAPI node in cluster.
|
"""Method to set API key for each CMAPI node in cluster.
|
||||||
|
|
||||||
@ -370,13 +331,12 @@ class ClusterHandler():
|
|||||||
:param config: columnstore xml config file path,
|
:param config: columnstore xml config file path,
|
||||||
defaults to DEFAULT_MCS_CONF_PATH
|
defaults to DEFAULT_MCS_CONF_PATH
|
||||||
:type config: str, optional
|
:type config: str, optional
|
||||||
:param logger: logger, defaults to logging.getLogger('cmapi_server')
|
|
||||||
:type logger: logging.Logger, optional
|
|
||||||
:raises CMAPIBasicError: if catch some exception while setting API key
|
:raises CMAPIBasicError: if catch some exception while setting API key
|
||||||
to each node
|
to each node
|
||||||
:return: status result
|
:return: status result
|
||||||
:rtype: dict
|
:rtype: dict
|
||||||
"""
|
"""
|
||||||
|
logger: logging.Logger = logging.getLogger('cmapi_server')
|
||||||
logger.debug('Cluster set API key command called.')
|
logger.debug('Cluster set API key command called.')
|
||||||
|
|
||||||
active_nodes = get_active_nodes(config)
|
active_nodes = get_active_nodes(config)
|
||||||
|
@ -290,7 +290,7 @@ def broadcast_new_config(
|
|||||||
sm_config_filename: str = DEFAULT_SM_CONF_PATH,
|
sm_config_filename: str = DEFAULT_SM_CONF_PATH,
|
||||||
test_mode: bool = False,
|
test_mode: bool = False,
|
||||||
nodes: Optional[list] = None,
|
nodes: Optional[list] = None,
|
||||||
timeout: int = 10
|
timeout: Optional[int] = None
|
||||||
) -> bool:
|
) -> bool:
|
||||||
"""Send new config to nodes. Now in async way.
|
"""Send new config to nodes. Now in async way.
|
||||||
|
|
||||||
@ -491,7 +491,7 @@ def save_cmapi_conf_file(cfg_parser, config_filepath: str = CMAPI_CONF_PATH):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def get_active_nodes(config:str = DEFAULT_MCS_CONF_PATH) -> list:
|
def get_active_nodes(config: str = DEFAULT_MCS_CONF_PATH) -> list:
|
||||||
"""Get active nodes from Columnstore.xml.
|
"""Get active nodes from Columnstore.xml.
|
||||||
|
|
||||||
Actually this is only names of nodes by which node have been added.
|
Actually this is only names of nodes by which node have been added.
|
||||||
|
@ -25,16 +25,34 @@ class TransactionManager(ContextDecorator):
|
|||||||
:type txn_id: Optional[int], optional
|
:type txn_id: Optional[int], optional
|
||||||
:param handle_signals: handle specific signals or not, defaults to False
|
:param handle_signals: handle specific signals or not, defaults to False
|
||||||
:type handle_signals: bool, optional
|
:type handle_signals: bool, optional
|
||||||
|
:param extra_nodes: extra nodes to start transaction at, defaults to None
|
||||||
|
:type extra_nodes: Optional[list], optional
|
||||||
|
:param remove_nodes: nodes to remove from transaction, defaults to None
|
||||||
|
:type remove_nodes: Optional[list], optional
|
||||||
|
:param optional_nodes: nodes to add to transaction, defaults to None
|
||||||
|
:type optional_nodes: Optional[list], optional
|
||||||
|
:raises CMAPIBasicError: if there are no nodes in the cluster
|
||||||
|
:raises CMAPIBasicError: if starting transaction isn't succesful
|
||||||
|
:raises Exception: if error while starting the transaction
|
||||||
|
:raises Exception: if error while committing transaction
|
||||||
|
:raises Exception: if error while rollback transaction
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self, timeout: float = TRANSACTION_TIMEOUT,
|
self, timeout: float = TRANSACTION_TIMEOUT,
|
||||||
txn_id: Optional[int] = None, handle_signals: bool = False
|
txn_id: Optional[int] = None, handle_signals: bool = False,
|
||||||
|
extra_nodes: Optional[list] = None,
|
||||||
|
remove_nodes: Optional[list] = None,
|
||||||
|
optional_nodes: Optional[list] = None,
|
||||||
):
|
):
|
||||||
self.timeout = timeout
|
self.timeout = timeout
|
||||||
self.txn_id = txn_id or get_id()
|
self.txn_id = txn_id or get_id()
|
||||||
self.handle_signals = handle_signals
|
self.handle_signals = handle_signals
|
||||||
self.active_transaction = False
|
self.active_transaction = False
|
||||||
|
self.extra_nodes = extra_nodes
|
||||||
|
self.remove_nodes = remove_nodes
|
||||||
|
self.optional_nodes = optional_nodes
|
||||||
|
self.success_txn_nodes = None
|
||||||
|
|
||||||
def _handle_exception(
|
def _handle_exception(
|
||||||
self, exc: Optional[Type[Exception]] = None,
|
self, exc: Optional[Type[Exception]] = None,
|
||||||
@ -53,7 +71,7 @@ class TransactionManager(ContextDecorator):
|
|||||||
"""
|
"""
|
||||||
# message = 'Got exception in transaction manager'
|
# message = 'Got exception in transaction manager'
|
||||||
if (exc or signum) and self.active_transaction:
|
if (exc or signum) and self.active_transaction:
|
||||||
self.rollback_transaction()
|
self.rollback_transaction(nodes=self.success_txn_nodes)
|
||||||
self.set_default_signals()
|
self.set_default_signals()
|
||||||
raise exc
|
raise exc
|
||||||
|
|
||||||
@ -79,10 +97,14 @@ class TransactionManager(ContextDecorator):
|
|||||||
signal(SIGTERM, SIG_DFL)
|
signal(SIGTERM, SIG_DFL)
|
||||||
signal(SIGHUP, SIG_DFL)
|
signal(SIGHUP, SIG_DFL)
|
||||||
|
|
||||||
def rollback_transaction(self) -> None:
|
def rollback_transaction(self, nodes: Optional[list] = None) -> None:
|
||||||
"""Rollback transaction."""
|
"""Rollback transaction.
|
||||||
|
|
||||||
|
:param nodes: nodes to rollback transaction, defaults to None
|
||||||
|
:type nodes: Optional[list], optional
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
rollback_transaction(self.txn_id)
|
rollback_transaction(self.txn_id, nodes=nodes)
|
||||||
self.active_transaction = False
|
self.active_transaction = False
|
||||||
logging.debug(f'Success rollback of transaction "{self.txn_id}".')
|
logging.debug(f'Success rollback of transaction "{self.txn_id}".')
|
||||||
except Exception:
|
except Exception:
|
||||||
@ -91,15 +113,20 @@ class TransactionManager(ContextDecorator):
|
|||||||
exc_info=True
|
exc_info=True
|
||||||
)
|
)
|
||||||
|
|
||||||
def commit_transaction(self):
|
def commit_transaction(self, nodes: Optional[list] = None) -> None:
|
||||||
"""Commit transaction."""
|
"""Commit transaction.
|
||||||
|
|
||||||
|
:param nodes: nodes to commit transaction, defaults to None
|
||||||
|
:type nodes: Optional[list], optional
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
commit_transaction(
|
commit_transaction(
|
||||||
self.txn_id, cs_config_filename=DEFAULT_MCS_CONF_PATH
|
self.txn_id, cs_config_filename=DEFAULT_MCS_CONF_PATH,
|
||||||
|
nodes=nodes
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
logging.error(f'Error while committing transaction {self.txn_id}')
|
logging.error(f'Error while committing transaction {self.txn_id}')
|
||||||
self.rollback_transaction()
|
self.rollback_transaction(nodes=self.success_txn_nodes)
|
||||||
self.set_default_signals()
|
self.set_default_signals()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
@ -107,9 +134,11 @@ class TransactionManager(ContextDecorator):
|
|||||||
if self.handle_signals:
|
if self.handle_signals:
|
||||||
self.set_custom_signals()
|
self.set_custom_signals()
|
||||||
try:
|
try:
|
||||||
suceeded, _transaction_id, successes = start_transaction(
|
suceeded, _, success_txn_nodes = start_transaction(
|
||||||
cs_config_filename=DEFAULT_MCS_CONF_PATH,
|
cs_config_filename=DEFAULT_MCS_CONF_PATH,
|
||||||
txn_id=self.txn_id, timeout=self.timeout
|
extra_nodes=self.extra_nodes, remove_nodes=self.remove_nodes,
|
||||||
|
optional_nodes=self.optional_nodes,
|
||||||
|
txn_id=self.txn_id, timeout=self.timeout,
|
||||||
)
|
)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logging.error('Error while starting the transaction.')
|
logging.error('Error while starting the transaction.')
|
||||||
@ -118,19 +147,26 @@ class TransactionManager(ContextDecorator):
|
|||||||
self._handle_exception(
|
self._handle_exception(
|
||||||
exc=CMAPIBasicError('Starting transaction isn\'t succesful.')
|
exc=CMAPIBasicError('Starting transaction isn\'t succesful.')
|
||||||
)
|
)
|
||||||
if suceeded and len(successes) == 0:
|
if suceeded and len(success_txn_nodes) == 0:
|
||||||
self._handle_exception(
|
# corner case when deleting last node in the cluster
|
||||||
exc=CMAPIBasicError('There are no nodes in the cluster.')
|
# TODO: remove node mechanics potentially has a vulnerability
|
||||||
)
|
# because no transaction started for removing node.
|
||||||
|
# Probably in some cases rollback never works for removing
|
||||||
|
# node, because it never exist in success_txn_nodes.
|
||||||
|
if not self.remove_nodes:
|
||||||
|
self._handle_exception(
|
||||||
|
exc=CMAPIBasicError('There are no nodes in the cluster.')
|
||||||
|
)
|
||||||
self.active_transaction = True
|
self.active_transaction = True
|
||||||
|
self.success_txn_nodes = success_txn_nodes
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def __exit__(self, *exc):
|
def __exit__(self, *exc):
|
||||||
if exc[0] and self.active_transaction:
|
if exc[0] and self.active_transaction:
|
||||||
self.rollback_transaction()
|
self.rollback_transaction(nodes=self.success_txn_nodes)
|
||||||
self.set_default_signals()
|
self.set_default_signals()
|
||||||
return False
|
return False
|
||||||
if self.active_transaction:
|
if self.active_transaction:
|
||||||
self.commit_transaction()
|
self.commit_transaction(nodes=self.success_txn_nodes)
|
||||||
self.set_default_signals()
|
self.set_default_signals()
|
||||||
return True
|
return True
|
||||||
|
@ -61,7 +61,7 @@ def switch_node_maintenance(
|
|||||||
def add_node(
|
def add_node(
|
||||||
node: str, input_config_filename: str = DEFAULT_MCS_CONF_PATH,
|
node: str, input_config_filename: str = DEFAULT_MCS_CONF_PATH,
|
||||||
output_config_filename: Optional[str] = None,
|
output_config_filename: Optional[str] = None,
|
||||||
rebalance_dbroots: bool = True
|
use_rebalance_dbroots: bool = True
|
||||||
):
|
):
|
||||||
"""Add node to a cluster.
|
"""Add node to a cluster.
|
||||||
|
|
||||||
@ -86,8 +86,8 @@ def add_node(
|
|||||||
:type input_config_filename: str, optional
|
:type input_config_filename: str, optional
|
||||||
:param output_config_filename: mcs output config path, defaults to None
|
:param output_config_filename: mcs output config path, defaults to None
|
||||||
:type output_config_filename: Optional[str], optional
|
:type output_config_filename: Optional[str], optional
|
||||||
:param rebalance_dbroots: rebalance dbroots or not, defaults to True
|
:param use_rebalance_dbroots: rebalance dbroots or not, defaults to True
|
||||||
:type rebalance_dbroots: bool, optional
|
:type use_rebalance_dbroots: bool, optional
|
||||||
"""
|
"""
|
||||||
node_config = NodeConfig()
|
node_config = NodeConfig()
|
||||||
c_root = node_config.get_current_config_root(input_config_filename)
|
c_root = node_config.get_current_config_root(input_config_filename)
|
||||||
@ -100,7 +100,7 @@ def add_node(
|
|||||||
_add_Module_entries(c_root, node)
|
_add_Module_entries(c_root, node)
|
||||||
_add_active_node(c_root, node)
|
_add_active_node(c_root, node)
|
||||||
_add_node_to_ExeMgrs(c_root, node)
|
_add_node_to_ExeMgrs(c_root, node)
|
||||||
if rebalance_dbroots:
|
if use_rebalance_dbroots:
|
||||||
_rebalance_dbroots(c_root)
|
_rebalance_dbroots(c_root)
|
||||||
_move_primary_node(c_root)
|
_move_primary_node(c_root)
|
||||||
except Exception:
|
except Exception:
|
||||||
@ -116,25 +116,41 @@ def add_node(
|
|||||||
node_config.write_config(c_root, filename=output_config_filename)
|
node_config.write_config(c_root, filename=output_config_filename)
|
||||||
|
|
||||||
|
|
||||||
# deactivate_only is a bool that indicates whether the node is being removed completely from
|
|
||||||
# the cluster, or whether it has gone offline and should still be monitored in case it comes back.
|
|
||||||
# Note! this does not pick a new primary node, use the move_primary_node() fcn to change that.
|
|
||||||
def remove_node(
|
def remove_node(
|
||||||
node, input_config_filename=DEFAULT_MCS_CONF_PATH,
|
node: str, input_config_filename: str = DEFAULT_MCS_CONF_PATH,
|
||||||
output_config_filename=None, deactivate_only=False,
|
output_config_filename: Optional[str] = None,
|
||||||
rebalance_dbroots = True, **kwargs
|
deactivate_only: bool = True,
|
||||||
|
use_rebalance_dbroots: bool = True, **kwargs
|
||||||
):
|
):
|
||||||
|
"""Remove node from a cluster.
|
||||||
|
|
||||||
|
- Rebuild the PMS section w/o node
|
||||||
|
- Remove the DBRM_Worker entry
|
||||||
|
- Remove the WES entry
|
||||||
|
- Rebuild the "Module*" entries w/o node
|
||||||
|
- Update the list of active / inactive / desired nodes
|
||||||
|
|
||||||
|
:param node: node address or hostname
|
||||||
|
:type node: str
|
||||||
|
:param input_config_filename: mcs input config path,
|
||||||
|
defaults to DEFAULT_MCS_CONF_PATH
|
||||||
|
:type input_config_filename: str, optional
|
||||||
|
:param output_config_filename: mcs output config path, defaults to None
|
||||||
|
:type output_config_filename: Optional[str], optional
|
||||||
|
:param deactivate_only: indicates whether the node is being removed
|
||||||
|
completely from the cluster, or whether it has gone
|
||||||
|
offline and should still be monitored in case it
|
||||||
|
comes back.
|
||||||
|
Note! this does not pick a new primary node,
|
||||||
|
use the move_primary_node() fcn to change that.,
|
||||||
|
defaults to True
|
||||||
|
:type deactivate_only: bool, optional
|
||||||
|
:param use_rebalance_dbroots: rebalance dbroots or not, defaults to True
|
||||||
|
:type use_rebalance_dbroots: bool, optional
|
||||||
|
"""
|
||||||
node_config = NodeConfig()
|
node_config = NodeConfig()
|
||||||
c_root = node_config.get_current_config_root(input_config_filename)
|
c_root = node_config.get_current_config_root(input_config_filename)
|
||||||
|
|
||||||
'''
|
|
||||||
Rebuild the PMS section w/o node
|
|
||||||
Remove the DBRM_Worker entry
|
|
||||||
Remove the WES entry
|
|
||||||
Rebuild the "Module*" entries w/o node
|
|
||||||
Update the list of active / inactive / desired nodes
|
|
||||||
'''
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
active_nodes = helpers.get_active_nodes(input_config_filename)
|
active_nodes = helpers.get_active_nodes(input_config_filename)
|
||||||
|
|
||||||
@ -151,7 +167,7 @@ def remove_node(
|
|||||||
# TODO: unspecific name, need to think of a better one
|
# TODO: unspecific name, need to think of a better one
|
||||||
_remove_node(c_root, node)
|
_remove_node(c_root, node)
|
||||||
|
|
||||||
if rebalance_dbroots:
|
if use_rebalance_dbroots:
|
||||||
_rebalance_dbroots(c_root)
|
_rebalance_dbroots(c_root)
|
||||||
_move_primary_node(c_root)
|
_move_primary_node(c_root)
|
||||||
else:
|
else:
|
||||||
|
@ -7,11 +7,11 @@ from datetime import datetime
|
|||||||
from cmapi_server.controllers.dispatcher import _version
|
from cmapi_server.controllers.dispatcher import _version
|
||||||
|
|
||||||
config_filename = './cmapi_server/cmapi_server.conf'
|
config_filename = './cmapi_server/cmapi_server.conf'
|
||||||
|
|
||||||
url = f"https://localhost:8640/cmapi/{_version}/node/config"
|
url = f"https://localhost:8640/cmapi/{_version}/node/config"
|
||||||
begin_url = f"https://localhost:8640/cmapi/{_version}/node/begin"
|
begin_url = f"https://localhost:8640/cmapi/{_version}/node/begin"
|
||||||
config_path = './cmapi_server/test/Columnstore_apply_config.xml'
|
config_path = './cmapi_server/test/Columnstore_apply_config.xml'
|
||||||
|
|
||||||
# create tmp dir
|
# create tmp dir
|
||||||
tmp_prefix = '/tmp/mcs_config_test'
|
tmp_prefix = '/tmp/mcs_config_test'
|
||||||
tmp_path = Path(tmp_prefix)
|
tmp_path = Path(tmp_prefix)
|
||||||
@ -43,8 +43,3 @@ body = {
|
|||||||
'timeout': 0,
|
'timeout': 0,
|
||||||
'config': config,
|
'config': config,
|
||||||
}
|
}
|
||||||
|
|
||||||
#print(config)
|
|
||||||
|
|
||||||
#r = requests.put(url, verify=False, headers=headers, json=body)
|
|
||||||
|
|
||||||
|
@ -22,7 +22,7 @@ from cmapi_server.helpers import (
|
|||||||
from cmapi_server.managers.transaction import TransactionManager
|
from cmapi_server.managers.transaction import TransactionManager
|
||||||
from mcs_cluster_tool.decorators import handle_output
|
from mcs_cluster_tool.decorators import handle_output
|
||||||
from mcs_node_control.models.node_config import NodeConfig
|
from mcs_node_control.models.node_config import NodeConfig
|
||||||
from cmapi.cmapi_server.controllers.api_clients import ClusterControllerClient
|
from cmapi_server.controllers.api_clients import ClusterControllerClient
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger('mcs_cli')
|
logger = logging.getLogger('mcs_cli')
|
||||||
@ -191,9 +191,6 @@ def restart():
|
|||||||
|
|
||||||
@node_app.command(rich_help_panel='cluster node commands')
|
@node_app.command(rich_help_panel='cluster node commands')
|
||||||
@handle_output
|
@handle_output
|
||||||
@TransactionManager(
|
|
||||||
timeout=timedelta(days=1).total_seconds(), handle_signals=True
|
|
||||||
)
|
|
||||||
def add(
|
def add(
|
||||||
nodes: Optional[List[str]] = typer.Option(
|
nodes: Optional[List[str]] = typer.Option(
|
||||||
...,
|
...,
|
||||||
@ -206,8 +203,12 @@ def add(
|
|||||||
):
|
):
|
||||||
"""Add nodes to the Columnstore cluster."""
|
"""Add nodes to the Columnstore cluster."""
|
||||||
result = []
|
result = []
|
||||||
for node in nodes:
|
with TransactionManager(
|
||||||
result.append(client.add_node({'node': node}))
|
timeout=timedelta(days=1).total_seconds(), handle_signals=True,
|
||||||
|
extra_nodes=nodes
|
||||||
|
):
|
||||||
|
for node in nodes:
|
||||||
|
result.append(client.add_node({'node': node}))
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
@ -224,8 +225,12 @@ def remove(nodes: Optional[List[str]] = typer.Option(
|
|||||||
):
|
):
|
||||||
"""Remove nodes from the Columnstore cluster."""
|
"""Remove nodes from the Columnstore cluster."""
|
||||||
result = []
|
result = []
|
||||||
for node in nodes:
|
with TransactionManager(
|
||||||
result.append(client.remove_node(node))
|
timeout=timedelta(days=1).total_seconds(), handle_signals=True,
|
||||||
|
remove_nodes=nodes
|
||||||
|
):
|
||||||
|
for node in nodes:
|
||||||
|
result.append(client.remove_node(node))
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
@ -265,6 +270,7 @@ def api_key(key: str = typer.Option(..., help='API key to set.')):
|
|||||||
return client.set_api_key(key)
|
return client.set_api_key(key)
|
||||||
|
|
||||||
|
|
||||||
|
#TODO: remove in next releases
|
||||||
@set_app.command()
|
@set_app.command()
|
||||||
@handle_output
|
@handle_output
|
||||||
def log_level(level: str = typer.Option(..., help='Logging level to set.')):
|
def log_level(level: str = typer.Option(..., help='Logging level to set.')):
|
||||||
|
Reference in New Issue
Block a user