mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-10-31 18:30:33 +03:00
mariadb-columnstore-engine/cmapi/failover/shared_storage_monitor.py
mariadb-AlanMologorsky c86586c228 feat(cmapi,failover): MCOL-6006 Disable failover when shared storage not detected
- Add SharedStorageMonitor thread to periodically verify shared storage:
  * Writes a temp file to the shared location and validates MD5 from all nodes.
  * Skips nodes with unstable recent heartbeats; retries once; defers decision if any node is unreachable.
  * Updates a cluster-wide stateful flag (shared_storage_on) only on conclusive checks.
- New CMAPI endpoints:
  * PUT /cmapi/{ver}/cluster/check-shared-storage — orchestrates cross-node checks.
  * GET /cmapi/{ver}/node/check-shared-file — validates a given file’s MD5 on a node.
  * PUT /cmapi/{ver}/node/stateful-config — fast path to distribute stateful config updates.
- Introduce in-memory stateful config (AppStatefulConfig) with versioned flags (term/seq) and shared_storage_on flag:
  * Broadcast via helpers.broadcast_stateful_config and enhanced broadcast_new_config.
  * Config PUT is now validated with Pydantic models; supports stateful-only updates and set_mode requests.
- Failover behavior:
  * NodeMonitor keeps failover inactive when shared_storage_on is false or cluster size < 3.
  * Rebalancing DBRoots becomes a no-op when shared storage is OFF (safety guard).
- mcl status improvements: per-node 'state' (online/offline), better timeouts and error reporting.
- Routing/wiring: add dispatcher routes for new endpoints; add ClusterModeEnum.
- Tests: cover shared-storage monitor (unreachable nodes, HB-based skipping), node manipulation with shared storage ON/OFF, and server/config flows.
- Dependencies: add pydantic; minor cleanups and logging.
2025-10-01 21:10:34 +04:00
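The MD5 round-trip described in the first bullet boils down to: write a temporary file to the shared location, ask every node for the MD5 of that same path, and treat matching digests on all nodes as evidence that they see one filesystem. The snippet below is only a minimal sketch of that idea, not the CMAPI code; md5_of_file, probe_shared_storage and the fetch_remote_md5 callables are hypothetical stand-ins for the real per-node GET /cmapi/{ver}/node/check-shared-file handling.

import hashlib
import uuid
from pathlib import Path
from typing import Callable, Dict, Optional


def md5_of_file(path: Path) -> str:
    """Return the hex MD5 digest of a file's contents."""
    digest = hashlib.md5()
    with open(path, 'rb') as fh:
        for chunk in iter(lambda: fh.read(65536), b''):
            digest.update(chunk)
    return digest.hexdigest()


def probe_shared_storage(
    shared_dir: Path,
    nodes: Dict[str, Callable[[str], Optional[str]]],
) -> Optional[bool]:
    """Write a temp file into shared_dir and compare the MD5 each node reports.

    `nodes` maps a node name to a callable that returns that node's MD5 for the
    given path, or None if the node could not be reached (a stand-in for the
    per-node check-shared-file request). Returns True/False when the check is
    conclusive, or None when any node was unreachable.
    """
    probe_path = shared_dir / f'.shared_storage_probe_{uuid.uuid4().hex}'
    probe_path.write_bytes(uuid.uuid4().bytes)
    try:
        expected = md5_of_file(probe_path)
        for _name, fetch_remote_md5 in nodes.items():
            remote = fetch_remote_md5(str(probe_path))
            if remote is None:
                return None   # node unreachable: defer the decision
            if remote != expected:
                return False  # node sees different or missing data: not shared
        return True
    finally:
        probe_path.unlink(missing_ok=True)

On a non-shared filesystem a remote node either misses the probe file entirely or reads different content, so a digest mismatch or read failure is enough to conclude the storage is not shared, while an unreachable node leaves the question open.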

203 lines · 8.8 KiB · Python

import logging
import time
import threading
from pathlib import Path
from typing import Optional, List

from cmapi_server.exceptions import CMAPIBasicError
from cmapi_server.constants import REQUEST_TIMEOUT
from cmapi_server.controllers.api_clients import ClusterControllerClient
from cmapi_server.helpers import broadcast_stateful_config, get_active_nodes
from cmapi_server.managers.application import (
    AppStatefulConfig, StatefulConfigModel, StatefulFlagsModel,
)
from cmapi_server.node_manipulation import get_dbroots_paths
from mcs_node_control.models.node_config import NodeConfig

from .heartbeat_history import HBHistory


class SharedStorageMonitor:
    """Background thread that, on the primary node, periodically checks whether
    DBRoots live on storage shared by all cluster nodes and keeps the
    cluster-wide shared_storage_on stateful flag up to date."""

    def __init__(self, check_interval: int = 5, hb_history: Optional[HBHistory] = None):
        self._die = False
        self._logger = logging.getLogger('shared_storage_monitor')
        self._runner = None
        self._node_config = NodeConfig()
        self._cluster_api_client = ClusterControllerClient(request_timeout=REQUEST_TIMEOUT)
        self.check_interval = check_interval
        self.last_check_time = 0
        # Track nodes that were unreachable during the last check to avoid
        # flipping the shared storage flag based on partial visibility.
        self._last_unreachable_nodes: set[str] = set()
        self._hb_history = hb_history

    def __del__(self):
        try:
            if getattr(self, '_runner', None):
                self.stop()
        except Exception:  # pylint: disable=broad-except
            pass

    def start(self):
        """Start the monitor thread unless it is already running."""
        self._die = False
        if self._runner and self._runner.is_alive():
            self._logger.debug('Shared storage monitor already running.')
            return
        self._runner = threading.Thread(target=self.monitor, name='SharedStorageMonitor', daemon=True)
        self._runner.start()

    def stop(self):
        """Signal the monitor loop to exit and wait briefly for the thread to finish."""
        self._die = True
        if self._runner and threading.current_thread() is not self._runner:
            self._runner.join(timeout=self.check_interval + 2)
            if self._runner.is_alive():
                self._logger.warning('Shared storage monitor thread did not exit promptly.')

    def monitor(self):
        """Run checks in a loop until stop() is requested, sleeping check_interval between runs."""
        self._logger.info('Starting shared storage monitor.')
        while not self._die:
            try:
                self._logger.info('Shared storage monitor running check.')
                self._monitor()
            except Exception:  # pylint: disable=broad-except
                self._logger.error('Shared storage monitor caught an exception.', exc_info=True)
            if not self._die:
                self._logger.info(
                    'Shared storage monitor finished check, sleeping '
                    f'{self.check_interval} seconds.'
                )
                time.sleep(self.check_interval)
        self._logger.info('Shared storage monitor exited normally')

    def _retrieve_unstable_nodes(self) -> List[str]:
        """Skip nodes whose latest stable heartbeat sample is NoResponse.

        We only consider the most recent finalized sample (currentTick - lateWindow),
        avoiding partial/in-flight samples. If that value is NoResponse, we
        temporarily exclude the node from the shared-storage check for this cycle.
        """
        if not self._hb_history:
            return []
        try:
            active_nodes = get_active_nodes()
        except Exception:  # pylint: disable=broad-except
            # If we cannot load active nodes, be safe and skip none.
            return []
        unstable_nodes: list[str] = []
        # We only need one stable sample; ask for 1 recent finalized value.
        lookback = 1
        for node in active_nodes:
            # Use GoodResponse as default to avoid over-skipping brand new nodes.
            hist = self._hb_history.getNodeHistory(node, lookback, HBHistory.GoodResponse)
            if not hist:
                continue
            if hist[-1] == HBHistory.NoResponse:
                unstable_nodes.append(node)
        return unstable_nodes
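
    # Decision model for the methods below: _check_shared_storage() returns
    # True/False only when the cross-node check is conclusive and None when it
    # is not (API error, fewer than 2 active nodes, or unreachable nodes);
    # _monitor() treats None as "keep the current shared_storage_on flag".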

    def _check_shared_storage(self) -> Optional[bool]:
        extra_payload = {}
        # Compute skip list based on recent heartbeat instability (if available).
        skip_nodes = self._retrieve_unstable_nodes()
        if skip_nodes:
            self._logger.debug(
                f'Shared storage check will skip unstable nodes (HB drop): {sorted(skip_nodes)}'
            )
            extra_payload['skip_nodes'] = skip_nodes
        try:
            response = self._cluster_api_client.check_shared_storage(extra=extra_payload)
        except CMAPIBasicError as err:
            self._logger.error(f'Error while calling cluster shared storage check: {err.message}')
            return None
        except Exception:  # pylint: disable=broad-except
            self._logger.error(
                'Unexpected error while calling cluster shared storage check.',
                exc_info=True
            )
            return None
        shared_storage_on = response.get('shared_storage', None)
        if shared_storage_on is None:
            self._logger.error(
                'Shared storage check response does not contain "shared_storage" key.'
            )
            shared_storage_on = False
        active_nodes_count = int(response.get('active_nodes_count', 0))
        if active_nodes_count < 2:
            # We cannot reliably detect the shared storage state with fewer than
            # 2 nodes in the cluster, so we do not change the flag in this case.
            self._logger.debug(
                'Less than 2 nodes in cluster, no need to change flag of shared storage.'
            )
            return None
        # If some nodes were unreachable during the check, treat the result as
        # inconclusive and do not update the stateful flag. This avoids flipping
        # shared_storage_on to False when a node is simply down/unreachable.
        nodes_errors = response.get('nodes_errors') or {}
        if nodes_errors:
            self._last_unreachable_nodes = set(nodes_errors.keys())
            self._logger.warning(
                'Shared storage check has unreachable nodes; deferring decision. '
                f'Nodes: {sorted(self._last_unreachable_nodes)}'
            )
            return None
        # No unreachable nodes; clear any previously tracked ones.
        if self._last_unreachable_nodes:
            self._logger.info(
                'Previously unreachable nodes are now reachable; clearing state.'
            )
            self._last_unreachable_nodes.clear()
        return shared_storage_on

    def _check_listed_dbroots_exist(self):
        c_root = self._node_config.get_current_config_root()
        dbroots = get_dbroots_paths(c_root)
        if not dbroots:
            self._logger.error('No DBRoots found, cannot check shared storage.')
            return False
        for dbroot in dbroots:
            if not Path(dbroot).exists():
                self._logger.error(f'DBRoot {dbroot} listed in xml config does not exist.')
                return False
        return True

    def _monitor(self):
        dbroots_available: bool = False
        shared_storage_on_result = None
        if not self._node_config.is_primary_node():
            self._logger.debug('This node is not primary, skipping shared storage check.')
            return
        dbroots_available = self._check_listed_dbroots_exist()
        if not dbroots_available:
            self._logger.info('DBRoots are not available, no need to API-check shared storage.')
            shared_storage_on_result = False
        else:
            shared_storage_on_result = self._check_shared_storage()
        current_stateful_config: StatefulConfigModel = AppStatefulConfig.get_config_copy()
        current_shared_storage_on: bool = current_stateful_config.flags.shared_storage_on
        # If the result is None, the check was inconclusive (e.g. some nodes were
        # unreachable); keep the current flag unchanged and exit.
        if shared_storage_on_result is None:
            self._logger.debug(
                f'Shared storage check inconclusive; keeping current state: {current_shared_storage_on}'
            )
            return
        if current_shared_storage_on == bool(shared_storage_on_result):
            self._logger.debug(f'Shared storage state is unchanged: {current_shared_storage_on}')
        else:
            self._logger.info(
                f'Shared storage state changed from {current_shared_storage_on} '
                f'to {bool(shared_storage_on_result)}. Updating stateful config.'
            )
            new_stateful_config = StatefulConfigModel(
                version=current_stateful_config.version.next_seq(),
                flags=StatefulFlagsModel(shared_storage_on=bool(shared_storage_on_result))
            )
            new_stateful_config_dict = new_stateful_config.model_dump(mode='json')
            broadcast_stateful_config(stateful_config_dict=new_stateful_config_dict)
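
For context, the expected lifecycle from a caller's side looks roughly like the sketch below. The surrounding agent code is not part of this file, so the module paths, the HBHistory construction and the shutdown path are assumptions used only for illustration.

# Hypothetical wiring sketch: how a failover agent might own the monitor.
# Module paths, HBHistory() construction and the shutdown path are assumptions.
from failover.heartbeat_history import HBHistory
from failover.shared_storage_monitor import SharedStorageMonitor

hb_history = HBHistory()
monitor = SharedStorageMonitor(check_interval=5, hb_history=hb_history)
monitor.start()     # spawns the daemon thread; calling it again while alive is a no-op
try:
    ...             # the agent's own event loop / failover logic runs here
finally:
    monitor.stop()  # sets the stop flag and joins with a small grace period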