1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-11-02 06:13:16 +03:00

Handle waiting locks

This commit is contained in:
Alexander Presnyakov
2025-09-04 16:37:32 +00:00
committed by Leonid Fedorov
parent cff5244e35
commit bd1575d34a
9 changed files with 219 additions and 146 deletions

View File

@@ -13,7 +13,7 @@ import psutil
from cmapi_server.constants import (
IFLAG, LIBJEMALLOC_DEFAULT_PATH, MCS_INSTALL_BIN, ALL_MCS_PROGS,
)
from cmapi_server.process_dispatchers import utils as dispatcher_utils
from cmapi_server.process_dispatchers.locks import release_shmem_locks
from cmapi_server.exceptions import CMAPIBasicError
from cmapi_server.process_dispatchers.base import BaseDispatcher
@@ -223,7 +223,7 @@ class ContainerDispatcher(BaseDispatcher):
# Run pre-stop lock reset before saving BRM
# These stale locks can occur if the controllernode couldn't stop correctly
# and they cause mcs-savebrm.py to hang
dispatcher_utils.release_shmem_locks(logger)
locks.release_shmem_locks(logger)
# start mcs-savebrm.py before stoping workernode
logger.debug('Waiting to save BRM.')

View File

@@ -0,0 +1,151 @@
import logging
import re
from dataclasses import dataclass
from time import sleep
from typing import Optional, List
from cmapi_server.constants import SHMEM_LOCKS_PATH
from cmapi_server.process_dispatchers.base import BaseDispatcher
@dataclass
class LocksState:
id: int
name: Optional[str]
readers: int = 0
writers: int = 0
readers_waiting: int = 0
writers_waiting: int = 0
mutex_locked: bool = False
def __str__(self):
name = f"({self.name})" if self.name else ""
return (f"LS {self.id}{name}: {self.readers}r/{self.writers}w" +
f" {self.readers_waiting}rw/{self.writers_waiting}ww m={int(self.mutex_locked)}")
def parse_locks_detail(cmd_output: str, logger: logging.Logger) -> List[LocksState]:
"""Parse detailed per-lock state from mcs-shmem-locks output.
Missing or malformed numeric fields are treated as 0. A logger must be provided
and will be used to emit warnings for malformed lines.
"""
states: List[LocksState] = []
current: Optional[LocksState] = None
current_id = 0
for raw in cmd_output.splitlines():
line = raw.strip()
if not line:
continue
if line.endswith('RWLock'):
# push previous
if current is not None:
states.append(current)
current_id += 1
name = line[:-len('RWLock')].strip() or None
current = LocksState(id=current_id, name=name)
continue
if current is None:
continue
field_specs = [
(r'^readers\s*=\s*(\d+)$', 'readers'),
(r'^writers\s*=\s*(\d+)$', 'writers'),
(r'^readers waiting\s*=\s*(\d+)$', 'readers_waiting'),
(r'^writers waiting\s*=\s*(\d+)$', 'writers_waiting'),
]
matched = False
for pattern, attr in field_specs:
m = re.search(pattern, line)
if m:
try:
setattr(current, attr, int(m.group(1)))
except ValueError:
logger.warning('Failed to parse %s from line: %s', attr, raw)
setattr(current, attr, 0)
matched = True
break
if matched:
continue
m = re.search(r'^mutex locked\s*=\s*(\d+)$', line)
if m:
try:
current.mutex_locked = int(m.group(1)) != 0
except ValueError:
current.mutex_locked = False
continue
# push the last one
if current is not None:
states.append(current)
return states
def release_shmem_locks(logger: logging.Logger, max_iterations: int = 5) -> bool:
"""Attempt to release active shmem locks.
- Inspect all locks.
- Unlock writer lock (there can be only one active, but there can be multiple waiting)
- Unlock each reader lock sequentially
- Re-check and repeat up to max_iterations.
Returns True on success (no active readers/writers remain), False otherwise.
"""
# We adapt attempts/sleep when there are waiting locks to allow promotions
attempt = 0
while True:
attempt += 1
success, out = BaseDispatcher.exec_command(f'{SHMEM_LOCKS_PATH} --lock-id 0')
if not success or not out:
logger.error('Failed to inspect shmem locks during unlock (attempt %d)', attempt)
return False
states = parse_locks_detail(out, logger=logger)
active_total = sum(s.readers + s.writers for s in states)
waiting_total = sum(s.readers_waiting + s.writers_waiting for s in states)
if active_total == 0 and waiting_total == 0:
return True
logger.debug(
'Lock release attempt %d: active=%d waiting=%d; detailed=%s',
attempt, active_total, waiting_total, states
)
for st in states:
if st.writers > 0:
cmd = f'{SHMEM_LOCKS_PATH} -i {st.id} -w -u'
ok, _ = BaseDispatcher.exec_command(cmd)
if not ok:
logger.warning('Failed to unlock writer for lock-id=%d', st.id)
if st.readers > 0:
for _ in range(st.readers):
cmd = f'{SHMEM_LOCKS_PATH} -i {st.id} -r -u'
ok, _ = BaseDispatcher.exec_command(cmd)
if not ok:
logger.warning('Failed to unlock a reader for lock-id=%d', st.id)
break
# Wait for state to settle; longer if we have waiting locks
sleep(2 if waiting_total > 0 else 1)
# Allow more attempts when there are waiting locks
effective_max = max_iterations if waiting_total == 0 else max(max_iterations, 15)
if attempt >= effective_max:
break
logger.error('Failed to fully release shmem locks using mcs-shmem-locks after %d attempts (active/waiting remain)', attempt)
return False
def sum_active_from_states(states: List[LocksState]) -> int:
return sum(s.readers + s.writers for s in states)
def sum_waiting_from_states(states: List[LocksState]) -> int:
return sum(s.readers_waiting + s.writers_waiting for s in states)

