diff --git a/cmapi/cmapi_server/constants.py b/cmapi/cmapi_server/constants.py index 13da47f07..4f9cbe9c8 100644 --- a/cmapi/cmapi_server/constants.py +++ b/cmapi/cmapi_server/constants.py @@ -92,9 +92,8 @@ IFLAG = os.path.join(MCS_ETC_PATH, 'container-initialized') LIBJEMALLOC_DEFAULT_PATH = os.path.join(MCS_DATA_PATH, 'libjemalloc.so.2') MCS_LOG_PATH = '/var/log/mariadb/columnstore' -# tools for BRM shmem lock inspection/reset +# BRM shmem lock inspection/reset tool SHMEM_LOCKS_PATH = os.path.join(MCS_INSTALL_BIN, 'mcs-shmem-locks') -RESET_LOCKS_PATH = os.path.join(MCS_INSTALL_BIN, 'reset_locks') # client constants CMAPI_PORT = 8640 #TODO: use it in all places diff --git a/cmapi/cmapi_server/process_dispatchers/container.py b/cmapi/cmapi_server/process_dispatchers/container.py index 233373bb7..c3204ad4e 100644 --- a/cmapi/cmapi_server/process_dispatchers/container.py +++ b/cmapi/cmapi_server/process_dispatchers/container.py @@ -223,8 +223,7 @@ class ContainerDispatcher(BaseDispatcher): # Run pre-stop lock reset before saving BRM # These stale locks can occur if the controllernode couldn't stop correctly # and they cause mcs-savebrm.py to hang - - dispatcher_utils.reset_shmem_locks(logger) + dispatcher_utils.release_shmem_locks(logger) # start mcs-savebrm.py before stoping workernode logger.debug('Waiting to save BRM.') diff --git a/cmapi/cmapi_server/process_dispatchers/systemd.py b/cmapi/cmapi_server/process_dispatchers/systemd.py index ccbc277ef..b83a55e23 100644 --- a/cmapi/cmapi_server/process_dispatchers/systemd.py +++ b/cmapi/cmapi_server/process_dispatchers/systemd.py @@ -168,7 +168,7 @@ class SystemdDispatcher(BaseDispatcher): # Run pre-stop lock reset before saving BRM # These stale locks can occur if the controllernode couldn't stop correctly # and they cause mcs-savebrm.py to hang - dispatcher_utils.reset_shmem_locks(logging.getLogger(__name__)) + dispatcher_utils.release_shmem_locks(logging.getLogger(__name__)) service_name = f'{service_name}@1.service {service_name}@2.service' cls._workernode_enable(False, use_sudo) diff --git a/cmapi/cmapi_server/process_dispatchers/utils.py b/cmapi/cmapi_server/process_dispatchers/utils.py index 05cb1af8a..45b14461a 100644 --- a/cmapi/cmapi_server/process_dispatchers/utils.py +++ b/cmapi/cmapi_server/process_dispatchers/utils.py @@ -1,64 +1,104 @@ import logging import re from time import sleep -from typing import Optional -from cmapi_server.constants import SHMEM_LOCKS_PATH, RESET_LOCKS_PATH +from typing import Optional, List, Tuple +from cmapi_server.constants import SHMEM_LOCKS_PATH from cmapi_server.process_dispatchers.base import BaseDispatcher -def parse_locks_num(cmd_output: str) -> int: - """Parse output of mcs-shmem-locks command.""" - active_total = 0 +def parse_locks_state(cmd_output: str, logger: logging.Logger) -> List[Tuple[int, int, int]]: + """Parse per-lock state from mcs-shmem-locks output. + + Returns a list of tuples: (lock_id, readers, writers) + """ + locks: List[Tuple[int, int, int]] = [] + current_id = 0 + readers = None + writers = None + for line in cmd_output.splitlines(): - m = re.search(r'^\s*(readers|writers)\s*=\s*(\d+)', line) + if line.strip().endswith('RWLock'): + # flush previous section counts (if we have both) + if current_id > 0 and readers is not None and writers is not None: + locks.append((current_id, readers, writers)) + current_id += 1 + readers = None + writers = None + continue + + m = re.search(r'^\s*readers\s*=\s*(\d+)', line) if m: try: - active_total += int(m.group(2)) + readers = int(m.group(1)) except ValueError: - pass - return active_total + logger.warning('Failed to parse readers count from line: %s', line) + readers = 0 + continue + + m = re.search(r'^\s*writers\s*=\s*(\d+)', line) + if m: + try: + writers = int(m.group(1)) + except ValueError: + logger.warning('Failed to parse writers count from line: %s', line) + writers = 0 + continue + + # flush the last parsed lock + if current_id > 0 and readers is not None and writers is not None: + locks.append((current_id, readers, writers)) + + return locks -def get_active_shmem_locks_num(logger: logging.Logger) -> Optional[int]: - """Get number of active shmem locks.""" - cmd = f'{SHMEM_LOCKS_PATH} --lock-id 0' - success, out = BaseDispatcher.exec_command(cmd) - if not success: - logger.error('Failed to inspect shmem locks (command failed)') - return None - if not out: - logger.error('Failed to inspect shmem locks (empty output)') - return None +def release_shmem_locks(logger: logging.Logger, max_iterations: int = 5) -> bool: + """Attempt to release active shmem locks. - logger.debug('Current lock state:\n%s', (out or '').strip()) + - Inspect all locks. + - Unlock writer lock (there can be only one) + - Unlock each reader lock sequentially + - Re-check and repeat up to max_iterations. - return parse_locks_num(out) + Returns True on success (no active readers/writers remain), False otherwise. + """ + for attempt in range(1, max_iterations + 1): + success, out = BaseDispatcher.exec_command(f'{SHMEM_LOCKS_PATH} --lock-id 0') + if not success or not out: + logger.error('Failed to inspect shmem locks during unlock (attempt %d)', attempt) + return False + locks = parse_locks_state(out, logger=logger) -def reset_shmem_locks(logger: logging.Logger) -> None: - """Inspect and reset BRM shmem locks""" - logger.debug('Inspecting and resetting shmem locks.') + total_active = sum_active_locks(locks) + if total_active == 0: + logger.debug('Unlock attempt %d: no active locks', attempt) + return True + logger.debug('Unlock attempt %d: active total=%d; detail=%s', attempt, total_active, locks) - # Get current lock state - active_locks_num = get_active_shmem_locks_num(logger) - if active_locks_num is None: - return + # Issue unlocks per lock + for lock_id, readers, writers in locks: + # Unlock writer + if writers > 0: + cmd = f'{SHMEM_LOCKS_PATH} -i {lock_id} -w -u' + ok, _ = BaseDispatcher.exec_command(cmd) + if not ok: + logger.warning('Failed to unlock writer for lock-id=%d', lock_id) - # Reset if any read/write locks are active - if active_locks_num > 0: - logger.info('Detected active shmem locks (sum readers+writers=%d). Attempting reset.', active_locks_num) + # Unlock all readers + if readers > 0: + for _ in range(readers): + cmd = f'{SHMEM_LOCKS_PATH} -i {lock_id} -r -u' + ok, _ = BaseDispatcher.exec_command(cmd) + if not ok: + logger.warning('Failed to unlock a reader for lock-id=%d', lock_id) + break - # Reset locks - success, out = BaseDispatcher.exec_command(f'{RESET_LOCKS_PATH} -s') - if not success: - logger.error('Failed to reset shmem locks (command failed)') - return - - # Check that locks were reset + # Wait some time for state to settle sleep(1) - active_locks_num = get_active_shmem_locks_num(logger) - if active_locks_num is not None and active_locks_num > 0: - logger.error('Failed to reset shmem locks (locks are still active)') - return - else: - logger.info('No active shmem locks detected.') + + logger.error('Failed to fully release shmem locks using mcs-shmem-locks after %d attempts', max_iterations) + return False + + +def sum_active_locks(locks: List[Tuple[int, int, int]]) -> int: + return sum(r + w for _, r, w in locks) diff --git a/cmapi/cmapi_server/test/test_shmem_locks_parsing.py b/cmapi/cmapi_server/test/test_shmem_locks_parsing.py new file mode 100644 index 000000000..9dcc86866 --- /dev/null +++ b/cmapi/cmapi_server/test/test_shmem_locks_parsing.py @@ -0,0 +1,75 @@ +import unittest +import logging + +from cmapi_server.process_dispatchers.utils import ( + parse_locks_state, + sum_active_locks, +) + +logger = logging.getLogger(__name__) + +class TestShmemLocksParsing(unittest.TestCase): + def test_parse_locks_state_basic(self): + sample_output = """ + VSS RWLock + readers = 2 + writers = 1 + readers waiting = 0 + writers waiting = 0 + mutex locked = 0 + ExtentMap RWLock + readers = 0 + writers = 0 + readers waiting = 0 + writers waiting = 0 + mutex locked = 0 + """ + state = parse_locks_state(sample_output, logger) + # Two sections => IDs 1 and 2 + self.assertEqual(state, [(1, 2, 1), (2, 0, 0)]) + + def test_parse_locks_state_malformed_values(self): + sample_output = """ + VSS RWLock + readers = blorg + writers = 1 + readers waiting = 0 + writers waiting = 0 + mutex locked = 0 + ExtentMap RWLock + readers = 3 + writers = one + readers waiting = 0 + writers waiting = 0 + mutex locked = 0 + FreeList RWLock + readers = 1 + writers = 0 + readers waiting = 0 + writers waiting = 0 + mutex locked = 0 + """ + state = parse_locks_state(sample_output, logger) + self.assertEqual(state, [(3, 1, 0)]) + + def test_parse_locks_state_partial_section_ignored(self): + sample_output = """ + VSS RWLock + readers = 4 + writers = 0 + readers waiting = 0 + writers waiting = 0 + mutex locked = 0 + ExtentMap RWLock + readers = 1 + readers waiting = 0 + writers waiting = 0 + mutex locked = 0 + """ + state = parse_locks_state(sample_output, logger) + # Second section missing writers, so we skip it + self.assertEqual(state, [(1, 4, 0)]) + + +if __name__ == "__main__": + unittest.main()