diff --git a/cmapi/cmapi_server/process_dispatchers/container.py b/cmapi/cmapi_server/process_dispatchers/container.py
index c3204ad4e..62b6ea53c 100644
--- a/cmapi/cmapi_server/process_dispatchers/container.py
+++ b/cmapi/cmapi_server/process_dispatchers/container.py
@@ -13,7 +13,7 @@ import psutil
 from cmapi_server.constants import (
     IFLAG, LIBJEMALLOC_DEFAULT_PATH, MCS_INSTALL_BIN, ALL_MCS_PROGS,
 )
-from cmapi_server.process_dispatchers import utils as dispatcher_utils
+from cmapi_server.process_dispatchers.locks import release_shmem_locks
 from cmapi_server.exceptions import CMAPIBasicError
 from cmapi_server.process_dispatchers.base import BaseDispatcher
 
@@ -223,7 +223,7 @@ class ContainerDispatcher(BaseDispatcher):
         # Run pre-stop lock reset before saving BRM
         # These stale locks can occur if the controllernode couldn't stop correctly
         # and they cause mcs-savebrm.py to hang
-        dispatcher_utils.release_shmem_locks(logger)
+        release_shmem_locks(logger)
 
         # start mcs-savebrm.py before stoping workernode
         logger.debug('Waiting to save BRM.')
diff --git a/cmapi/cmapi_server/process_dispatchers/locks.py b/cmapi/cmapi_server/process_dispatchers/locks.py
new file mode 100644
index 000000000..d017a39d6
--- /dev/null
+++ b/cmapi/cmapi_server/process_dispatchers/locks.py
@@ -0,0 +1,151 @@
+import logging
+import re
+from dataclasses import dataclass
+from time import sleep
+from typing import Optional, List
+from cmapi_server.constants import SHMEM_LOCKS_PATH
+from cmapi_server.process_dispatchers.base import BaseDispatcher
+
+
+@dataclass
+class LocksState:
+    id: int
+    name: Optional[str]
+    readers: int = 0
+    writers: int = 0
+    readers_waiting: int = 0
+    writers_waiting: int = 0
+    mutex_locked: bool = False
+
+    def __str__(self):
+        name = f"({self.name})" if self.name else ""
+        return (f"LS {self.id}{name}: {self.readers}r/{self.writers}w" +
+                f" {self.readers_waiting}rw/{self.writers_waiting}ww m={int(self.mutex_locked)}")
+
+
+def parse_locks_detail(cmd_output: str, logger: logging.Logger) -> List[LocksState]:
+    """Parse detailed per-lock state from mcs-shmem-locks output.
+
+    Missing or malformed numeric fields are treated as 0. A logger must be provided
+    and will be used to emit warnings for malformed lines.
+    """
+    states: List[LocksState] = []
+    current: Optional[LocksState] = None
+    current_id = 0
+
+    for raw in cmd_output.splitlines():
+        line = raw.strip()
+        if not line:
+            continue
+
+        if line.endswith('RWLock'):
+            # push previous
+            if current is not None:
+                states.append(current)
+            current_id += 1
+            name = line[:-len('RWLock')].strip() or None
+            current = LocksState(id=current_id, name=name)
+            continue
+
+        if current is None:
+            continue
+
+        field_specs = [
+            (r'^readers\s*=\s*(\d+)$', 'readers'),
+            (r'^writers\s*=\s*(\d+)$', 'writers'),
+            (r'^readers waiting\s*=\s*(\d+)$', 'readers_waiting'),
+            (r'^writers waiting\s*=\s*(\d+)$', 'writers_waiting'),
+        ]
+
+        matched = False
+        for pattern, attr in field_specs:
+            m = re.search(pattern, line)
+            if m:
+                try:
+                    setattr(current, attr, int(m.group(1)))
+                except ValueError:
+                    logger.warning('Failed to parse %s from line: %s', attr, raw)
+                    setattr(current, attr, 0)
+                matched = True
+                break
+        if matched:
+            continue
+
+        m = re.search(r'^mutex locked\s*=\s*(\d+)$', line)
+        if m:
+            try:
+                current.mutex_locked = int(m.group(1)) != 0
+            except ValueError:
+                current.mutex_locked = False
+            continue
+
+    # push the last one
+    if current is not None:
+        states.append(current)
+
+    return states
+
+
+def release_shmem_locks(logger: logging.Logger, max_iterations: int = 5) -> bool:
+    """Attempt to release active shmem locks.
+
+    - Inspect all locks.
+    - Unlock writer lock (there can be only one active, but there can be multiple waiting)
+    - Unlock each reader lock sequentially
+    - Re-check and repeat up to max_iterations.
+
+    Returns True on success (no active readers/writers remain), False otherwise.
+    """
+    # We adapt attempts/sleep when there are waiting locks to allow promotions
+    attempt = 0
+    while True:
+        attempt += 1
+        success, out = BaseDispatcher.exec_command(f'{SHMEM_LOCKS_PATH} --lock-id 0')
+        if not success or not out:
+            logger.error('Failed to inspect shmem locks during unlock (attempt %d)', attempt)
+            return False
+
+        states = parse_locks_detail(out, logger=logger)
+        active_total = sum(s.readers + s.writers for s in states)
+        waiting_total = sum(s.readers_waiting + s.writers_waiting for s in states)
+        if active_total == 0 and waiting_total == 0:
+            return True
+
+        logger.debug(
+            'Lock release attempt %d: active=%d waiting=%d; detailed=%s',
+            attempt, active_total, waiting_total, states
+        )
+
+        for st in states:
+            if st.writers > 0:
+                cmd = f'{SHMEM_LOCKS_PATH} -i {st.id} -w -u'
+                ok, _ = BaseDispatcher.exec_command(cmd)
+                if not ok:
+                    logger.warning('Failed to unlock writer for lock-id=%d', st.id)
+
+            if st.readers > 0:
+                for _ in range(st.readers):
+                    cmd = f'{SHMEM_LOCKS_PATH} -i {st.id} -r -u'
+                    ok, _ = BaseDispatcher.exec_command(cmd)
+                    if not ok:
+                        logger.warning('Failed to unlock a reader for lock-id=%d', st.id)
+                        break
+
+        # Wait for state to settle; longer if we have waiting locks
+        sleep(2 if waiting_total > 0 else 1)
+
+        # Allow more attempts when there are waiting locks
+        effective_max = max_iterations if waiting_total == 0 else max(max_iterations, 15)
+        if attempt >= effective_max:
+            break
+
+    logger.error('Failed to fully release shmem locks using mcs-shmem-locks after %d attempts (active/waiting remain)', attempt)
+    return False
+
+
+def sum_active_from_states(states: List[LocksState]) -> int:
+    return sum(s.readers + s.writers for s in states)
+
+
+def sum_waiting_from_states(states: List[LocksState]) -> int:
+    return sum(s.readers_waiting + s.writers_waiting for s in states)
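Note on the new helpers (illustrative, not part of the patch): parse_locks_detail() can be fed captured `mcs-shmem-locks` output directly, which is handy when inspecting stale locks by hand. A minimal sketch, using the same fixture style as the updated unit tests further down in this patch:

    import logging
    from cmapi_server.process_dispatchers.locks import (
        parse_locks_detail, sum_active_from_states, sum_waiting_from_states,
    )

    sample = """
    VSS RWLock
      readers = 2
      writers = 1
      readers waiting = 0
      writers waiting = 0
      mutex locked = 0
    """
    states = parse_locks_detail(sample, logging.getLogger(__name__))
    print(states[0])                       # LS 1(VSS): 2r/1w 0rw/0ww m=0
    print(sum_active_from_states(states))  # 3
    print(sum_waiting_from_states(states)) # 0
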
diff --git a/cmapi/cmapi_server/process_dispatchers/systemd.py b/cmapi/cmapi_server/process_dispatchers/systemd.py
index b83a55e23..8f5184e58 100644
--- a/cmapi/cmapi_server/process_dispatchers/systemd.py
+++ b/cmapi/cmapi_server/process_dispatchers/systemd.py
@@ -5,7 +5,7 @@ import re
 from typing import Union, Tuple
 
 from cmapi_server.process_dispatchers.base import BaseDispatcher
-from cmapi_server.process_dispatchers import utils as dispatcher_utils
+from cmapi_server.process_dispatchers.locks import release_shmem_locks
 
 
 class SystemdDispatcher(BaseDispatcher):
@@ -168,7 +168,7 @@ class SystemdDispatcher(BaseDispatcher):
         # Run pre-stop lock reset before saving BRM
         # These stale locks can occur if the controllernode couldn't stop correctly
         # and they cause mcs-savebrm.py to hang
-        dispatcher_utils.release_shmem_locks(logging.getLogger(__name__))
+        release_shmem_locks(logging.getLogger(__name__))
 
         service_name = f'{service_name}@1.service {service_name}@2.service'
         cls._workernode_enable(False, use_sudo)
diff --git a/cmapi/cmapi_server/process_dispatchers/utils.py b/cmapi/cmapi_server/process_dispatchers/utils.py
deleted file mode 100644
index 45b14461a..000000000
--- a/cmapi/cmapi_server/process_dispatchers/utils.py
+++ /dev/null
@@ -1,104 +0,0 @@
-import logging
-import re
-from time import sleep
-from typing import Optional, List, Tuple
-from cmapi_server.constants import SHMEM_LOCKS_PATH
-from cmapi_server.process_dispatchers.base import BaseDispatcher
-
-
-def parse_locks_state(cmd_output: str, logger: logging.Logger) -> List[Tuple[int, int, int]]:
-    """Parse per-lock state from mcs-shmem-locks output.
-
-    Returns a list of tuples: (lock_id, readers, writers)
-    """
-    locks: List[Tuple[int, int, int]] = []
-    current_id = 0
-    readers = None
-    writers = None
-
-    for line in cmd_output.splitlines():
-        if line.strip().endswith('RWLock'):
-            # flush previous section counts (if we have both)
-            if current_id > 0 and readers is not None and writers is not None:
-                locks.append((current_id, readers, writers))
-            current_id += 1
-            readers = None
-            writers = None
-            continue
-
-        m = re.search(r'^\s*readers\s*=\s*(\d+)', line)
-        if m:
-            try:
-                readers = int(m.group(1))
-            except ValueError:
-                logger.warning('Failed to parse readers count from line: %s', line)
-                readers = 0
-            continue
-
-        m = re.search(r'^\s*writers\s*=\s*(\d+)', line)
-        if m:
-            try:
-                writers = int(m.group(1))
-            except ValueError:
-                logger.warning('Failed to parse writers count from line: %s', line)
-                writers = 0
-            continue
-
-    # flush the last parsed lock
-    if current_id > 0 and readers is not None and writers is not None:
-        locks.append((current_id, readers, writers))
-
-    return locks
-
-
-def release_shmem_locks(logger: logging.Logger, max_iterations: int = 5) -> bool:
-    """Attempt to release active shmem locks.
-
-    - Inspect all locks.
-    - Unlock writer lock (there can be only one)
-    - Unlock each reader lock sequentially
-    - Re-check and repeat up to max_iterations.
-
-    Returns True on success (no active readers/writers remain), False otherwise.
-    """
-    for attempt in range(1, max_iterations + 1):
-        success, out = BaseDispatcher.exec_command(f'{SHMEM_LOCKS_PATH} --lock-id 0')
-        if not success or not out:
-            logger.error('Failed to inspect shmem locks during unlock (attempt %d)', attempt)
-            return False
-
-        locks = parse_locks_state(out, logger=logger)
-
-        total_active = sum_active_locks(locks)
-        if total_active == 0:
-            logger.debug('Unlock attempt %d: no active locks', attempt)
-            return True
-        logger.debug('Unlock attempt %d: active total=%d; detail=%s', attempt, total_active, locks)
-
-        # Issue unlocks per lock
-        for lock_id, readers, writers in locks:
-            # Unlock writer
-            if writers > 0:
-                cmd = f'{SHMEM_LOCKS_PATH} -i {lock_id} -w -u'
-                ok, _ = BaseDispatcher.exec_command(cmd)
-                if not ok:
-                    logger.warning('Failed to unlock writer for lock-id=%d', lock_id)
-
-            # Unlock all readers
-            if readers > 0:
-                for _ in range(readers):
-                    cmd = f'{SHMEM_LOCKS_PATH} -i {lock_id} -r -u'
-                    ok, _ = BaseDispatcher.exec_command(cmd)
-                    if not ok:
-                        logger.warning('Failed to unlock a reader for lock-id=%d', lock_id)
-                        break
-
-        # Wait some time for state to settle
-        sleep(1)
-
-    logger.error('Failed to fully release shmem locks using mcs-shmem-locks after %d attempts', max_iterations)
-    return False
-
-
-def sum_active_locks(locks: List[Tuple[int, int, int]]) -> int:
-    return sum(r + w for _, r, w in locks)
diff --git a/cmapi/cmapi_server/test/test_shmem_locks_parsing.py b/cmapi/cmapi_server/test/test_shmem_locks_parsing.py
index 9dcc86866..64937deb5 100644
--- a/cmapi/cmapi_server/test/test_shmem_locks_parsing.py
+++ b/cmapi/cmapi_server/test/test_shmem_locks_parsing.py
@@ -1,15 +1,17 @@
 import unittest
 import logging
 
-from cmapi_server.process_dispatchers.utils import (
-    parse_locks_state,
-    sum_active_locks,
+from cmapi_server.process_dispatchers.locks import (
+    parse_locks_detail,
+    sum_active_from_states,
+    sum_waiting_from_states,
 )
 
 logger = logging.getLogger(__name__)
 
+
 class TestShmemLocksParsing(unittest.TestCase):
-    def test_parse_locks_state_basic(self):
+    def test_parse_locks_detail_basic(self):
         sample_output = """
         VSS RWLock
           readers = 2
@@ -20,28 +22,48 @@ class TestShmemLocksParsing(unittest.TestCase):
         ExtentMap RWLock
           readers = 0
           writers = 0
-          readers waiting = 0
-          writers waiting = 0
-          mutex locked = 0
+          readers waiting = 1
+          writers waiting = 2
+          mutex locked = 1
         """
-        state = parse_locks_state(sample_output, logger)
-        # Two sections => IDs 1 and 2
-        self.assertEqual(state, [(1, 2, 1), (2, 0, 0)])
+        states = parse_locks_detail(sample_output, logger)
+        self.assertEqual(len(states), 2)
 
-    def test_parse_locks_state_malformed_values(self):
+        # Check names
+        self.assertEqual(states[0].name, 'VSS')
+        self.assertEqual(states[1].name, 'ExtentMap')
+
+        self.assertEqual(states[0].id, 1)
+        self.assertEqual(states[0].readers, 2)
+        self.assertEqual(states[0].writers, 1)
+        self.assertEqual(states[0].readers_waiting, 0)
+        self.assertEqual(states[0].writers_waiting, 0)
+        self.assertFalse(states[0].mutex_locked)
+
+        self.assertEqual(states[1].id, 2)
+        self.assertEqual(states[1].readers, 0)
+        self.assertEqual(states[1].writers, 0)
+        self.assertEqual(states[1].readers_waiting, 1)
+        self.assertEqual(states[1].writers_waiting, 2)
+        self.assertTrue(states[1].mutex_locked)
+
+        self.assertEqual(sum_active_from_states(states), 3)
+        self.assertEqual(sum_waiting_from_states(states), 3)
+
+    def test_parse_locks_detail_malformed_values(self):
         sample_output = """
         VSS RWLock
           readers = blorg
           writers = 1
-          readers waiting = 0
+          readers waiting = nope
           writers waiting = 0
           mutex locked = 0
         ExtentMap RWLock
           readers = 3
           writers = one
           readers waiting = 0
-          writers waiting = 0
-          mutex locked = 0
+          writers waiting = two
+          mutex locked = 1
         FreeList RWLock
           readers = 1
           writers = 0
@@ -49,26 +71,30 @@ class TestShmemLocksParsing(unittest.TestCase):
           writers waiting = 0
           mutex locked = 0
         """
-        state = parse_locks_state(sample_output, logger)
-        self.assertEqual(state, [(3, 1, 0)])
+        states = parse_locks_detail(sample_output, logger)
+        # Malformed numeric fields are treated as 0, sections are retained
+        self.assertEqual(len(states), 3)
+        self.assertEqual([s.name for s in states], ['VSS', 'ExtentMap', 'FreeList'])
+        # Lock 1: readers -> 0, writers -> 1, readers_waiting -> 0
+        self.assertEqual((states[0].readers, states[0].writers, states[0].readers_waiting), (0, 1, 0))
+        # Lock 2: writers -> 0, writers_waiting -> 0 due to malformed
+        self.assertEqual((states[1].readers, states[1].writers, states[1].writers_waiting), (3, 0, 0))
+        # Lock 3 intact
+        self.assertEqual((states[2].readers, states[2].writers), (1, 0))
 
-    def test_parse_locks_state_partial_section_ignored(self):
+    def test_parse_locks_detail_mutex_absent_defaults_false(self):
         sample_output = """
         VSS RWLock
-          readers = 4
-          writers = 0
-          readers waiting = 0
-          writers waiting = 0
-          mutex locked = 0
-        ExtentMap RWLock
           readers = 1
-          readers waiting = 0
-          writers waiting = 0
-          mutex locked = 0
+          writers = 0
+        ExtentMap RWLock
+          readers = 0
+          writers = 0
         """
-        state = parse_locks_state(sample_output, logger)
-        # Second section missing writers, so we skip it
-        self.assertEqual(state, [(1, 4, 0)])
+        states = parse_locks_detail(sample_output, logger)
+        self.assertEqual(len(states), 2)
+        self.assertFalse(states[0].mutex_locked)
+        self.assertFalse(states[1].mutex_locked)
 
 
 if __name__ == "__main__":
diff --git a/cmapi/mcs_cluster_tool/README.md b/cmapi/mcs_cluster_tool/README.md
index 53a9ac8db..357e48a83 100644
--- a/cmapi/mcs_cluster_tool/README.md
+++ b/cmapi/mcs_cluster_tool/README.md
@@ -15,11 +15,11 @@ $ mcs [OPTIONS] COMMAND [ARGS]...
 
 **Commands**:
 
-* `backup`: Backup Columnstore and/or MariDB data.
+* `backup`: Backup Columnstore and/or MariaDB server data.
 * `dbrm_backup`: Columnstore DBRM Backup.
-* `restore`: Restore Columnstore (and/or MariaDB) data.
+* `restore`: Restore Columnstore (and/or MariaDB server) data.
 * `dbrm_restore`: Restore Columnstore DBRM data.
-* `cskeys`: Generates a random AES encryption key and init vector and writes them to disk.
+* `cskeys`: Generate a random AES encryption key and init vector and write them to disk.
 * `cspasswd`: Encrypt a Columnstore plaintext password.
 * `bootstrap-single-node`: Bootstrap a single node (localhost)...
 * `review`: Provides useful functions to review and troubleshoot the MCS cluster.
@@ -31,11 +31,11 @@ $ mcs [OPTIONS] COMMAND [ARGS]...
 * `node`: Cluster nodes management.
 * `set`: Set cluster parameters.
 * `cluster`: MariaDB Columnstore cluster management...
-* `cmapi`: CMAPI itself related commands.
+* `cmapi`: Commands related to CMAPI itself.
 
 ## `mcs backup`
 
-Backup Columnstore and/or MariDB data.
+Backup Columnstore and/or MariaDB data.
 
 **Usage**:
 
@@ -654,7 +654,7 @@ $ mcs cluster set log-level [OPTIONS]
 
 ## `mcs cmapi`
 
-CMAPI itself related commands.
+Commands related to CMAPI itself.
 
 **Usage**:
 
diff --git a/cmapi/mcs_cluster_tool/__main__.py b/cmapi/mcs_cluster_tool/__main__.py
index 09086d51a..83aac6eb7 100644
--- a/cmapi/mcs_cluster_tool/__main__.py
+++ b/cmapi/mcs_cluster_tool/__main__.py
@@ -39,7 +39,7 @@ app.command(
 app.command(
     'cskeys', rich_help_panel='Tools commands',
     short_help=(
-        'Generates a random AES encryption key and init vector and writes '
+        'Generate a random AES encryption key and init vector and write '
        'them to disk.'
     )
 )(tools_commands.cskeys)
diff --git a/cmapi/mcs_cluster_tool/backup_commands.py b/cmapi/mcs_cluster_tool/backup_commands.py
index 1b74e6bab..bf623cb2e 100644
--- a/cmapi/mcs_cluster_tool/backup_commands.py
+++ b/cmapi/mcs_cluster_tool/backup_commands.py
@@ -284,7 +284,7 @@ def backup(
         )
     ] = None,
 ):
-    """Backup Columnstore and/or MariDB data."""
+    """Backup Columnstore and/or MariaDB data."""
 
     # Local Storage Examples:
     # ./$0 backup -bl /tmp/backups/ -bd Local -s LocalStorage
diff --git a/cmapi/mcs_cluster_tool/cmapi_app.py b/cmapi/mcs_cluster_tool/cmapi_app.py
index 06bbdf750..32cdcd184 100644
--- a/cmapi/mcs_cluster_tool/cmapi_app.py
+++ b/cmapi/mcs_cluster_tool/cmapi_app.py
@@ -14,7 +14,7 @@ from mcs_cluster_tool.decorators import handle_output
 
 logger = logging.getLogger('mcs_cli')
 app = typer.Typer(
-    help='CMAPI itself related commands.'
+    help='Commands related to CMAPI itself.'
 )
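
A possible follow-up (not included in this patch): the new tests above cover parsing only. If direct coverage of the retry loop in release_shmem_locks() is wanted, a sketch along these lines could work; the test name and patch targets are assumptions, not code from this change:

    import logging
    import unittest
    from unittest.mock import patch

    from cmapi_server.process_dispatchers.locks import release_shmem_locks

    BUSY = """
    VSS RWLock
      readers = 0
      writers = 1
      readers waiting = 0
      writers waiting = 0
      mutex locked = 0
    """
    IDLE = BUSY.replace('writers = 1', 'writers = 0')


    class TestReleaseShmemLocks(unittest.TestCase):
        @patch('cmapi_server.process_dispatchers.locks.sleep')  # skip the settle delay
        @patch('cmapi_server.process_dispatchers.locks.BaseDispatcher.exec_command')
        def test_writer_released_on_second_inspection(self, exec_mock, _sleep_mock):
            inspections = iter([BUSY, IDLE])

            def fake_exec(cmd):
                if '--lock-id 0' in cmd:
                    return True, next(inspections)  # inspection call
                return True, ''                     # unlock call
            exec_mock.side_effect = fake_exec

            self.assertTrue(release_shmem_locks(logging.getLogger(__name__)))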