View File

@@ -5,7 +5,7 @@ import re
from typing import Union, Tuple
from cmapi_server.process_dispatchers.base import BaseDispatcher
from cmapi_server.process_dispatchers import utils as dispatcher_utils
from cmapi_server.process_dispatchers.locks import release_shmem_locks
class SystemdDispatcher(BaseDispatcher):
@@ -168,7 +168,7 @@ class SystemdDispatcher(BaseDispatcher):
# Run pre-stop lock reset before saving BRM
# These stale locks can occur if the controllernode couldn't stop correctly
# and they cause mcs-savebrm.py to hang
dispatcher_utils.release_shmem_locks(logging.getLogger(__name__))
release_shmem_locks(logging.getLogger(__name__))
service_name = f'{service_name}@1.service {service_name}@2.service'
cls._workernode_enable(False, use_sudo)

View File

@@ -1,104 +0,0 @@
import logging
import re
from time import sleep
from typing import Optional, List, Tuple
from cmapi_server.constants import SHMEM_LOCKS_PATH
from cmapi_server.process_dispatchers.base import BaseDispatcher
def parse_locks_state(cmd_output: str, logger: logging.Logger) -> List[Tuple[int, int, int]]:
"""Parse per-lock state from mcs-shmem-locks output.
Returns a list of tuples: (lock_id, readers, writers)
"""
locks: List[Tuple[int, int, int]] = []
current_id = 0
readers = None
writers = None
for line in cmd_output.splitlines():
if line.strip().endswith('RWLock'):
# flush previous section counts (if we have both)
if current_id > 0 and readers is not None and writers is not None:
locks.append((current_id, readers, writers))
current_id += 1
readers = None
writers = None
continue
m = re.search(r'^\s*readers\s*=\s*(\d+)', line)
if m:
try:
readers = int(m.group(1))
except ValueError:
logger.warning('Failed to parse readers count from line: %s', line)
readers = 0
continue
m = re.search(r'^\s*writers\s*=\s*(\d+)', line)
if m:
try:
writers = int(m.group(1))
except ValueError:
logger.warning('Failed to parse writers count from line: %s', line)
writers = 0
continue
# flush the last parsed lock
if current_id > 0 and readers is not None and writers is not None:
locks.append((current_id, readers, writers))
return locks
def release_shmem_locks(logger: logging.Logger, max_iterations: int = 5) -> bool:
"""Attempt to release active shmem locks.
- Inspect all locks.
- Unlock writer lock (there can be only one)
- Unlock each reader lock sequentially
- Re-check and repeat up to max_iterations.
Returns True on success (no active readers/writers remain), False otherwise.
"""
for attempt in range(1, max_iterations + 1):
success, out = BaseDispatcher.exec_command(f'{SHMEM_LOCKS_PATH} --lock-id 0')
if not success or not out:
logger.error('Failed to inspect shmem locks during unlock (attempt %d)', attempt)
return False
locks = parse_locks_state(out, logger=logger)
total_active = sum_active_locks(locks)
if total_active == 0:
logger.debug('Unlock attempt %d: no active locks', attempt)
return True
logger.debug('Unlock attempt %d: active total=%d; detail=%s', attempt, total_active, locks)
# Issue unlocks per lock
for lock_id, readers, writers in locks:
# Unlock writer
if writers > 0:
cmd = f'{SHMEM_LOCKS_PATH} -i {lock_id} -w -u'
ok, _ = BaseDispatcher.exec_command(cmd)
if not ok:
logger.warning('Failed to unlock writer for lock-id=%d', lock_id)
# Unlock all readers
if readers > 0:
for _ in range(readers):
cmd = f'{SHMEM_LOCKS_PATH} -i {lock_id} -r -u'
ok, _ = BaseDispatcher.exec_command(cmd)
if not ok:
logger.warning('Failed to unlock a reader for lock-id=%d', lock_id)
break
# Wait some time for state to settle
sleep(1)
logger.error('Failed to fully release shmem locks using mcs-shmem-locks after %d attempts', max_iterations)
return False
def sum_active_locks(locks: List[Tuple[int, int, int]]) -> int:
return sum(r + w for _, r, w in locks)

