mariadb-columnstore-engine/cmapi/cmapi_server/handlers/cluster.py
AlanMologorsky c86586c228 feat(cmapi,failover): MCOL-6006 Disable failover when shared storage not detected
- Add SharedStorageMonitor thread to periodically verify shared storage:
  * Writes a temp file to the shared location and validates MD5 from all nodes.
  * Skips nodes with unstable recent heartbeats; retries once; defers decision if any node is unreachable.
  * Updates a cluster-wide stateful flag (shared_storage_on) only on conclusive checks.
- New CMAPI endpoints:
  * PUT /cmapi/{ver}/cluster/check-shared-storage — orchestrates cross-node checks.
  * GET /cmapi/{ver}/node/check-shared-file — validates a given file’s MD5 on a node.
  * PUT /cmapi/{ver}/node/stateful-config — fast path to distribute stateful config updates.
- Introduce in-memory stateful config (AppStatefulConfig) with versioned flags (term/seq) and shared_storage_on flag:
  * Broadcast via helpers.broadcast_stateful_config and enhanced broadcast_new_config.
  * Config PUT is now validated with Pydantic models; supports stateful-only updates and set_mode requests.
- Failover behavior:
  * NodeMonitor keeps failover inactive when shared_storage_on is false or cluster size < 3.
  * Rebalancing DBRoots becomes a no-op when shared storage is OFF (safety guard).
- mcl status improvements: per-node 'state' (online/offline), better timeouts and error reporting.
- Routing/wiring: add dispatcher routes for new endpoints; add ClusterModeEnum.
- Tests: cover shared-storage monitor (unreachable nodes, HB-based skipping), node manipulation with shared storage ON/OFF, and server/config flows.
- Dependencies: add pydantic; minor cleanups and logging.
2025-10-01 21:10:34 +04:00
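
As a quick illustration of the new API surface, here is a minimal sketch of invoking the cluster-level check over HTTP. The port (8640), the x-api-key header, and verify=False mirror the conventions used in the module below; the host, version string, and key are hypothetical placeholders.

import requests

CMAPI_HOST = 'mcs-node1'  # hypothetical node address
API_KEY = 'somekey'       # hypothetical API key from the CMAPI config
VERSION = '0.4.0'         # hypothetical CMAPI version string

# PUT /cmapi/{ver}/cluster/check-shared-storage orchestrates the
# cross-node MD5 check implemented in ClusterHandler.check_shared_storage.
resp = requests.put(
    f'https://{CMAPI_HOST}:8640/cmapi/{VERSION}/cluster/check-shared-storage',
    headers={'x-api-key': API_KEY},
    verify=False,  # CMAPI nodes use self-signed certificates
    timeout=30,
)
resp.raise_for_status()
print(resp.json())  # e.g. {'shared_storage': True, 'partially_failed': False, ...}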


"""Module contains Cluster business logic functions."""
import configparser
import hashlib
import logging
import os
import tempfile
import time
from datetime import datetime
from enum import Enum
from typing import Optional
import requests
from mcs_node_control.models.misc import get_dbrm_master
from mcs_node_control.models.node_config import NodeConfig
from tracing.traced_session import get_traced_session
from cmapi_server.constants import (
CMAPI_CONF_PATH, CMAPI_PORT, DEFAULT_MCS_CONF_PATH, REQUEST_TIMEOUT,
)
from cmapi_server.exceptions import CMAPIBasicError, exc_to_cmapi_error
from cmapi_server.controllers.api_clients import NodeControllerClient
from cmapi_server.helpers import (
broadcast_new_config,
get_active_nodes,
get_config_parser,
get_current_key,
get_dbroots,
get_version,
update_revision_and_manager,
)
from cmapi_server.node_manipulation import (
add_dbroot,
add_node,
remove_node,
switch_node_maintenance,
update_dbroots_of_read_replicas,
)


class ClusterAction(Enum):
    START = 'start'
    STOP = 'stop'


def toggle_cluster_state(action: ClusterAction, config: str) -> None:
    """Toggle the state of the cluster (start or stop).

    :param action: the cluster action to perform
        (ClusterAction.START or ClusterAction.STOP)
    :type action: ClusterAction
    :param config: path to the MariaDB ColumnStore configuration file
    :type config: str
    """
    if action == ClusterAction.START:
        maintenance_flag = False
    elif action == ClusterAction.STOP:
        maintenance_flag = True
    else:
        raise ValueError(
            'Invalid action. Use ClusterAction.START or ClusterAction.STOP.'
        )
    switch_node_maintenance(maintenance_flag)
    update_revision_and_manager()
    broadcast_new_config(config, distribute_secrets=True)


class ClusterHandler:
    """Class for handling MCS Cluster operations."""

    @staticmethod
    def status(config: str = DEFAULT_MCS_CONF_PATH) -> dict:
        """Method to get MCS Cluster status information.

        :param config: columnstore xml config file path,
            defaults to DEFAULT_MCS_CONF_PATH
        :type config: str, optional
        :raises CMAPIBasicError: if an exception is caught while getting
            status from a node
        :return: status result
        :rtype: dict
        """
        logger: logging.Logger = logging.getLogger('cmapi_server')
        logger.debug('Cluster status command called. Getting status.')
        response = {'timestamp': str(datetime.now())}
        active_nodes = get_active_nodes(config)
        cmapi_cfg_parser = get_config_parser(CMAPI_CONF_PATH)
        api_key = get_current_key(cmapi_cfg_parser)
        headers = {'x-api-key': api_key}
        num_nodes = 0
        for node in active_nodes:
            url = f'https://{node}:8640/cmapi/{get_version()}/node/status'
            try:
                r = get_traced_session().request(
                    'GET', url, verify=False, headers=headers,
                    timeout=REQUEST_TIMEOUT
                )
                r.raise_for_status()
                r_json = r.json()
                if len(r_json.get('services', [])) == 0:
                    r_json['dbrm_mode'] = 'offline'
                    r_json['cluster_mode'] = 'offline'
                # add node 'state' field: 'online' if services are running
                # and the node is not in offline mode
                services = r_json.get('services', [])
                node_state = (
                    'offline'
                    if not services or r_json.get('cluster_mode') == 'offline'
                    else 'online'
                )
                r_json['state'] = node_state
                response[f'{str(node)}'] = r_json
                num_nodes += 1
            except (requests.exceptions.RequestException, ValueError) as err:
                # Do not fail the whole request: record the node as
                # unreachable instead
                logger.error(
                    'Error retrieving status from node %s: %s', node, str(err)
                )
                try:
                    node_dbroots = sorted(get_dbroots(node, config))
                except Exception as e:
                    logger.warning(
                        'ClusterHandler.status: failed to obtain dbroots '
                        'for node %s: %s. Using empty list.', node, e
                    )
                    node_dbroots = []
                response[str(node)] = {
                    'timestamp': str(datetime.now()),
                    'uptime': None,
                    'dbrm_mode': 'offline',
                    'cluster_mode': 'offline',
                    'dbroots': node_dbroots,
                    'module_id': 0,
                    'services': [],
                    'state': 'offline',
                    'error': f'Unreachable: {err.__class__.__name__}'
                }
        # num_nodes stays as the number of reachable nodes
        response['num_nodes'] = num_nodes
        logger.debug('Successfully finished getting cluster status.')
        return response

    @staticmethod
    def start(config: str = DEFAULT_MCS_CONF_PATH) -> dict:
        """Method to start MCS Cluster.

        :param config: columnstore xml config file path,
            defaults to DEFAULT_MCS_CONF_PATH
        :type config: str, optional
        :raises CMAPIBasicError: if there are no nodes in the cluster
        :return: start timestamp
        :rtype: dict
        """
        logger: logging.Logger = logging.getLogger('cmapi_server')
        logger.info('Cluster start command called. Starting the cluster.')
        operation_start_time = str(datetime.now())
        toggle_cluster_state(ClusterAction.START, config)
        logger.info('Successfully finished cluster start.')
        return {'timestamp': operation_start_time}

    @staticmethod
    def shutdown(
        config: str = DEFAULT_MCS_CONF_PATH, timeout: Optional[int] = None
    ) -> dict:
        """Method to stop the MCS Cluster.

        :param config: columnstore xml config file path,
            defaults to DEFAULT_MCS_CONF_PATH
        :type config: str, optional
        :param timeout: timeout in seconds to gracefully stop DMLProc,
            defaults to None
        :type timeout: Optional[int], optional
        :raises CMAPIBasicError: if there are no nodes in the cluster
        :return: shutdown timestamp
        :rtype: dict
        """
        logger: logging.Logger = logging.getLogger('cmapi_server')
        logger.debug(
            'Cluster shutdown command called. Shutting down the cluster.'
        )
        operation_start_time = str(datetime.now())
        toggle_cluster_state(ClusterAction.STOP, config)
        logger.debug('Successfully finished shutting down the cluster.')
        return {'timestamp': operation_start_time}

    @staticmethod
    def add_node(
        node: str, config: str = DEFAULT_MCS_CONF_PATH,
        read_replica: bool = False,
    ) -> dict:
        """Method to add a node to the MCS Cluster.

        :param node: node IP, name or FQDN
        :type node: str
        :param config: columnstore xml config file path,
            defaults to DEFAULT_MCS_CONF_PATH
        :type config: str, optional
        :param read_replica: add node as read replica, defaults to False
        :type read_replica: bool, optional
        :raises CMAPIBasicError: on exception while starting transaction
        :raises CMAPIBasicError: if transaction start isn't successful
        :raises CMAPIBasicError: on exception while adding node
        :raises CMAPIBasicError: on exception while distributing new config
        :raises CMAPIBasicError: on unsuccessful distribution of the config
            file
        :raises CMAPIBasicError: on exception while committing transaction
        :return: result of adding node
        :rtype: dict
        """
        logger: logging.Logger = logging.getLogger('cmapi_server')
        logger.debug(
            f'Cluster add node command called. Adding node {node} in '
            f'{"read-replica" if read_replica else "read-write"} mode.'
        )
        response = {'timestamp': str(datetime.now())}
        with exc_to_cmapi_error(prefix='Error while adding node'):
            add_node(
                node, input_config_filename=config,
                output_config_filename=config,
                read_replica=read_replica,
            )
            if not get_dbroots(node, config):
                if not read_replica:  # Read replicas don't own dbroots
                    add_dbroot(
                        host=node, input_config_filename=config,
                        output_config_filename=config
                    )
            response['node_id'] = node
            update_revision_and_manager(
                input_config_filename=config, output_config_filename=config
            )
            broadcast_new_config(config, distribute_secrets=True)
            ClusterHandler.check_shared_storage()
        logger.debug(f'Successfully finished adding node {node}.')
        return response

    @staticmethod
    def remove_node(node: str, config: str = DEFAULT_MCS_CONF_PATH) -> dict:
        """Method to remove a node from the MCS Cluster.

        :param node: node IP, name or FQDN
        :type node: str
        :param config: columnstore xml config file path,
            defaults to DEFAULT_MCS_CONF_PATH
        :type config: str, optional
        :raises CMAPIBasicError: on exception while starting transaction
        :raises CMAPIBasicError: if transaction start isn't successful
        :raises CMAPIBasicError: on exception while removing node
        :raises CMAPIBasicError: on exception while distributing new config
        :raises CMAPIBasicError: on unsuccessful distribution of the config
            file
        :raises CMAPIBasicError: on exception while committing transaction
        :return: result of node removal
        :rtype: dict
        """
        # TODO: This method will be moved to the transaction manager in the
        #       next release, due to the specific use of txn_nodes inside.
        logger: logging.Logger = logging.getLogger('cmapi_server')
        logger.debug(
            f'Cluster remove node command called. Removing node {node}.'
        )
        response = {'timestamp': str(datetime.now())}
        with exc_to_cmapi_error(prefix='Error while removing node'):
            remove_node(
                node, input_config_filename=config,
                output_config_filename=config
            )
            response['node_id'] = node
            active_nodes = get_active_nodes(config)
            if len(active_nodes) > 0:
                with NodeConfig().modify_config(config) as root:
                    update_dbroots_of_read_replicas(root)
                update_revision_and_manager(
                    input_config_filename=config,
                    output_config_filename=config
                )
                broadcast_new_config(config, nodes=active_nodes)
            ClusterHandler.check_shared_storage()
        logger.debug(f'Successfully finished removing node {node}.')
        return response

    @staticmethod
    def set_mode(
        mode: str, timeout: int = 60, config: str = DEFAULT_MCS_CONF_PATH,
    ) -> dict:
        """Method to set MCS Cluster mode.

        :param mode: cluster mode to set, can only be "readonly" or
            "readwrite"
        :type mode: str
        :param config: columnstore xml config file path,
            defaults to DEFAULT_MCS_CONF_PATH
        :type config: str, optional
        :raises CMAPIBasicError: if no master found in the cluster
        :raises CMAPIBasicError: on exception while starting transaction
        :raises CMAPIBasicError: if transaction start isn't successful
        :raises CMAPIBasicError: on exception while setting cluster mode
        :raises CMAPIBasicError: on exception while distributing new config
        :raises CMAPIBasicError: on unsuccessful distribution of the config
            file
        :raises CMAPIBasicError: on exception while committing transaction
        :return: result of setting cluster mode
        :rtype: dict
        """
        logger: logging.Logger = logging.getLogger('cmapi_server')
        logger.debug(
            f'Cluster mode set command called. Setting mode to {mode}.'
        )
        response = {'timestamp': str(datetime.now())}
        cmapi_cfg_parser = get_config_parser(CMAPI_CONF_PATH)
        api_key = get_current_key(cmapi_cfg_parser)
        headers = {'x-api-key': api_key}
        master = None
        if len(get_active_nodes(config)) != 0:
            master = get_dbrm_master(config)
        if master is None:
            raise CMAPIBasicError('No master found in the cluster.')
        master = master['IPAddr']
        payload: dict = {}
        payload['cluster_mode'] = mode
        url = f'https://{master}:8640/cmapi/{get_version()}/node/config'
        nc = NodeConfig()
        root = nc.get_current_config_root(config_filename=config)
        payload['manager'] = root.find('./ClusterManager').text
        payload['revision'] = root.find('./ConfigRevision').text
        payload['timeout'] = timeout
        try:
            r = get_traced_session().request(
                'PUT', url, headers=headers, json=payload, verify=False
            )
            r.raise_for_status()
            response['cluster-mode'] = mode
        except Exception as err:
            raise CMAPIBasicError(
                f'Error while setting cluster mode to {mode}'
            ) from err
        logger.debug(f'Successfully set cluster mode to {mode}.')
        return response

    @staticmethod
    def set_api_key(
        api_key: str, verification_key: str,
        config: str = DEFAULT_MCS_CONF_PATH,
    ) -> dict:
        """Method to set API key for each CMAPI node in cluster.

        :param api_key: API key to set
        :type api_key: str
        :param verification_key: TOTP key to verify
        :type verification_key: str
        :param config: columnstore xml config file path,
            defaults to DEFAULT_MCS_CONF_PATH
        :type config: str, optional
        :raises CMAPIBasicError: if an exception is caught while setting the
            API key on a node
        :return: status result
        :rtype: dict
        """
        logger: logging.Logger = logging.getLogger('cmapi_server')
        logger.debug('Cluster set API key command called.')
        active_nodes = get_active_nodes(config)
        body = {
            'api_key': api_key,
            'verification_key': verification_key
        }
        response = {}
        # only used to decide the shape of the response object below
        active_nodes_count = len(active_nodes)
        if not active_nodes:
            # set the API key in the configuration file on this node only
            logger.debug(
                'No active nodes found, set API key into current CMAPI conf.'
            )
            active_nodes.append('localhost')
        for node in active_nodes:
            logger.debug(f'Setting new api key to "{node}".')
            url = f'https://{node}:8640/cmapi/{get_version()}/node/apikey-set'
            try:
                resp = get_traced_session().request(
                    'PUT', url, verify=False, json=body, headers={}
                )
                resp.raise_for_status()
                r_json = resp.json()
                if active_nodes_count > 0:
                    response[str(node)] = r_json
            except Exception as err:
                raise CMAPIBasicError(
                    f'Got an error setting API key to "{node}".'
                ) from err
            logger.debug(f'Successfully set new api key to "{node}".')
        response['timestamp'] = str(datetime.now())
        logger.debug(
            'Successfully finished setting new API key to all nodes.'
        )
        return response

    @staticmethod
    def set_log_level(
        level: str, config: str = DEFAULT_MCS_CONF_PATH,
        logger: logging.Logger = logging.getLogger('cmapi_server')
    ) -> dict:
        """Method to set level for loggers on each CMAPI node in cluster.

        :param level: logging level, including custom
        :type level: str
        :param config: columnstore xml config file path,
            defaults to DEFAULT_MCS_CONF_PATH
        :type config: str, optional
        :param logger: logger, defaults to logging.getLogger('cmapi_server')
        :type logger: logging.Logger, optional
        :return: status result
        :rtype: dict
        """
        logger.debug('Cluster set new logging level called.')
        active_nodes = get_active_nodes(config)
        body = {'level': level}
        response = {}
        # only used to decide the shape of the response object below
        active_nodes_count = len(active_nodes)
        if not active_nodes:
            # set the log level on this node only
            logger.debug(
                'No active nodes found, set log level only for current node.'
            )
            active_nodes.append('localhost')
        for node in active_nodes:
            logger.debug(f'Setting new log level to "{node}".')
            url = f'https://{node}:8640/cmapi/{get_version()}/node/log-level'
            try:
                resp = get_traced_session().request(
                    'PUT', url, verify=False, json=body, headers={}
                )
                resp.raise_for_status()
                r_json = resp.json()
                if active_nodes_count > 0:
                    response[str(node)] = r_json
            except Exception as err:
                raise CMAPIBasicError(
                    f'Got an error setting log level to "{node}".'
                ) from err
            logger.debug(f'Successfully set new log level to "{node}".')
        response['timestamp'] = str(datetime.now())
        logger.debug(
            'Successfully finished setting new log level to all nodes.'
        )
        return response

    @staticmethod
    def check_shared_storage(skip_nodes: Optional[list[str]] = None) -> dict:
        """Check that all active nodes see the same shared storage.

        Writes a temporary file to the shared location and asks every
        active node to validate its MD5 checksum.

        :param skip_nodes: nodes to exclude from the check (e.g. nodes with
            unstable recent heartbeats), defaults to None
        :type skip_nodes: Optional[list[str]], optional
        :return: status result
        :rtype: dict
        """
        tmp_file_path: str
        active_nodes = get_active_nodes()
        if skip_nodes:
            # Remove any nodes the caller asked us to skip
            active_nodes = [
                n for n in active_nodes if n not in set(skip_nodes)
            ]
        all_responses: dict = dict()
        nodes_errors: dict = dict()
        sm_parser = configparser.ConfigParser()
        sm_config_str = NodeConfig().get_current_sm_config()
        sm_parser.read_string(sm_config_str)
        storage_type = sm_parser.get(
            'ObjectStorage', 'service', fallback='LocalStorage'
        )
        file_dir = '/var/lib/columnstore/data1'
        if storage_type.lower() == 's3':
            # With S3 the data itself lives in object storage; the shared
            # piece on disk is the StorageManager metadata directory.
            file_dir = '/var/lib/columnstore/storagemanager/metadata/data1'
        with tempfile.NamedTemporaryFile(
            mode='wb+', delete=True, dir=file_dir, prefix='mcs_test_shared'
        ) as temp_file:
            file_data = b'File to check shared storage working.'
            temp_file.write(file_data)
            # Make sure data is on disk/visible to other nodes before checks
            temp_file.flush()
            os.fsync(temp_file.fileno())
            tmp_file_path = temp_file.name
            logging.debug(
                f'Temporary file to check shared storage created at: '
                f'{tmp_file_path}'
            )
            tmp_file_md5 = hashlib.md5(file_data).hexdigest()
            logging.debug(f'Temporary file md5: {tmp_file_md5}')
            for node in active_nodes:
                logging.debug(f'Checking shared file on {node!r}.')
                client = NodeControllerClient(
                    request_timeout=REQUEST_TIMEOUT,
                    base_url=f'https://{node}:{CMAPI_PORT}'
                )
                last_err_msg = None
                for attempt in range(2):
                    try:
                        node_response = client.check_shared_file(
                            file_path=tmp_file_path, check_sum=tmp_file_md5
                        )
                        logging.debug(f'Finished checking file on {node!r}')
                        all_responses[node] = node_response
                        break
                    except CMAPIBasicError as err:
                        # A per-node failure must not abort the whole check
                        last_err_msg = err.message
                        if attempt == 0:
                            time.sleep(1)
                            continue
                        # Retries exhausted
                        logging.warning(
                            f'Error checking shared file on {node!r}: '
                            f'{last_err_msg}',
                            exc_info=True
                        )
                        nodes_errors[node] = last_err_msg or 'unknown error'
        nodes_success_responses = [
            v.get('success', False) for v in all_responses.values()
        ]
        if nodes_success_responses:
            shared_storage = all(nodes_success_responses)
        else:
            # no nodes in the cluster
            shared_storage = False
        # Consider the check partially failed either when not all reachable
        # nodes succeeded, or when some nodes were unreachable
        # (nodes_errors present).
        partially_failed = False
        if len(active_nodes) > 2:
            if nodes_errors:
                partially_failed = True
            elif nodes_success_responses and not all(nodes_success_responses):
                partially_failed = True
        response = {
            'timestamp': str(datetime.now()),
            'shared_storage': shared_storage,
            'partially_failed': partially_failed,
            'active_nodes_count': len(active_nodes),
            'nodes_responses': {**all_responses},
            'nodes_errors': {**nodes_errors}
        }
        logging.debug(
            'Successfully finished checking shared storage on all nodes.'
        )
        return response
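
For reference, a response from check_shared_storage on a healthy three-node cluster might look like the following (illustrative values only; the keys mirror the response dict built above):

# Illustrative only: node addresses and values are made up,
# keys match the handler above.
{
    'timestamp': '2025-10-01 21:10:34.000000',
    'shared_storage': True,
    'partially_failed': False,
    'active_nodes_count': 3,
    'nodes_responses': {
        '10.0.0.1': {'success': True},
        '10.0.0.2': {'success': True},
        '10.0.0.3': {'success': True},
    },
    'nodes_errors': {},
}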