1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-11-21 09:20:51 +03:00
Files
mariadb-columnstore-engine/cmapi/cmapi_server/invariant_checks.py
2025-10-29 17:58:47 +00:00

98 lines
2.9 KiB
Python

import logging
import os
from typing import Optional, Tuple
from mr_kot import Runner, Status, any_of, check, check_all, fact, parametrize
from mr_kot_fs_validators import Exists, GroupIs, HasMode, IsDir, OwnerIs
from cmapi_server import helpers
from cmapi_server.constants import MCS_DATA_PATH
from mcs_node_control.models.node_config import NodeConfig
logger = logging.getLogger(__name__)
def run_invariant_checks() -> Optional[str]:
"""Run invariant checks, log results, and return a formatted string with problems, if any.
"""
logger.info('Starting invariant checks')
runner = Runner()
result = runner.run()
problems = result.problems()
# Log each fail/error/warning for diagnostics
diag = ''
for problem in problems:
fn = logger.warning if problem.status == Status.WARN else logger.error
fn(
'Invariant check with id=%s produced %s: %r',
problem.id, problem.status, problem.evidence,
)
diag += f'{problem.id}: {problem.evidence}\n'
logger.info(
'Stats: overall=%s counts=%s',
result.overall.value,
{k.value: v for k, v in result.counts.items() if v != 0}
)
if result.overall in (Status.FAIL, Status.ERROR):
logger.error('Invariant checks failed')
return diag
else:
logger.info('Invariant checks passed')
### Facts
@fact
def storage_type() -> str:
"""Provides storage type: shared_fs or s3."""
return 's3' if NodeConfig().s3_enabled() else 'shared_fs'
@fact
def is_shared_fs(storage_type: str) -> bool:
return storage_type == 'shared_fs'
@fact
def dispatcher_name() -> str:
"""Provides environment dispatcher name: systemd or container"""
cfg = helpers.get_config_parser()
name, _ = helpers.get_dispatcher_name_and_path(cfg)
return name
@fact
def is_systemd_disp(dispatcher_name: str) -> bool:
return dispatcher_name == 'systemd'
### Checks
REQUIRED_LOCAL_DIRS = [
os.path.join(MCS_DATA_PATH, 'data1'),
os.path.join(MCS_DATA_PATH, 'data1', 'systemFiles'),
os.path.join(MCS_DATA_PATH, 'data1', 'systemFiles', 'dbrm'),
]
@check(selector='is_shared_fs')
@parametrize('dir', values=REQUIRED_LOCAL_DIRS, fail_fast=True)
def required_dirs_perms(dir: str) -> Tuple[Status, str]:
status, ev = check_all(
dir,
Exists(),
IsDir(),
HasMode('1755'),
)
return (status, ev)
@check(selector='is_shared_fs, is_systemd_disp')
@parametrize('dir', values=REQUIRED_LOCAL_DIRS, fail_fast=True)
def required_dirs_ownership(dir: str) -> Tuple[Status, str]:
# Check ownership only when not in containers
status, ev = check_all(
dir,
Exists(),
# The correct owner is mysql, but i've seen mariadb as owner of the mountpoint,
# so we allow both
any_of(OwnerIs('mysql'), OwnerIs('mariadb')),
any_of(GroupIs('mysql'), GroupIs('mariadb')),
)
return (status, ev)