You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2026-01-06 08:21:10 +03:00
Use the same mcs-shmem-locks tool that we used to inspect locks state to unlock them
This commit is contained in:
committed by
Leonid Fedorov
parent
384dd56a61
commit
cff5244e35
@@ -92,9 +92,8 @@ IFLAG = os.path.join(MCS_ETC_PATH, 'container-initialized')
|
||||
LIBJEMALLOC_DEFAULT_PATH = os.path.join(MCS_DATA_PATH, 'libjemalloc.so.2')
|
||||
MCS_LOG_PATH = '/var/log/mariadb/columnstore'
|
||||
|
||||
# tools for BRM shmem lock inspection/reset
|
||||
# BRM shmem lock inspection/reset tool
|
||||
SHMEM_LOCKS_PATH = os.path.join(MCS_INSTALL_BIN, 'mcs-shmem-locks')
|
||||
RESET_LOCKS_PATH = os.path.join(MCS_INSTALL_BIN, 'reset_locks')
|
||||
|
||||
# client constants
|
||||
CMAPI_PORT = 8640 #TODO: use it in all places
|
||||
|
||||
@@ -223,8 +223,7 @@ class ContainerDispatcher(BaseDispatcher):
|
||||
# Run pre-stop lock reset before saving BRM
|
||||
# These stale locks can occur if the controllernode couldn't stop correctly
|
||||
# and they cause mcs-savebrm.py to hang
|
||||
|
||||
dispatcher_utils.reset_shmem_locks(logger)
|
||||
dispatcher_utils.release_shmem_locks(logger)
|
||||
|
||||
# start mcs-savebrm.py before stoping workernode
|
||||
logger.debug('Waiting to save BRM.')
|
||||
|
||||
@@ -168,7 +168,7 @@ class SystemdDispatcher(BaseDispatcher):
|
||||
# Run pre-stop lock reset before saving BRM
|
||||
# These stale locks can occur if the controllernode couldn't stop correctly
|
||||
# and they cause mcs-savebrm.py to hang
|
||||
dispatcher_utils.reset_shmem_locks(logging.getLogger(__name__))
|
||||
dispatcher_utils.release_shmem_locks(logging.getLogger(__name__))
|
||||
|
||||
service_name = f'{service_name}@1.service {service_name}@2.service'
|
||||
cls._workernode_enable(False, use_sudo)
|
||||
|
||||
@@ -1,64 +1,104 @@
|
||||
import logging
|
||||
import re
|
||||
from time import sleep
|
||||
from typing import Optional
|
||||
from cmapi_server.constants import SHMEM_LOCKS_PATH, RESET_LOCKS_PATH
|
||||
from typing import Optional, List, Tuple
|
||||
from cmapi_server.constants import SHMEM_LOCKS_PATH
|
||||
from cmapi_server.process_dispatchers.base import BaseDispatcher
|
||||
|
||||
|
||||
def parse_locks_num(cmd_output: str) -> int:
|
||||
"""Parse output of mcs-shmem-locks command."""
|
||||
active_total = 0
|
||||
def parse_locks_state(cmd_output: str, logger: logging.Logger) -> List[Tuple[int, int, int]]:
|
||||
"""Parse per-lock state from mcs-shmem-locks output.
|
||||
|
||||
Returns a list of tuples: (lock_id, readers, writers)
|
||||
"""
|
||||
locks: List[Tuple[int, int, int]] = []
|
||||
current_id = 0
|
||||
readers = None
|
||||
writers = None
|
||||
|
||||
for line in cmd_output.splitlines():
|
||||
m = re.search(r'^\s*(readers|writers)\s*=\s*(\d+)', line)
|
||||
if line.strip().endswith('RWLock'):
|
||||
# flush previous section counts (if we have both)
|
||||
if current_id > 0 and readers is not None and writers is not None:
|
||||
locks.append((current_id, readers, writers))
|
||||
current_id += 1
|
||||
readers = None
|
||||
writers = None
|
||||
continue
|
||||
|
||||
m = re.search(r'^\s*readers\s*=\s*(\d+)', line)
|
||||
if m:
|
||||
try:
|
||||
active_total += int(m.group(2))
|
||||
readers = int(m.group(1))
|
||||
except ValueError:
|
||||
pass
|
||||
return active_total
|
||||
logger.warning('Failed to parse readers count from line: %s', line)
|
||||
readers = 0
|
||||
continue
|
||||
|
||||
m = re.search(r'^\s*writers\s*=\s*(\d+)', line)
|
||||
if m:
|
||||
try:
|
||||
writers = int(m.group(1))
|
||||
except ValueError:
|
||||
logger.warning('Failed to parse writers count from line: %s', line)
|
||||
writers = 0
|
||||
continue
|
||||
|
||||
# flush the last parsed lock
|
||||
if current_id > 0 and readers is not None and writers is not None:
|
||||
locks.append((current_id, readers, writers))
|
||||
|
||||
return locks
|
||||
|
||||
|
||||
def get_active_shmem_locks_num(logger: logging.Logger) -> Optional[int]:
|
||||
"""Get number of active shmem locks."""
|
||||
cmd = f'{SHMEM_LOCKS_PATH} --lock-id 0'
|
||||
success, out = BaseDispatcher.exec_command(cmd)
|
||||
if not success:
|
||||
logger.error('Failed to inspect shmem locks (command failed)')
|
||||
return None
|
||||
if not out:
|
||||
logger.error('Failed to inspect shmem locks (empty output)')
|
||||
return None
|
||||
def release_shmem_locks(logger: logging.Logger, max_iterations: int = 5) -> bool:
|
||||
"""Attempt to release active shmem locks.
|
||||
|
||||
logger.debug('Current lock state:\n%s', (out or '').strip())
|
||||
- Inspect all locks.
|
||||
- Unlock writer lock (there can be only one)
|
||||
- Unlock each reader lock sequentially
|
||||
- Re-check and repeat up to max_iterations.
|
||||
|
||||
return parse_locks_num(out)
|
||||
Returns True on success (no active readers/writers remain), False otherwise.
|
||||
"""
|
||||
for attempt in range(1, max_iterations + 1):
|
||||
success, out = BaseDispatcher.exec_command(f'{SHMEM_LOCKS_PATH} --lock-id 0')
|
||||
if not success or not out:
|
||||
logger.error('Failed to inspect shmem locks during unlock (attempt %d)', attempt)
|
||||
return False
|
||||
|
||||
locks = parse_locks_state(out, logger=logger)
|
||||
|
||||
def reset_shmem_locks(logger: logging.Logger) -> None:
|
||||
"""Inspect and reset BRM shmem locks"""
|
||||
logger.debug('Inspecting and resetting shmem locks.')
|
||||
total_active = sum_active_locks(locks)
|
||||
if total_active == 0:
|
||||
logger.debug('Unlock attempt %d: no active locks', attempt)
|
||||
return True
|
||||
logger.debug('Unlock attempt %d: active total=%d; detail=%s', attempt, total_active, locks)
|
||||
|
||||
# Get current lock state
|
||||
active_locks_num = get_active_shmem_locks_num(logger)
|
||||
if active_locks_num is None:
|
||||
return
|
||||
# Issue unlocks per lock
|
||||
for lock_id, readers, writers in locks:
|
||||
# Unlock writer
|
||||
if writers > 0:
|
||||
cmd = f'{SHMEM_LOCKS_PATH} -i {lock_id} -w -u'
|
||||
ok, _ = BaseDispatcher.exec_command(cmd)
|
||||
if not ok:
|
||||
logger.warning('Failed to unlock writer for lock-id=%d', lock_id)
|
||||
|
||||
# Reset if any read/write locks are active
|
||||
if active_locks_num > 0:
|
||||
logger.info('Detected active shmem locks (sum readers+writers=%d). Attempting reset.', active_locks_num)
|
||||
# Unlock all readers
|
||||
if readers > 0:
|
||||
for _ in range(readers):
|
||||
cmd = f'{SHMEM_LOCKS_PATH} -i {lock_id} -r -u'
|
||||
ok, _ = BaseDispatcher.exec_command(cmd)
|
||||
if not ok:
|
||||
logger.warning('Failed to unlock a reader for lock-id=%d', lock_id)
|
||||
break
|
||||
|
||||
# Reset locks
|
||||
success, out = BaseDispatcher.exec_command(f'{RESET_LOCKS_PATH} -s')
|
||||
if not success:
|
||||
logger.error('Failed to reset shmem locks (command failed)')
|
||||
return
|
||||
|
||||
# Check that locks were reset
|
||||
# Wait some time for state to settle
|
||||
sleep(1)
|
||||
active_locks_num = get_active_shmem_locks_num(logger)
|
||||
if active_locks_num is not None and active_locks_num > 0:
|
||||
logger.error('Failed to reset shmem locks (locks are still active)')
|
||||
return
|
||||
else:
|
||||
logger.info('No active shmem locks detected.')
|
||||
|
||||
logger.error('Failed to fully release shmem locks using mcs-shmem-locks after %d attempts', max_iterations)
|
||||
return False
|
||||
|
||||
|
||||
def sum_active_locks(locks: List[Tuple[int, int, int]]) -> int:
|
||||
return sum(r + w for _, r, w in locks)
|
||||
|
||||
75
cmapi/cmapi_server/test/test_shmem_locks_parsing.py
Normal file
75
cmapi/cmapi_server/test/test_shmem_locks_parsing.py
Normal file
@@ -0,0 +1,75 @@
|
||||
import unittest
|
||||
import logging
|
||||
|
||||
from cmapi_server.process_dispatchers.utils import (
|
||||
parse_locks_state,
|
||||
sum_active_locks,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class TestShmemLocksParsing(unittest.TestCase):
|
||||
def test_parse_locks_state_basic(self):
|
||||
sample_output = """
|
||||
VSS RWLock
|
||||
readers = 2
|
||||
writers = 1
|
||||
readers waiting = 0
|
||||
writers waiting = 0
|
||||
mutex locked = 0
|
||||
ExtentMap RWLock
|
||||
readers = 0
|
||||
writers = 0
|
||||
readers waiting = 0
|
||||
writers waiting = 0
|
||||
mutex locked = 0
|
||||
"""
|
||||
state = parse_locks_state(sample_output, logger)
|
||||
# Two sections => IDs 1 and 2
|
||||
self.assertEqual(state, [(1, 2, 1), (2, 0, 0)])
|
||||
|
||||
def test_parse_locks_state_malformed_values(self):
|
||||
sample_output = """
|
||||
VSS RWLock
|
||||
readers = blorg
|
||||
writers = 1
|
||||
readers waiting = 0
|
||||
writers waiting = 0
|
||||
mutex locked = 0
|
||||
ExtentMap RWLock
|
||||
readers = 3
|
||||
writers = one
|
||||
readers waiting = 0
|
||||
writers waiting = 0
|
||||
mutex locked = 0
|
||||
FreeList RWLock
|
||||
readers = 1
|
||||
writers = 0
|
||||
readers waiting = 0
|
||||
writers waiting = 0
|
||||
mutex locked = 0
|
||||
"""
|
||||
state = parse_locks_state(sample_output, logger)
|
||||
self.assertEqual(state, [(3, 1, 0)])
|
||||
|
||||
def test_parse_locks_state_partial_section_ignored(self):
|
||||
sample_output = """
|
||||
VSS RWLock
|
||||
readers = 4
|
||||
writers = 0
|
||||
readers waiting = 0
|
||||
writers waiting = 0
|
||||
mutex locked = 0
|
||||
ExtentMap RWLock
|
||||
readers = 1
|
||||
readers waiting = 0
|
||||
writers waiting = 0
|
||||
mutex locked = 0
|
||||
"""
|
||||
state = parse_locks_state(sample_output, logger)
|
||||
# Second section missing writers, so we skip it
|
||||
self.assertEqual(state, [(1, 4, 0)])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user