You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-11-03 17:13:17 +03:00 
			
		
		
		
	- Add SharedStorageMonitor thread to periodically verify shared storage:
  * Writes a temp file to the shared location and validates MD5 from all nodes.
  * Skips nodes with unstable recent heartbeats; retries once; defers decision if any node is unreachable.
  * Updates a cluster-wide stateful flag (shared_storage_on) only on conclusive checks.
- New CMAPI endpoints:
  * PUT /cmapi/{ver}/cluster/check-shared-storage — orchestrates cross-node checks.
  * GET /cmapi/{ver}/node/check-shared-file — validates a given file’s MD5 on a node.
  * PUT /cmapi/{ver}/node/stateful-config — fast path to distribute stateful config updates.
- Introduce in-memory stateful config (AppStatefulConfig) with versioned flags (term/seq) and shared_storage_on flag:
  * Broadcast via helpers.broadcast_stateful_config and enhanced broadcast_new_config.
  * Config PUT is now validated with Pydantic models; supports stateful-only updates and set_mode requests.
- Failover behavior:
  * NodeMonitor keeps failover inactive when shared_storage_on is false or cluster size < 3.
  * Rebalancing DBRoots becomes a no-op when shared storage is OFF (safety guard).
- mcl status improvements: per-node 'state' (online/offline), better timeouts and error reporting.
- Routing/wiring: add dispatcher routes for new endpoints; add ClusterModeEnum.
- Tests: cover shared-storage monitor (unreachable nodes, HB-based skipping), node manipulation with shared storage ON/OFF, and server/config flows.
- Dependencies: add pydantic; minor cleanups and logging.
		
	
		
			
				
	
	
		
			319 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			319 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import logging
 | 
						|
import platform
 | 
						|
import threading
 | 
						|
from functools import total_ordering
 | 
						|
from typing import Any, Dict, Optional, Tuple
 | 
						|
 | 
						|
import distro
 | 
						|
from pydantic import BaseModel, ConfigDict, Field
 | 
						|
 | 
						|
from cmapi_server.constants import (
 | 
						|
    MDB_CS_PACKAGE_NAME, MDB_SERVER_PACKAGE_NAME, PKG_GET_VER_CMD,
 | 
						|
    SUPPORTED_DISTROS, SUPPORTED_ARCHITECTURES, VERSION_PATH, MultiDistroNamer
 | 
						|
)
 | 
						|
from cmapi_server.exceptions import CMAPIBasicError
 | 
						|
from cmapi_server.process_dispatchers.base import BaseDispatcher
 | 
						|
 | 
						|
 | 
						|