View File

@@ -1,15 +1,17 @@
import unittest
import logging
from cmapi_server.process_dispatchers.utils import (
parse_locks_state,
sum_active_locks,
from cmapi_server.process_dispatchers.locks import (
parse_locks_detail,
sum_active_from_states,
sum_waiting_from_states,
)
logger = logging.getLogger(__name__)
class TestShmemLocksParsing(unittest.TestCase):
def test_parse_locks_state_basic(self):
def test_parse_locks_detail_basic(self):
sample_output = """
VSS RWLock
readers = 2
@@ -20,28 +22,48 @@ class TestShmemLocksParsing(unittest.TestCase):
ExtentMap RWLock
readers = 0
writers = 0
readers waiting = 0
writers waiting = 0
mutex locked = 0
readers waiting = 1
writers waiting = 2
mutex locked = 1
"""
state = parse_locks_state(sample_output, logger)
# Two sections => IDs 1 and 2
self.assertEqual(state, [(1, 2, 1), (2, 0, 0)])
states = parse_locks_detail(sample_output, logger)
self.assertEqual(len(states), 2)
def test_parse_locks_state_malformed_values(self):
# Check names
self.assertEqual(states[0].name, 'VSS')
self.assertEqual(states[1].name, 'ExtentMap')
self.assertEqual(states[0].id, 1)
self.assertEqual(states[0].readers, 2)
self.assertEqual(states[0].writers, 1)
self.assertEqual(states[0].readers_waiting, 0)
self.assertEqual(states[0].writers_waiting, 0)
self.assertFalse(states[0].mutex_locked)
self.assertEqual(states[1].id, 2)
self.assertEqual(states[1].readers, 0)
self.assertEqual(states[1].writers, 0)
self.assertEqual(states[1].readers_waiting, 1)
self.assertEqual(states[1].writers_waiting, 2)
self.assertTrue(states[1].mutex_locked)
self.assertEqual(sum_active_from_states(states), 3)
self.assertEqual(sum_waiting_from_states(states), 3)
def test_parse_locks_detail_malformed_values(self):
sample_output = """
VSS RWLock
readers = blorg
writers = 1
readers waiting = 0
readers waiting = nope
writers waiting = 0
mutex locked = 0
ExtentMap RWLock
readers = 3
writers = one
readers waiting = 0
writers waiting = 0
mutex locked = 0
writers waiting = two
mutex locked = 1
FreeList RWLock
readers = 1
writers = 0
@@ -49,26 +71,30 @@ class TestShmemLocksParsing(unittest.TestCase):
writers waiting = 0
mutex locked = 0
"""
state = parse_locks_state(sample_output, logger)
self.assertEqual(state, [(3, 1, 0)])
states = parse_locks_detail(sample_output, logger)
# Malformed numeric fields are treated as 0, sections are retained
self.assertEqual(len(states), 3)
self.assertEqual([s.name for s in states], ['VSS', 'ExtentMap', 'FreeList'])
# Lock 1: readers -> 0, writers -> 1, readers_waiting -> 0
self.assertEqual((states[0].readers, states[0].writers, states[0].readers_waiting), (0, 1, 0))
# Lock 2: writers -> 0, writers_waiting -> 0 due to malformed
self.assertEqual((states[1].readers, states[1].writers, states[1].writers_waiting), (3, 0, 0))
# Lock 3 intact
self.assertEqual((states[2].readers, states[2].writers), (1, 0))
def test_parse_locks_state_partial_section_ignored(self):
def test_parse_locks_detail_mutex_absent_defaults_false(self):
sample_output = """
VSS RWLock
readers = 4
writers = 0
readers waiting = 0
writers waiting = 0
mutex locked = 0
ExtentMap RWLock
readers = 1
readers waiting = 0
writers waiting = 0
mutex locked = 0
writers = 0
ExtentMap RWLock
readers = 0
writers = 0
"""
state = parse_locks_state(sample_output, logger)
# Second section missing writers, so we skip it
self.assertEqual(state, [(1, 4, 0)])
states = parse_locks_detail(sample_output, logger)
self.assertEqual(len(states), 2)
self.assertFalse(states[0].mutex_locked)
self.assertFalse(states[1].mutex_locked)
if __name__ == "__main__":

View File

@@ -15,11 +15,11 @@ $ mcs [OPTIONS] COMMAND [ARGS]...
**Commands**:
* `backup`: Backup Columnstore and/or MariDB data.
* `backup`: Backup Columnstore and/or MariaDB server data.
* `dbrm_backup`: Columnstore DBRM Backup.
* `restore`: Restore Columnstore (and/or MariaDB) data.
* `restore`: Restore Columnstore (and/or MariaDB server) data.
* `dbrm_restore`: Restore Columnstore DBRM data.
* `cskeys`: Generates a random AES encryption key and init vector and writes them to disk.
* `cskeys`: Generate a random AES encryption key and init vector and write them to disk.
* `cspasswd`: Encrypt a Columnstore plaintext password.
* `bootstrap-single-node`: Bootstrap a single node (localhost)...
* `review`: Provides useful functions to review and troubleshoot the MCS cluster.
@@ -31,11 +31,11 @@ $ mcs [OPTIONS] COMMAND [ARGS]...
* `node`: Cluster nodes management.
* `set`: Set cluster parameters.
* `cluster`: MariaDB Columnstore cluster management...
* `cmapi`: CMAPI itself related commands.
* `cmapi`: Commands related to CMAPI itself.
## `mcs backup`
Backup Columnstore and/or MariDB data.
Backup Columnstore and/or MariaDB data.
**Usage**:
@@ -654,7 +654,7 @@ $ mcs cluster set log-level [OPTIONS]
## `mcs cmapi`
CMAPI itself related commands.
Commands related to CMAPI itself.
**Usage**:

View File

@@ -39,7 +39,7 @@ app.command(
app.command(
'cskeys', rich_help_panel='Tools commands',
short_help=(
'Generates a random AES encryption key and init vector and writes '
'Generate a random AES encryption key and init vector and write '
'them to disk.'
)
)(tools_commands.cskeys)

View File

@@ -284,7 +284,7 @@ def backup(
)
] = None,
):
"""Backup Columnstore and/or MariDB data."""
"""Backup Columnstore and/or MariaDB data."""
# Local Storage Examples:
# ./$0 backup -bl /tmp/backups/ -bd Local -s LocalStorage

View File

@@ -14,7 +14,7 @@ from mcs_cluster_tool.decorators import handle_output
logger = logging.getLogger('mcs_cli')
app = typer.Typer(
help='CMAPI itself related commands.'
help='Commands related to CMAPI itself.'
)