class AppManager:
    """Application-level information for CMAPI.

    Holds the process-wide ``started`` flag and the lazily cached CMAPI
    version / git revision read from the VERSION file, plus helpers that
    inspect the host: CPU architecture, Linux distribution and installed
    MariaDB package versions.
    """

    started: bool = False
    # Values cached from the VERSION file; populated on first access.
    version: Optional[str] = None
    git_revision: Optional[str] = None

    @classmethod
    def get_version(cls) -> str:
        """Get CMAPI version, reading the VERSION file on first call.

        The git revision read from the same file is cached as well.

        :return: CMAPI version, or 'Undefined' if it could not be determined
        :rtype: str
        """
        if cls.version:
            return cls.version
        version, revision = cls._read_version_file()
        cls.version = version
        cls.git_revision = revision
        return cls.version

    @classmethod
    def get_git_revision(cls) -> Optional[str]:
        """Get the git revision recorded in the VERSION file.

        :return: git revision, or None if the VERSION file has none
        :rtype: Optional[str]
        """
        if cls.git_revision is not None:
            return cls.git_revision
        # NOTE: a missing revision stays None, so the file is re-read on
        # every call in that case.
        _, revision = cls._read_version_file()
        cls.git_revision = revision
        return cls.git_revision

    @classmethod
    def _read_version_file(cls) -> Tuple[str, Optional[str]]:
        """Read structured values from VERSION file.

        The file is parsed as ``KEY=VALUE`` lines; blank lines, ``#``
        comments and lines without ``=`` are ignored.

        :return: tuple (semantic_version, git_revision or None); the version
                 is 'Undefined' if the file can't be read or lacks the
                 MAJOR/MINOR/PATCH keys
        :rtype: Tuple[str, Optional[str]]
        """
        values: Dict[str, str] = {}
        try:
            with open(VERSION_PATH, encoding='utf-8') as version_file:
                for raw_line in version_file.read().splitlines():
                    line = raw_line.strip()
                    if not line or line.startswith('#') or '=' not in line:
                        continue
                    key, val = line.split('=', 1)
                    values[key.strip()] = val.strip()
        except Exception:
            # Best effort: a broken VERSION file must not stop the service.
            logging.exception("Failed to read VERSION file")
            return 'Undefined', None

        # Release (build) part is optional
        release = values.get('CMAPI_VERSION_RELEASE')
        revision = values.get('CMAPI_GIT_REVISION')

        required_keys = (
            'CMAPI_VERSION_MAJOR',
            'CMAPI_VERSION_MINOR',
            'CMAPI_VERSION_PATCH',
        )
        if not all(values.get(k) for k in required_keys):
            logging.error("Couldn't detect version from VERSION file!")
            return 'Undefined', revision

        version = '.'.join(values[k] for k in required_keys)
        if release:
            version = f"{version}.{release}"
        return version, revision

    @classmethod
    def get_architecture(cls) -> str:
        """Get system architecture.

        Aliases are normalized: 'amd64' -> 'x86_64', 'arm64' -> 'aarch64'.

        :raises CMAPIBasicError: if the architecture is not supported
        :return: system architecture
        :rtype: str
        """
        arch = platform.machine().lower()
        if arch not in SUPPORTED_ARCHITECTURES:
            message = f'Unsupported architecture: {arch}'
            logging.error(message)
            raise CMAPIBasicError(message)
        if arch in ['x86_64', 'amd64']:
            arch = 'x86_64'
        elif arch in ['aarch64', 'arm64']:
            arch = 'aarch64'
        return arch

    @classmethod
    def get_distro_info(cls) -> tuple[str, str]:
        """Get OS name and version.

        For Debian and Ubuntu the numeric version is mapped to its codename
        (e.g. '12' -> 'bookworm', '22.04' -> 'jammy').

        :raises CMAPIBasicError: on non-Linux platforms, unsupported
                                 distributions or unsupported Debian/Ubuntu
                                 releases
        :return: OS name and version
        :rtype: tuple[str, str]
        """
        platform_name = platform.system().lower()
        if platform_name == 'linux':
            distro_name = distro.id().lower()
            distro_version = distro.version().lower()
            if distro_name == 'debian':
                if distro_version.startswith('11'):
                    distro_version = 'bullseye'
                elif distro_version.startswith('12'):
                    distro_version = 'bookworm'
                else:
                    message = (
                        f'Unsupported Debian version: {distro_version}. '
                        'Supported versions are 11 (bullseye) and 12 '
                        '(bookworm).'
                    )
                    logging.error(message)
                    raise CMAPIBasicError(message)
            if distro_name == 'ubuntu':
                if distro_version.startswith('20.04'):
                    distro_version = 'focal'
                elif distro_version.startswith('22.04'):
                    distro_version = 'jammy'
                elif distro_version.startswith('24.04'):
                    distro_version = 'noble'
                else:
                    message = (
                        f'Unsupported Ubuntu version: {distro_version}. '
                        'Supported versions are 20.04 (focal), 22.04 (jammy), '
                        'and 24.04 (noble).'
                    )
                    logging.error(message)
                    raise CMAPIBasicError(message)
            if distro_name not in SUPPORTED_DISTROS:
                message = (
                    f'Unsupported Linux distribution: {distro_name}. '
                )
                logging.error(message)
                raise CMAPIBasicError(message)
        else:
            message = f'Unsupported platform: {platform_name}'
            logging.error(message)
            raise CMAPIBasicError(message)
        return distro_name, distro_version

    @staticmethod
    def _strip_deb_version(raw_version: str) -> str:
        """Drop the optional Debian epoch ('N:') and everything after '+'.

        :param raw_version: Debian package version string
        :type raw_version: str
        :return: cleaned version
        :rtype: str
        """
        # Epoch is optional in Debian versions; only strip it when present.
        _, sep, rest = raw_version.partition(':')
        without_epoch = rest if sep else raw_version
        return without_epoch.split('+', 1)[0]

    @classmethod
    def get_installed_pkg_ver(cls, pkg_namer: 'MultiDistroNamer') -> str:
        """Get package version with given package name.

        :param pkg_namer: object that contains package names for several
                          distros
        :type pkg_namer: MultiDistroNamer
        :raises CMAPIBasicError: if the distro has no known package command
                                 or getting the version failed
        :return: package version (for deb-based distros without epoch and
                 '+...' suffix)
        :rtype: str
        """
        distro_name, _ = cls.get_distro_info()
        if distro_name in ['ubuntu', 'debian']:
            package_name = pkg_namer.deb
            cmd = PKG_GET_VER_CMD.deb.format(package_name=package_name)
        elif distro_name in ['centos', 'rhel', 'rocky', 'almalinux']:
            package_name = pkg_namer.rhel
            cmd = PKG_GET_VER_CMD.rhel.format(package_name=package_name)
        else:
            # Previously an empty command was executed here.
            message = (
                'Unsupported distribution for package version lookup: '
                f'{distro_name}'
            )
            logging.error(message)
            raise CMAPIBasicError(message)

        success, result_raw = BaseDispatcher.exec_command(cmd)
        if not success:
            message = (
                f'Failed to get {package_name} version with result: '
                f'{result_raw}'
            )
            logging.error(message)
            raise CMAPIBasicError(message)

        version_clean = result_raw.strip()
        if distro_name in ['ubuntu', 'debian']:
            version_clean = cls._strip_deb_version(version_clean)
        return version_clean

    @classmethod
    def get_mdb_version(cls) -> str:
        """Get MDB version.

        :return: MDB version
        :rtype: str
        """
        return cls.get_installed_pkg_ver(MDB_SERVER_PACKAGE_NAME)

    @classmethod
    def get_columnstore_version(cls) -> str:
        """Get Columnstore version.

        :return: Columnstore version
        :rtype: str
        """
        return cls.get_installed_pkg_ver(MDB_CS_PACKAGE_NAME)
 | 
						|
 | 
						|
@total_ordering
class StatefulVersionModel(BaseModel):
    """
    Immutable (term, seq) version stamp, modelled on Raft's ordering.

    Versions are ordered lexicographically: first by ``term``, then by
    ``seq``.

    * term -- the "leader epoch" / fencing token. It is bumped whenever a
      new leader takes over, so updates coming from older leaders compare
      as older and can be ignored.
    * seq -- a per-term sequence number. Every change made by the current
      leader increases it, so changes from one leader are applied in order.
    """

    model_config = ConfigDict(frozen=True)

    term: int = Field(ge=0)
    seq: int = Field(ge=0)

    def __eq__(self, other: object) -> bool:
        if isinstance(other, StatefulVersionModel):
            return self.term == other.term and self.seq == other.seq
        return NotImplemented

    def __lt__(self, other: "StatefulVersionModel") -> bool:
        if isinstance(other, StatefulVersionModel):
            # Term dominates; seq only breaks ties within the same term.
            if self.term != other.term:
                return self.term < other.term
            return self.seq < other.seq
        return NotImplemented

    def next_seq(self) -> "StatefulVersionModel":
        """Build the following version within the same term."""
        bumped_seq = self.seq + 1
        return StatefulVersionModel(term=self.term, seq=bumped_seq)

    def next_term(self) -> "StatefulVersionModel":
        """Build the first version (seq 0) of the following term."""
        bumped_term = self.term + 1
        return StatefulVersionModel(term=bumped_term, seq=0)
 | 
						|
 | 
						|
 | 
						|
class StatefulFlagsModel(BaseModel):
    """Immutable set of flags carried by the stateful config."""

    model_config = ConfigDict(frozen=True)

    # Whether shared storage is enabled; off unless explicitly set.
    shared_storage_on: bool = Field(default=False)
 | 
						|
 | 
						|
 | 
						|
class StatefulConfigModel(BaseModel):
    """Stateful config snapshot: a version stamp plus the flags it covers."""

    # Both fields are required; they have no defaults.
    version: StatefulVersionModel = Field(...)
    flags: StatefulFlagsModel = Field(...)
 | 
						|
 | 
						|
 | 
						|
class AppStatefulConfig:
    """
    Stateful config shared by cluster nodes in memory.

    Holds one StatefulConfigModel (version + flags) as class-level state,
    guarded by a re-entrant lock. An update replaces the whole config, and
    only if its version is strictly newer than the current one. Stale or
    replayed writes are therefore ignored.

    # TODO: Change version.term after primary changes.
    """

    _lock = threading.RLock()
    _config: StatefulConfigModel = StatefulConfigModel(
        version=StatefulVersionModel(term=0, seq=0),
        flags=StatefulFlagsModel(shared_storage_on=False)
    )

    @classmethod
    def get_config_copy(cls) -> StatefulConfigModel:
        """Get a deep copy of the current config, taken under the lock.

        :return: Current config with flags and version; mutating it does not
                 affect the stored config.
        """
        with cls._lock:
            return cls._config.model_copy(deep=True)

    @classmethod
    def to_dict(cls) -> dict[str, Any]:
        """
        Get the current config as a JSON-compatible dict, under the lock.

        :return: Dict shaped like
                 ``{'version': {'term': ..., 'seq': ...},
                 'flags': {'shared_storage_on': ...}}``.
        """
        with cls._lock:
            return cls._config.model_dump(mode='json')

    @classmethod
    def apply_update(cls, new_config: StatefulConfigModel) -> bool:
        """
        Replace the current config if ``new_config`` has a newer version.

        The whole config (version and all flags) is replaced atomically. An
        update whose version is equal to or older than the current one is
        rejected.

        :param new_config: New config with updated flags and version.
        :return: True if update was applied; False if it was stale.
        """
        with cls._lock:
            if new_config.version <= cls._config.version:
                return False  # stale update
            # Store a private copy. StatefulConfigModel is not frozen, so
            # keeping the caller's object would let later mutations of it
            # change the shared config without the lock or the version check.
            cls._config = new_config.model_copy(deep=True)
            return True

    @classmethod
    def is_shared_storage(cls) -> bool:
        """Check if shared storage is enabled.

        :return: True if shared storage is enabled, False otherwise.
        """
        with cls._lock:
            return cls._config.flags.shared_storage_on
 |