From a7496ac9d0b63f1f0292834b78737299862e79fd Mon Sep 17 00:00:00 2001
From: Aleksei Antipovskii
Date: Thu, 4 Sep 2025 20:48:05 +0200
Subject: [PATCH 1/8] fix(PP,THJS): MCOL-6106 Fix race condition in TupleHashJoinStep

---
 dbcon/joblist/diskjoinstep.cpp  | 10 ++++++++--
 dbcon/joblist/tuplehashjoin.cpp | 15 ++++++++-------
 dbcon/joblist/tuplehashjoin.h   |  5 +++--
 3 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/dbcon/joblist/diskjoinstep.cpp b/dbcon/joblist/diskjoinstep.cpp
index 3bcd6945f..c978dc719 100644
--- a/dbcon/joblist/diskjoinstep.cpp
+++ b/dbcon/joblist/diskjoinstep.cpp
@@ -444,6 +444,12 @@ void DiskJoinStep::joinFcn(const uint32_t threadID)
   smallNullMem[0].reset(new uint8_t[smallNullRow.getSize()]);
   smallNullRow.setData(rowgroup::Row::Pointer(smallNullMem[0].get()));
   smallNullRow.initToNull();
+  funcexp::FuncExpWrapper localFE2;
+  if (thjs->fe2)
+  {
+    // Make thread's own copy to prevent a possible race condition when evaluating expressions
+    localFE2 = *thjs->fe2;
+  }
 
   try
   {
@@ -462,8 +468,8 @@ void DiskJoinStep::joinFcn(const uint32_t threadID)
       {
         l_largeRG.setData(largeData.get());
         thjs->joinOneRG(0, joinResults, l_largeRG, l_outputRG, l_largeRow, l_joinFERow, l_outputRow, baseRow,
-                        joinMatches, smallRowTemplates, outputDL.get(), &joiners, &colMappings, &fergMappings,
-                        &smallNullMem);
+                        joinMatches, smallRowTemplates, outputDL.get(), thjs->fe2 ? &localFE2 : nullptr,
+                        &joiners, &colMappings, &fergMappings, &smallNullMem);
 
         if (joinResults.size())
           outputResult(joinResults);
diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp
index 368a15fdd..d56f0facf 100644
--- a/dbcon/joblist/tuplehashjoin.cpp
+++ b/dbcon/joblist/tuplehashjoin.cpp
@@ -1576,7 +1576,7 @@ void TupleHashJoinStep::joinRunnerFcn(uint32_t threadID)
       joinerRunnerInputRecordsStats[threadID] += local_inputRG.getRowCount();
 
       joinOneRG(threadID, joinedRowData, local_inputRG, local_outputRG, largeRow, joinFERow, joinedRow,
-                baseRow, joinMatches, smallRowTemplates, outputDL);
+                baseRow, joinMatches, smallRowTemplates, outputDL, fe2 ? &local_fe : nullptr);
     }
 
     if (fe2)
@@ -1718,7 +1718,8 @@ void TupleHashJoinStep::grabSomeWork(vector<RGData>* work)
 void TupleHashJoinStep::joinOneRG(
     uint32_t threadID, vector<RGData>& out, RowGroup& inputRG, RowGroup& joinOutput, Row& largeSideRow,
     Row& joinFERow, Row& joinedRow, Row& baseRow, vector<vector<Row::Pointer> >& joinMatches,
-    std::shared_ptr<Row[]>& smallRowTemplates, RowGroupDL* outputDL,
+    std::shared_ptr<Row[]>& smallRowTemplates, RowGroupDL* lOutputDL,
+    FuncExpWrapper* localFE2,
    // disk-join support vars. This param list is insane; refactor attempt would be nice at some point.
    vector<std::shared_ptr<joiner::TupleJoiner> >* tjoiners,
    std::shared_ptr<std::shared_ptr<int[]>[]>* rgMappings,
@@ -1836,7 +1837,7 @@ void TupleHashJoinStep::joinOneRG(
       applyMapping((*rgMappings)[smallSideCount], largeSideRow, &baseRow);
       baseRow.setRid(largeSideRow.getRelRid());
       generateJoinResultSet(threadID, joinMatches, baseRow, *rgMappings, 0, joinOutput, joinedData, out,
-                            smallRowTemplates, joinedRow, outputDL);
+                            smallRowTemplates, joinedRow, lOutputDL, localFE2);
     }
   }
 
@@ -1850,7 +1851,7 @@ void TupleHashJoinStep::generateJoinResultSet(const uint32_t threadID,
                                               const uint32_t depth, RowGroup& l_outputRG, RGData& rgData,
                                               vector<RGData>& outputData,
                                               const std::shared_ptr<Row[]>& smallRows, Row& joinedRow,
-                                              RowGroupDL* dlp)
+                                              RowGroupDL* dlp, FuncExpWrapper* localFE2)
 {
   uint32_t i;
   Row& smallRow = smallRows[depth];
@@ -1863,7 +1864,7 @@ void TupleHashJoinStep::generateJoinResultSet(const uint32_t threadID,
       smallRow.setPointer(joinerOutput[depth][i]);
       applyMapping(mappings[depth], smallRow, &baseRow);
       generateJoinResultSet(threadID, joinerOutput, baseRow, mappings, depth + 1, l_outputRG, rgData,
-                            outputData, smallRows, joinedRow, dlp);
+                            outputData, smallRows, joinedRow, dlp, localFE2);
     }
   }
   else
@@ -1887,7 +1888,7 @@ void TupleHashJoinStep::generateJoinResultSet(const uint32_t threadID,
     if (UNLIKELY(outputData.size() > flushThreshold || !getMemory(l_outputRG.getSizeWithStrings())))
     {
       // MCOL-5512
-      if (fe2)
+      if (localFE2)
       {
         RowGroup l_fe2RG;
         Row fe2InRow;
@@ -1900,7 +1901,7 @@ void TupleHashJoinStep::generateJoinResultSet(const uint32_t threadID,
         // WIP do we remove previously pushed (line 1825) rgData
         // replacing it with a new FE2 rgdata added by processFE2?
         // Generates a new RGData w/o accounting its memory consumption
-        processFE2(l_outputRG, l_fe2RG, fe2InRow, fe2OutRow, &outputData, fe2.get());
+        processFE2(l_outputRG, l_fe2RG, fe2InRow, fe2OutRow, &outputData, localFE2);
       }
 
       // Don't let the join results buffer get out of control.
       sendResult(outputData);
diff --git a/dbcon/joblist/tuplehashjoin.h b/dbcon/joblist/tuplehashjoin.h
index 4f590f2de..6630e1b46 100644
--- a/dbcon/joblist/tuplehashjoin.h
+++ b/dbcon/joblist/tuplehashjoin.h
@@ -545,7 +545,7 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep
                              rowgroup::RowGroup& outputRG, rowgroup::RGData& rgData,
                              std::vector<rowgroup::RGData>& outputData,
                              const std::shared_ptr<rowgroup::Row[]>& smallRows, rowgroup::Row& joinedRow,
-                             RowGroupDL* outputDL);
+                             RowGroupDL* outputDL, funcexp::FuncExpWrapper* localFE2);
   void grabSomeWork(std::vector<rowgroup::RGData>* work);
   void sendResult(const std::vector<rowgroup::RGData>& res);
   void processFE2(rowgroup::RowGroup& input, rowgroup::RowGroup& output, rowgroup::Row& inRow,
@@ -555,7 +555,8 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep
                  rowgroup::RowGroup& joinOutput, rowgroup::Row& largeSideRow, rowgroup::Row& joinFERow,
                  rowgroup::Row& joinedRow, rowgroup::Row& baseRow,
                  std::vector<std::vector<rowgroup::Row::Pointer>>& joinMatches,
-                 std::shared_ptr<rowgroup::Row[]>& smallRowTemplates, RowGroupDL* outputDL,
+                 std::shared_ptr<rowgroup::Row[]>& smallRowTemplates, RowGroupDL* lOutputDL,
+                 funcexp::FuncExpWrapper* localFE2,
                  std::vector<std::shared_ptr<joiner::TupleJoiner>>* joiners = nullptr,
                  std::shared_ptr<std::shared_ptr<int[]>[]>* rgMappings = nullptr,
                  std::shared_ptr<std::shared_ptr<int[]>[]>* feMappings = nullptr,

From 48e14ed5e598ad9727b37bdcfdb00e81c64376e5 Mon Sep 17 00:00:00 2001
From: Alexander Presnyakov
Date: Fri, 22 Aug 2025 12:52:51 +0000
Subject: [PATCH 2/8] MCOL-6094: reset shmem locks before stopping workernode

---
 .../process_dispatchers/container.py          | 17 +++++-
 oam/install_scripts/CMakeLists.txt            |  2 +
 .../mcs-prestop-workernode.sh.in              | 53 +++++++++++++++++++
 oam/install_scripts/mcs-workernode.service.in |  1 +
 4 files changed, 72 insertions(+), 1 deletion(-)
 create mode 100644 oam/install_scripts/mcs-prestop-workernode.sh.in

diff --git a/cmapi/cmapi_server/process_dispatchers/container.py b/cmapi/cmapi_server/process_dispatchers/container.py
index 7db927b32..14e73b443 100644
--- a/cmapi/cmapi_server/process_dispatchers/container.py
+++ b/cmapi/cmapi_server/process_dispatchers/container.py
@@ -11,7 +11,7 @@ from time import sleep
 import psutil
 
 from cmapi_server.constants import (
-    IFLAG, LIBJEMALLOC_DEFAULT_PATH, MCS_INSTALL_BIN, ALL_MCS_PROGS
+    IFLAG, LIBJEMALLOC_DEFAULT_PATH, MCS_INSTALL_BIN, ALL_MCS_PROGS,
 )
 from cmapi_server.exceptions import CMAPIBasicError
 from cmapi_server.process_dispatchers.base import BaseDispatcher
@@ -219,6 +219,20 @@ class ContainerDispatcher(BaseDispatcher):
         service_proc = cls._get_proc_object(service)
 
         if service == 'workernode':
+            # Run pre-stop lock reset before saving BRM
+            # These stale locks can occur if the controllernode couldn't stop correctly
+            # and they cause mcs-savebrm.py to hang
+
+            logger.debug('Pre-stop: inspecting and resetting shmem locks.')
+            prestop_path = os.path.join(MCS_INSTALL_BIN, 'mcs-prestop-workernode.sh')
+            prestop_logpath = cls._create_mcs_process_logfile(
+                'mcs-prestop-workernode.log'
+            )
+            with open(prestop_logpath, 'a', encoding='utf-8') as prestop_logfh:
+                _success, _ = cls.exec_command(
+                    prestop_path, stdout=prestop_logfh
+                )
+
             # start mcs-savebrm.py before stopping workernode
             logger.debug('Waiting to save BRM.')
             savebrm_path = os.path.join(MCS_INSTALL_BIN, 'mcs-savebrm.py')
@@ -289,6 +303,7 @@ class ContainerDispatcher(BaseDispatcher):
 
         ...TODO: for next releases. Additional error handling.
         """
+        stop_success = True
         if cls.is_service_running(service):
             # TODO: retry?
             stop_success = cls.stop(service, is_primary, use_sudo)
diff --git a/oam/install_scripts/CMakeLists.txt b/oam/install_scripts/CMakeLists.txt
index 898b6fd3d..e14556135 100644
--- a/oam/install_scripts/CMakeLists.txt
+++ b/oam/install_scripts/CMakeLists.txt
@@ -163,6 +163,7 @@ set(SHMEM_FILE_GLOB "MCS-shm-")
 configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-loadbrm.py.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-loadbrm.py" @ONLY)
 configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-savebrm.py.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-savebrm.py" @ONLY)
 configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-savebrm.py.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcssavebrm.py" @ONLY)
+configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-prestop-workernode.sh.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-prestop-workernode.sh" @ONLY)
 configure_file("${CMAKE_CURRENT_SOURCE_DIR}/columnstoreSyslog.in" "${CMAKE_CURRENT_SOURCE_DIR}/columnstoreSyslog" @ONLY)
 
 columnstore_install_program(columnstore-post-install ${ENGINE_BINDIR})
@@ -174,6 +175,7 @@ columnstore_install_program(columnstoreSyslogSetup.sh ${ENGINE_BINDIR})
 columnstore_install_program(mcs-stop-controllernode.sh ${ENGINE_BINDIR})
 columnstore_install_program(mcs-loadbrm.py ${ENGINE_BINDIR})
 columnstore_install_program(mcs-savebrm.py ${ENGINE_BINDIR})
+columnstore_install_program(mcs-prestop-workernode.sh ${ENGINE_BINDIR})
 columnstore_install_program(mariadb-columnstore-start.sh ${ENGINE_BINDIR})
 columnstore_install_program(mariadb-columnstore-stop.sh ${ENGINE_BINDIR})
 columnstore_install_program(loop_process_starter.sh ${ENGINE_BINDIR})
diff --git a/oam/install_scripts/mcs-prestop-workernode.sh.in b/oam/install_scripts/mcs-prestop-workernode.sh.in
new file mode 100644
index 000000000..a66a4d1d0
--- /dev/null
+++ b/oam/install_scripts/mcs-prestop-workernode.sh.in
@@ -0,0 +1,53 @@
+#!/usr/bin/env bash
+# Pre-stop helper for mcs-workernode: inspect and reset BRM shmem locks
+# to avoid mcs-savebrm.py hanging on locks.
+set -euo pipefail
+
+LOG_DIR="/var/log/mariadb/columnstore"
+LOG_FILE="${LOG_DIR}/prestop-workernode.log"
+BIN_DIR='@ENGINE_BINDIR@'
+
+SHMEM_LOCKS_BIN="${BIN_DIR}/mcs-shmem-locks"
+RESET_LOCKS_BIN="${BIN_DIR}/reset_locks"
+
+log(){
+    # $1 - level, $2... - message
+    local level="$1"; shift
+    printf "%s [%s] %b\n" "$(date -Is)" "${level}" "$*" >> "${LOG_FILE}"
+}
+
+mkdir -p "${LOG_DIR}" 2>/dev/null || true
+log INFO "Pre-stop: checking BRM shmem locks before stopping workernode."
+
+# This should never happen, but check that the binaries exist just in case
+if [[ ! -x "${SHMEM_LOCKS_BIN}" ]]; then
+    log ERROR "${SHMEM_LOCKS_BIN} not found; aborting."
+    exit 1
+fi
+
+if [[ ! -x "${RESET_LOCKS_BIN}" ]]; then
+    log ERROR "${RESET_LOCKS_BIN} not found; aborting."
+    exit 1
+fi
+
+# Capture current lock state
+OUT="$(${SHMEM_LOCKS_BIN} --lock-id 0 2>&1)" || true
+log INFO "Current lock state:\n${OUT}"
+
+# Determine if any readers/writers are active
+ACTIVE_TOTAL=$(echo "${OUT}" | awk -F'=' '/^[[:space:]]*readers =|^[[:space:]]*writers =/ {gsub(/ /, ""); print $2}' | awk '{s+=$1} END {print s+0}')
+if [[ "${ACTIVE_TOTAL}" -gt 0 ]]; then
+    log WARN "Detected active shmem locks (sum readers+writers=${ACTIVE_TOTAL}). Attempting reset."
+    "${RESET_LOCKS_BIN}" -s >/dev/null 2>&1 || log ERROR "reset_locks failed to run."
+    sleep 1
+    OUT2="$(${SHMEM_LOCKS_BIN} --lock-id 0 2>&1)" || true
+    log INFO "Post-reset lock state:\n${OUT2}"
+else
+    log INFO "No active shmem locks detected."
+fi
+
+log INFO "Pre-stop lock inspection/reset finished."
+
+exit 0
+
+
diff --git a/oam/install_scripts/mcs-workernode.service.in b/oam/install_scripts/mcs-workernode.service.in
index b1cf7310f..31d2ef3e5 100644
--- a/oam/install_scripts/mcs-workernode.service.in
+++ b/oam/install_scripts/mcs-workernode.service.in
@@ -13,6 +13,7 @@ LimitCORE=@CORE_DUMPS@
 Environment="@WORKERNODE_ALLOC_CONFIG@"
 
 ExecStart=@ENGINE_BINDIR@/workernode DBRM_Worker%i
+ExecStopPre=@ENGINE_BINDIR@/mcs-prestop-workernode.sh
 ExecStopPost=@ENGINE_BINDIR@/mcs-savebrm.py
 ExecStopPost=/usr/bin/env bash -c "clearShm > /dev/null 2>&1"
 

From c75a095a776b1e9693e365dbfcccb350bcc8f37a Mon Sep 17 00:00:00 2001
From: Alexander Presnyakov
Date: Tue, 2 Sep 2025 13:15:37 +0000
Subject: [PATCH 3/8] Support older systemd versions

---
 oam/install_scripts/mcs-workernode.service.in | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/oam/install_scripts/mcs-workernode.service.in b/oam/install_scripts/mcs-workernode.service.in
index 31d2ef3e5..55bf459a3 100644
--- a/oam/install_scripts/mcs-workernode.service.in
+++ b/oam/install_scripts/mcs-workernode.service.in
@@ -13,7 +13,7 @@ LimitCORE=@CORE_DUMPS@
 Environment="@WORKERNODE_ALLOC_CONFIG@"
 
 ExecStart=@ENGINE_BINDIR@/workernode DBRM_Worker%i
-ExecStopPre=@ENGINE_BINDIR@/mcs-prestop-workernode.sh
+ExecStop=/usr/bin/env bash -c "@ENGINE_BINDIR@/mcs-prestop-workernode.sh; /bin/kill -TERM $MAINPID"
 ExecStopPost=@ENGINE_BINDIR@/mcs-savebrm.py
 ExecStopPost=/usr/bin/env bash -c "clearShm > /dev/null 2>&1"

From 384dd56a61ede775a55b0e60d577704085623568 Mon Sep 17 00:00:00 2001
From: Alexander Presnyakov
Date: Wed, 3 Sep 2025 19:24:31 +0000
Subject: [PATCH 4/8] Moved locks cleanup into cmapi

---
 cmapi/cmapi_server/constants.py               |  3 +
 .../process_dispatchers/container.py          | 11 +---
 .../process_dispatchers/systemd.py            |  6 ++
 .../cmapi_server/process_dispatchers/utils.py | 64 +++++++++++++++++++
 oam/install_scripts/CMakeLists.txt            |  2 -
 .../mcs-prestop-workernode.sh.in              | 53 ---------------
 oam/install_scripts/mcs-workernode.service.in |  1 -
 7 files changed, 75 insertions(+), 65 deletions(-)
 create mode 100644 cmapi/cmapi_server/process_dispatchers/utils.py
 delete mode 100644 oam/install_scripts/mcs-prestop-workernode.sh.in

diff --git a/cmapi/cmapi_server/constants.py b/cmapi/cmapi_server/constants.py
index c4bbb8cd6..13da47f07 100644
--- a/cmapi/cmapi_server/constants.py
+++ b/cmapi/cmapi_server/constants.py
@@ -92,6 +92,9 @@ IFLAG = os.path.join(MCS_ETC_PATH, 'container-initialized')
 LIBJEMALLOC_DEFAULT_PATH = os.path.join(MCS_DATA_PATH, 'libjemalloc.so.2')
 MCS_LOG_PATH = '/var/log/mariadb/columnstore'
 
+# tools for BRM shmem lock inspection/reset
+SHMEM_LOCKS_PATH = os.path.join(MCS_INSTALL_BIN, 'mcs-shmem-locks')
+RESET_LOCKS_PATH = os.path.join(MCS_INSTALL_BIN, 'reset_locks')
 
 # client constants
 CMAPI_PORT = 8640 #TODO: use it in all places
diff --git a/cmapi/cmapi_server/process_dispatchers/container.py b/cmapi/cmapi_server/process_dispatchers/container.py
index 14e73b443..233373bb7 100644
--- a/cmapi/cmapi_server/process_dispatchers/container.py
+++ b/cmapi/cmapi_server/process_dispatchers/container.py
@@ -13,6 +13,7 @@ import psutil
 from cmapi_server.constants import (
     IFLAG, LIBJEMALLOC_DEFAULT_PATH, MCS_INSTALL_BIN, ALL_MCS_PROGS,
 )
+from cmapi_server.process_dispatchers import utils as dispatcher_utils
 from cmapi_server.exceptions import CMAPIBasicError
 from cmapi_server.process_dispatchers.base import BaseDispatcher
@@ -223,15 +224,7 @@ class ContainerDispatcher(BaseDispatcher):
             # Run pre-stop lock reset before saving BRM
             # These stale locks can occur if the controllernode couldn't stop correctly
             # and they cause mcs-savebrm.py to hang
-
-            logger.debug('Pre-stop: inspecting and resetting shmem locks.')
-            prestop_path = os.path.join(MCS_INSTALL_BIN, 'mcs-prestop-workernode.sh')
-            prestop_logpath = cls._create_mcs_process_logfile(
-                'mcs-prestop-workernode.log'
-            )
-            with open(prestop_logpath, 'a', encoding='utf-8') as prestop_logfh:
-                _success, _ = cls.exec_command(
-                    prestop_path, stdout=prestop_logfh
-                )
+            dispatcher_utils.reset_shmem_locks(logger)
 
             # start mcs-savebrm.py before stopping workernode
             logger.debug('Waiting to save BRM.')
diff --git a/cmapi/cmapi_server/process_dispatchers/systemd.py b/cmapi/cmapi_server/process_dispatchers/systemd.py
index 52433349c..ccbc277ef 100644
--- a/cmapi/cmapi_server/process_dispatchers/systemd.py
+++ b/cmapi/cmapi_server/process_dispatchers/systemd.py
@@ -5,6 +5,7 @@ import re
 from typing import Union, Tuple
 
 from cmapi_server.process_dispatchers.base import BaseDispatcher
+from cmapi_server.process_dispatchers import utils as dispatcher_utils
 
 
 class SystemdDispatcher(BaseDispatcher):
@@ -164,6 +165,11 @@ class SystemdDispatcher(BaseDispatcher):
         """
         service_name = service
         if service_name == 'mcs-workernode':
+            # Run pre-stop lock reset before saving BRM
+            # These stale locks can occur if the controllernode couldn't stop correctly
+            # and they cause mcs-savebrm.py to hang
+            dispatcher_utils.reset_shmem_locks(logging.getLogger(__name__))
+
             service_name = f'{service_name}@1.service {service_name}@2.service'
             cls._workernode_enable(False, use_sudo)
diff --git a/cmapi/cmapi_server/process_dispatchers/utils.py b/cmapi/cmapi_server/process_dispatchers/utils.py
new file mode 100644
index 000000000..05cb1af8a
--- /dev/null
+++ b/cmapi/cmapi_server/process_dispatchers/utils.py
@@ -0,0 +1,64 @@
+import logging
+import re
+from time import sleep
+from typing import Optional
+
+from cmapi_server.constants import SHMEM_LOCKS_PATH, RESET_LOCKS_PATH
+from cmapi_server.process_dispatchers.base import BaseDispatcher
+
+
+def parse_locks_num(cmd_output: str) -> int:
+    """Parse output of mcs-shmem-locks command."""
+    active_total = 0
+    for line in cmd_output.splitlines():
+        m = re.search(r'^\s*(readers|writers)\s*=\s*(\d+)', line)
+        if m:
+            try:
+                active_total += int(m.group(2))
+            except ValueError:
+                pass
+    return active_total
+
+
+def get_active_shmem_locks_num(logger: logging.Logger) -> Optional[int]:
+    """Get number of active shmem locks."""
+    cmd = f'{SHMEM_LOCKS_PATH} --lock-id 0'
+    success, out = BaseDispatcher.exec_command(cmd)
+    if not success:
+        logger.error('Failed to inspect shmem locks (command failed)')
+        return None
+    if not out:
+        logger.error('Failed to inspect shmem locks (empty output)')
+        return None
+
+    logger.debug('Current lock state:\n%s', (out or '').strip())
+
+    return parse_locks_num(out)
+
+
+def reset_shmem_locks(logger: logging.Logger) -> None:
+    """Inspect and reset BRM shmem locks."""
+    logger.debug('Inspecting and resetting shmem locks.')
+
+    # Get current lock state
+    active_locks_num = get_active_shmem_locks_num(logger)
+    if active_locks_num is None:
+        return
+
+    # Reset if any read/write locks are active
+    if active_locks_num > 0:
+        logger.info('Detected active shmem locks (sum readers+writers=%d). Attempting reset.', active_locks_num)
+
+        # Reset locks
+        success, out = BaseDispatcher.exec_command(f'{RESET_LOCKS_PATH} -s')
+        if not success:
+            logger.error('Failed to reset shmem locks (command failed)')
+            return
+
+        # Check that locks were reset
+        sleep(1)
+        active_locks_num = get_active_shmem_locks_num(logger)
+        if active_locks_num is not None and active_locks_num > 0:
+            logger.error('Failed to reset shmem locks (locks are still active)')
+            return
+    else:
+        logger.info('No active shmem locks detected.')
diff --git a/oam/install_scripts/CMakeLists.txt b/oam/install_scripts/CMakeLists.txt
index e14556135..898b6fd3d 100644
--- a/oam/install_scripts/CMakeLists.txt
+++ b/oam/install_scripts/CMakeLists.txt
@@ -163,7 +163,6 @@ set(SHMEM_FILE_GLOB "MCS-shm-")
 configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-loadbrm.py.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-loadbrm.py" @ONLY)
 configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-savebrm.py.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-savebrm.py" @ONLY)
 configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-savebrm.py.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcssavebrm.py" @ONLY)
-configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-prestop-workernode.sh.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-prestop-workernode.sh" @ONLY)
 configure_file("${CMAKE_CURRENT_SOURCE_DIR}/columnstoreSyslog.in" "${CMAKE_CURRENT_SOURCE_DIR}/columnstoreSyslog" @ONLY)
 
 columnstore_install_program(columnstore-post-install ${ENGINE_BINDIR})
@@ -175,7 +174,6 @@ columnstore_install_program(columnstoreSyslogSetup.sh ${ENGINE_BINDIR})
 columnstore_install_program(mcs-stop-controllernode.sh ${ENGINE_BINDIR})
 columnstore_install_program(mcs-loadbrm.py ${ENGINE_BINDIR})
 columnstore_install_program(mcs-savebrm.py ${ENGINE_BINDIR})
-columnstore_install_program(mcs-prestop-workernode.sh ${ENGINE_BINDIR})
 columnstore_install_program(mariadb-columnstore-start.sh ${ENGINE_BINDIR})
 columnstore_install_program(mariadb-columnstore-stop.sh ${ENGINE_BINDIR})
 columnstore_install_program(loop_process_starter.sh ${ENGINE_BINDIR})
diff --git a/oam/install_scripts/mcs-prestop-workernode.sh.in b/oam/install_scripts/mcs-prestop-workernode.sh.in
deleted file mode 100644
index a66a4d1d0..000000000
--- a/oam/install_scripts/mcs-prestop-workernode.sh.in
+++ /dev/null
@@ -1,53 +0,0 @@
-#!/usr/bin/env bash
-# Pre-stop helper for mcs-workernode: inspect and reset BRM shmem locks
-# to avoid mcs-savebrm.py hanging on locks.
-set -euo pipefail
-
-LOG_DIR="/var/log/mariadb/columnstore"
-LOG_FILE="${LOG_DIR}/prestop-workernode.log"
-BIN_DIR='@ENGINE_BINDIR@'
-
-SHMEM_LOCKS_BIN="${BIN_DIR}/mcs-shmem-locks"
-RESET_LOCKS_BIN="${BIN_DIR}/reset_locks"
-
-log(){
-    # $1 - level, $2... - message
-    local level="$1"; shift
-    printf "%s [%s] %b\n" "$(date -Is)" "${level}" "$*" >> "${LOG_FILE}"
-}
-
-mkdir -p "${LOG_DIR}" 2>/dev/null || true
-log INFO "Pre-stop: checking BRM shmem locks before stopping workernode."
-
-# This should never happen, but check that the binaries exist just in case
-if [[ ! -x "${SHMEM_LOCKS_BIN}" ]]; then
-    log ERROR "${SHMEM_LOCKS_BIN} not found; aborting."
-    exit 1
-fi
-
-if [[ ! -x "${RESET_LOCKS_BIN}" ]]; then
-    log ERROR "${RESET_LOCKS_BIN} not found; aborting."
-    exit 1
-fi
-
-# Capture current lock state
-OUT="$(${SHMEM_LOCKS_BIN} --lock-id 0 2>&1)" || true
-log INFO "Current lock state:\n${OUT}"
-
-# Determine if any readers/writers are active
-ACTIVE_TOTAL=$(echo "${OUT}" | awk -F'=' '/^[[:space:]]*readers =|^[[:space:]]*writers =/ {gsub(/ /, ""); print $2}' | awk '{s+=$1} END {print s+0}')
-if [[ "${ACTIVE_TOTAL}" -gt 0 ]]; then
-    log WARN "Detected active shmem locks (sum readers+writers=${ACTIVE_TOTAL}). Attempting reset."
-    "${RESET_LOCKS_BIN}" -s >/dev/null 2>&1 || log ERROR "reset_locks failed to run."
-    sleep 1
-    OUT2="$(${SHMEM_LOCKS_BIN} --lock-id 0 2>&1)" || true
-    log INFO "Post-reset lock state:\n${OUT2}"
-else
-    log INFO "No active shmem locks detected."
-fi
-
-log INFO "Pre-stop lock inspection/reset finished."
-
-exit 0
-
-
diff --git a/oam/install_scripts/mcs-workernode.service.in b/oam/install_scripts/mcs-workernode.service.in
index 55bf459a3..b1cf7310f 100644
--- a/oam/install_scripts/mcs-workernode.service.in
+++ b/oam/install_scripts/mcs-workernode.service.in
@@ -13,7 +13,6 @@ LimitCORE=@CORE_DUMPS@
 Environment="@WORKERNODE_ALLOC_CONFIG@"
 
 ExecStart=@ENGINE_BINDIR@/workernode DBRM_Worker%i
-ExecStop=/usr/bin/env bash -c "@ENGINE_BINDIR@/mcs-prestop-workernode.sh; /bin/kill -TERM $MAINPID"
 ExecStopPost=@ENGINE_BINDIR@/mcs-savebrm.py
 ExecStopPost=/usr/bin/env bash -c "clearShm > /dev/null 2>&1"

From cff5244e3529656aab1c692057f1dfe07162fd63 Mon Sep 17 00:00:00 2001
From: Alexander Presnyakov
Date: Thu, 4 Sep 2025 14:43:21 +0000
Subject: [PATCH 5/8] Use the same mcs-shmem-locks tool that we used to inspect lock state to unlock the locks

---
 cmapi/cmapi_server/constants.py               |   3 +-
 .../process_dispatchers/container.py          |   3 +-
 .../process_dispatchers/systemd.py            |   2 +-
 .../cmapi_server/process_dispatchers/utils.py | 128 ++++++++++++------
 .../test/test_shmem_locks_parsing.py          |  75 ++++++++++
 5 files changed, 162 insertions(+), 49 deletions(-)
 create mode 100644 cmapi/cmapi_server/test/test_shmem_locks_parsing.py

diff --git a/cmapi/cmapi_server/constants.py b/cmapi/cmapi_server/constants.py
index 13da47f07..4f9cbe9c8 100644
--- a/cmapi/cmapi_server/constants.py
+++ b/cmapi/cmapi_server/constants.py
@@ -92,9 +92,8 @@ IFLAG = os.path.join(MCS_ETC_PATH, 'container-initialized')
 LIBJEMALLOC_DEFAULT_PATH = os.path.join(MCS_DATA_PATH, 'libjemalloc.so.2')
 MCS_LOG_PATH = '/var/log/mariadb/columnstore'
 
-# tools for BRM shmem lock inspection/reset
+# BRM shmem lock inspection/reset tool
 SHMEM_LOCKS_PATH = os.path.join(MCS_INSTALL_BIN, 'mcs-shmem-locks')
-RESET_LOCKS_PATH = os.path.join(MCS_INSTALL_BIN, 'reset_locks')
 
 # client constants
 CMAPI_PORT = 8640 #TODO: use it in all places
diff --git a/cmapi/cmapi_server/process_dispatchers/container.py b/cmapi/cmapi_server/process_dispatchers/container.py
index 233373bb7..c3204ad4e 100644
--- a/cmapi/cmapi_server/process_dispatchers/container.py
+++ b/cmapi/cmapi_server/process_dispatchers/container.py
@@ -223,8 +223,7 @@ class ContainerDispatcher(BaseDispatcher):
             # Run pre-stop lock reset before saving BRM
             # These stale locks can occur if the controllernode couldn't stop correctly
             # and they cause mcs-savebrm.py to hang
-
-            dispatcher_utils.reset_shmem_locks(logger)
+            dispatcher_utils.release_shmem_locks(logger)
 
             # start mcs-savebrm.py before stopping workernode
             logger.debug('Waiting to save BRM.')
diff --git a/cmapi/cmapi_server/process_dispatchers/systemd.py b/cmapi/cmapi_server/process_dispatchers/systemd.py
index ccbc277ef..b83a55e23 100644
--- a/cmapi/cmapi_server/process_dispatchers/systemd.py
+++ b/cmapi/cmapi_server/process_dispatchers/systemd.py
@@ -168,7 +168,7 @@ class SystemdDispatcher(BaseDispatcher):
             # Run pre-stop lock reset before saving BRM
             # These stale locks can occur if the controllernode couldn't stop correctly
             # and they cause mcs-savebrm.py to hang
-            dispatcher_utils.reset_shmem_locks(logging.getLogger(__name__))
+            dispatcher_utils.release_shmem_locks(logging.getLogger(__name__))
 
             service_name = f'{service_name}@1.service {service_name}@2.service'
             cls._workernode_enable(False, use_sudo)
diff --git a/cmapi/cmapi_server/process_dispatchers/utils.py b/cmapi/cmapi_server/process_dispatchers/utils.py
index 05cb1af8a..45b14461a 100644
--- a/cmapi/cmapi_server/process_dispatchers/utils.py
+++ b/cmapi/cmapi_server/process_dispatchers/utils.py
@@ -1,64 +1,104 @@
 import logging
 import re
 from time import sleep
-from typing import Optional
+from typing import Optional, List, Tuple
-from cmapi_server.constants import SHMEM_LOCKS_PATH, RESET_LOCKS_PATH
+from cmapi_server.constants import SHMEM_LOCKS_PATH
 from cmapi_server.process_dispatchers.base import BaseDispatcher
 
 
-def parse_locks_num(cmd_output: str) -> int:
-    """Parse output of mcs-shmem-locks command."""
-    active_total = 0
+def parse_locks_state(cmd_output: str, logger: logging.Logger) -> List[Tuple[int, int, int]]:
+    """Parse per-lock state from mcs-shmem-locks output.
+
+    Returns a list of tuples: (lock_id, readers, writers)
+    """
+    locks: List[Tuple[int, int, int]] = []
+    current_id = 0
+    readers = None
+    writers = None
+
     for line in cmd_output.splitlines():
-        m = re.search(r'^\s*(readers|writers)\s*=\s*(\d+)', line)
+        if line.strip().endswith('RWLock'):
+            # flush previous section counts (if we have both)
+            if current_id > 0 and readers is not None and writers is not None:
+                locks.append((current_id, readers, writers))
+            current_id += 1
+            readers = None
+            writers = None
+            continue
+
+        m = re.search(r'^\s*readers\s*=\s*(\d+)', line)
         if m:
             try:
-                active_total += int(m.group(2))
+                readers = int(m.group(1))
             except ValueError:
-                pass
-    return active_total
+                logger.warning('Failed to parse readers count from line: %s', line)
+                readers = 0
+            continue
+
+        m = re.search(r'^\s*writers\s*=\s*(\d+)', line)
+        if m:
+            try:
+                writers = int(m.group(1))
+            except ValueError:
+                logger.warning('Failed to parse writers count from line: %s', line)
+                writers = 0
+            continue
+
+    # flush the last parsed lock
+    if current_id > 0 and readers is not None and writers is not None:
+        locks.append((current_id, readers, writers))
+
+    return locks
 
 
-def get_active_shmem_locks_num(logger: logging.Logger) -> Optional[int]:
-    """Get number of active shmem locks."""
-    cmd = f'{SHMEM_LOCKS_PATH} --lock-id 0'
-    success, out = BaseDispatcher.exec_command(cmd)
-    if not success:
-        logger.error('Failed to inspect shmem locks (command failed)')
-        return None
-    if not out:
-        logger.error('Failed to inspect shmem locks (empty output)')
-        return None
+def release_shmem_locks(logger: logging.Logger, max_iterations: int = 5) -> bool:
+    """Attempt to release active shmem locks.
 
-    logger.debug('Current lock state:\n%s', (out or '').strip())
+    - Inspect all locks.
+    - Unlock writer lock (there can be only one)
+    - Unlock each reader lock sequentially
+    - Re-check and repeat up to max_iterations.
 
-    return parse_locks_num(out)
+    Returns True on success (no active readers/writers remain), False otherwise.
+ """ + for attempt in range(1, max_iterations + 1): + success, out = BaseDispatcher.exec_command(f'{SHMEM_LOCKS_PATH} --lock-id 0') + if not success or not out: + logger.error('Failed to inspect shmem locks during unlock (attempt %d)', attempt) + return False + locks = parse_locks_state(out, logger=logger) -def reset_shmem_locks(logger: logging.Logger) -> None: - """Inspect and reset BRM shmem locks""" - logger.debug('Inspecting and resetting shmem locks.') + total_active = sum_active_locks(locks) + if total_active == 0: + logger.debug('Unlock attempt %d: no active locks', attempt) + return True + logger.debug('Unlock attempt %d: active total=%d; detail=%s', attempt, total_active, locks) - # Get current lock state - active_locks_num = get_active_shmem_locks_num(logger) - if active_locks_num is None: - return + # Issue unlocks per lock + for lock_id, readers, writers in locks: + # Unlock writer + if writers > 0: + cmd = f'{SHMEM_LOCKS_PATH} -i {lock_id} -w -u' + ok, _ = BaseDispatcher.exec_command(cmd) + if not ok: + logger.warning('Failed to unlock writer for lock-id=%d', lock_id) - # Reset if any read/write locks are active - if active_locks_num > 0: - logger.info('Detected active shmem locks (sum readers+writers=%d). Attempting reset.', active_locks_num) + # Unlock all readers + if readers > 0: + for _ in range(readers): + cmd = f'{SHMEM_LOCKS_PATH} -i {lock_id} -r -u' + ok, _ = BaseDispatcher.exec_command(cmd) + if not ok: + logger.warning('Failed to unlock a reader for lock-id=%d', lock_id) + break - # Reset locks - success, out = BaseDispatcher.exec_command(f'{RESET_LOCKS_PATH} -s') - if not success: - logger.error('Failed to reset shmem locks (command failed)') - return - - # Check that locks were reset + # Wait some time for state to settle sleep(1) - active_locks_num = get_active_shmem_locks_num(logger) - if active_locks_num is not None and active_locks_num > 0: - logger.error('Failed to reset shmem locks (locks are still active)') - return - else: - logger.info('No active shmem locks detected.') + + logger.error('Failed to fully release shmem locks using mcs-shmem-locks after %d attempts', max_iterations) + return False + + +def sum_active_locks(locks: List[Tuple[int, int, int]]) -> int: + return sum(r + w for _, r, w in locks) diff --git a/cmapi/cmapi_server/test/test_shmem_locks_parsing.py b/cmapi/cmapi_server/test/test_shmem_locks_parsing.py new file mode 100644 index 000000000..9dcc86866 --- /dev/null +++ b/cmapi/cmapi_server/test/test_shmem_locks_parsing.py @@ -0,0 +1,75 @@ +import unittest +import logging + +from cmapi_server.process_dispatchers.utils import ( + parse_locks_state, + sum_active_locks, +) + +logger = logging.getLogger(__name__) + +class TestShmemLocksParsing(unittest.TestCase): + def test_parse_locks_state_basic(self): + sample_output = """ + VSS RWLock + readers = 2 + writers = 1 + readers waiting = 0 + writers waiting = 0 + mutex locked = 0 + ExtentMap RWLock + readers = 0 + writers = 0 + readers waiting = 0 + writers waiting = 0 + mutex locked = 0 + """ + state = parse_locks_state(sample_output, logger) + # Two sections => IDs 1 and 2 + self.assertEqual(state, [(1, 2, 1), (2, 0, 0)]) + + def test_parse_locks_state_malformed_values(self): + sample_output = """ + VSS RWLock + readers = blorg + writers = 1 + readers waiting = 0 + writers waiting = 0 + mutex locked = 0 + ExtentMap RWLock + readers = 3 + writers = one + readers waiting = 0 + writers waiting = 0 + mutex locked = 0 + FreeList RWLock + readers = 1 + writers = 0 + readers waiting = 0 + writers 
waiting = 0 + mutex locked = 0 + """ + state = parse_locks_state(sample_output, logger) + self.assertEqual(state, [(3, 1, 0)]) + + def test_parse_locks_state_partial_section_ignored(self): + sample_output = """ + VSS RWLock + readers = 4 + writers = 0 + readers waiting = 0 + writers waiting = 0 + mutex locked = 0 + ExtentMap RWLock + readers = 1 + readers waiting = 0 + writers waiting = 0 + mutex locked = 0 + """ + state = parse_locks_state(sample_output, logger) + # Second section missing writers, so we skip it + self.assertEqual(state, [(1, 4, 0)]) + + +if __name__ == "__main__": + unittest.main() From bd1575d34ac85ad9c76ebf60ca8a5cb35c4291bc Mon Sep 17 00:00:00 2001 From: Alexander Presnyakov Date: Thu, 4 Sep 2025 16:37:32 +0000 Subject: [PATCH 6/8] Handle waiting locks --- .../process_dispatchers/container.py | 4 +- .../cmapi_server/process_dispatchers/locks.py | 151 ++++++++++++++++++ .../process_dispatchers/systemd.py | 4 +- .../cmapi_server/process_dispatchers/utils.py | 104 ------------ .../test/test_shmem_locks_parsing.py | 84 ++++++---- cmapi/mcs_cluster_tool/README.md | 12 +- cmapi/mcs_cluster_tool/__main__.py | 2 +- cmapi/mcs_cluster_tool/backup_commands.py | 2 +- cmapi/mcs_cluster_tool/cmapi_app.py | 2 +- 9 files changed, 219 insertions(+), 146 deletions(-) create mode 100644 cmapi/cmapi_server/process_dispatchers/locks.py delete mode 100644 cmapi/cmapi_server/process_dispatchers/utils.py diff --git a/cmapi/cmapi_server/process_dispatchers/container.py b/cmapi/cmapi_server/process_dispatchers/container.py index c3204ad4e..62b6ea53c 100644 --- a/cmapi/cmapi_server/process_dispatchers/container.py +++ b/cmapi/cmapi_server/process_dispatchers/container.py @@ -13,7 +13,7 @@ import psutil from cmapi_server.constants import ( IFLAG, LIBJEMALLOC_DEFAULT_PATH, MCS_INSTALL_BIN, ALL_MCS_PROGS, ) -from cmapi_server.process_dispatchers import utils as dispatcher_utils +from cmapi_server.process_dispatchers.locks import release_shmem_locks from cmapi_server.exceptions import CMAPIBasicError from cmapi_server.process_dispatchers.base import BaseDispatcher @@ -223,7 +223,7 @@ class ContainerDispatcher(BaseDispatcher): # Run pre-stop lock reset before saving BRM # These stale locks can occur if the controllernode couldn't stop correctly # and they cause mcs-savebrm.py to hang - dispatcher_utils.release_shmem_locks(logger) + locks.release_shmem_locks(logger) # start mcs-savebrm.py before stoping workernode logger.debug('Waiting to save BRM.') diff --git a/cmapi/cmapi_server/process_dispatchers/locks.py b/cmapi/cmapi_server/process_dispatchers/locks.py new file mode 100644 index 000000000..d017a39d6 --- /dev/null +++ b/cmapi/cmapi_server/process_dispatchers/locks.py @@ -0,0 +1,151 @@ +import logging +import re +from dataclasses import dataclass +from time import sleep +from typing import Optional, List +from cmapi_server.constants import SHMEM_LOCKS_PATH +from cmapi_server.process_dispatchers.base import BaseDispatcher + + +@dataclass +class LocksState: + id: int + name: Optional[str] + readers: int = 0 + writers: int = 0 + readers_waiting: int = 0 + writers_waiting: int = 0 + mutex_locked: bool = False + + def __str__(self): + name = f"({self.name})" if self.name else "" + return (f"LS {self.id}{name}: {self.readers}r/{self.writers}w" + + f" {self.readers_waiting}rw/{self.writers_waiting}ww m={int(self.mutex_locked)}") + + +def parse_locks_detail(cmd_output: str, logger: logging.Logger) -> List[LocksState]: + """Parse detailed per-lock state from mcs-shmem-locks output. 
+
+    Missing or malformed numeric fields are treated as 0. A logger must be provided
+    and will be used to emit warnings for malformed lines.
+    """
+    states: List[LocksState] = []
+    current: Optional[LocksState] = None
+    current_id = 0
+
+    for raw in cmd_output.splitlines():
+        line = raw.strip()
+        if not line:
+            continue
+
+        if line.endswith('RWLock'):
+            # push previous
+            if current is not None:
+                states.append(current)
+            current_id += 1
+            name = line[:-len('RWLock')].strip() or None
+            current = LocksState(id=current_id, name=name)
+            continue
+
+        if current is None:
+            continue
+
+        field_specs = [
+            (r'^readers\s*=\s*(\d+)$', 'readers'),
+            (r'^writers\s*=\s*(\d+)$', 'writers'),
+            (r'^readers waiting\s*=\s*(\d+)$', 'readers_waiting'),
+            (r'^writers waiting\s*=\s*(\d+)$', 'writers_waiting'),
+        ]
+
+        matched = False
+        for pattern, attr in field_specs:
+            m = re.search(pattern, line)
+            if m:
+                try:
+                    setattr(current, attr, int(m.group(1)))
+                except ValueError:
+                    logger.warning('Failed to parse %s from line: %s', attr, raw)
+                    setattr(current, attr, 0)
+                matched = True
+                break
+        if matched:
+            continue
+
+        m = re.search(r'^mutex locked\s*=\s*(\d+)$', line)
+        if m:
+            try:
+                current.mutex_locked = int(m.group(1)) != 0
+            except ValueError:
+                current.mutex_locked = False
+            continue
+
+    # push the last one
+    if current is not None:
+        states.append(current)
+
+    return states
+
+
+def release_shmem_locks(logger: logging.Logger, max_iterations: int = 5) -> bool:
+    """Attempt to release active shmem locks.
+
+    - Inspect all locks.
+    - Unlock writer lock (there can be only one active, but there can be multiple waiting)
+    - Unlock each reader lock sequentially
+    - Re-check and repeat up to max_iterations.
+
+    Returns True on success (no active readers/writers remain), False otherwise.
+ """ + # We adapt attempts/sleep when there are waiting locks to allow promotions + attempt = 0 + while True: + attempt += 1 + success, out = BaseDispatcher.exec_command(f'{SHMEM_LOCKS_PATH} --lock-id 0') + if not success or not out: + logger.error('Failed to inspect shmem locks during unlock (attempt %d)', attempt) + return False + + states = parse_locks_detail(out, logger=logger) + active_total = sum(s.readers + s.writers for s in states) + waiting_total = sum(s.readers_waiting + s.writers_waiting for s in states) + if active_total == 0 and waiting_total == 0: + return True + + logger.debug( + 'Lock release attempt %d: active=%d waiting=%d; detailed=%s', + attempt, active_total, waiting_total, states + ) + + for st in states: + if st.writers > 0: + cmd = f'{SHMEM_LOCKS_PATH} -i {st.id} -w -u' + ok, _ = BaseDispatcher.exec_command(cmd) + if not ok: + logger.warning('Failed to unlock writer for lock-id=%d', st.id) + + if st.readers > 0: + for _ in range(st.readers): + cmd = f'{SHMEM_LOCKS_PATH} -i {st.id} -r -u' + ok, _ = BaseDispatcher.exec_command(cmd) + if not ok: + logger.warning('Failed to unlock a reader for lock-id=%d', st.id) + break + + # Wait for state to settle; longer if we have waiting locks + sleep(2 if waiting_total > 0 else 1) + + # Allow more attempts when there are waiting locks + effective_max = max_iterations if waiting_total == 0 else max(max_iterations, 15) + if attempt >= effective_max: + break + + logger.error('Failed to fully release shmem locks using mcs-shmem-locks after %d attempts (active/waiting remain)', attempt) + return False + + +def sum_active_from_states(states: List[LocksState]) -> int: + return sum(s.readers + s.writers for s in states) + + +def sum_waiting_from_states(states: List[LocksState]) -> int: + return sum(s.readers_waiting + s.writers_waiting for s in states) diff --git a/cmapi/cmapi_server/process_dispatchers/systemd.py b/cmapi/cmapi_server/process_dispatchers/systemd.py index b83a55e23..8f5184e58 100644 --- a/cmapi/cmapi_server/process_dispatchers/systemd.py +++ b/cmapi/cmapi_server/process_dispatchers/systemd.py @@ -5,7 +5,7 @@ import re from typing import Union, Tuple from cmapi_server.process_dispatchers.base import BaseDispatcher -from cmapi_server.process_dispatchers import utils as dispatcher_utils +from cmapi_server.process_dispatchers.locks import release_shmem_locks class SystemdDispatcher(BaseDispatcher): @@ -168,7 +168,7 @@ class SystemdDispatcher(BaseDispatcher): # Run pre-stop lock reset before saving BRM # These stale locks can occur if the controllernode couldn't stop correctly # and they cause mcs-savebrm.py to hang - dispatcher_utils.release_shmem_locks(logging.getLogger(__name__)) + release_shmem_locks(logging.getLogger(__name__)) service_name = f'{service_name}@1.service {service_name}@2.service' cls._workernode_enable(False, use_sudo) diff --git a/cmapi/cmapi_server/process_dispatchers/utils.py b/cmapi/cmapi_server/process_dispatchers/utils.py deleted file mode 100644 index 45b14461a..000000000 --- a/cmapi/cmapi_server/process_dispatchers/utils.py +++ /dev/null @@ -1,104 +0,0 @@ -import logging -import re -from time import sleep -from typing import Optional, List, Tuple -from cmapi_server.constants import SHMEM_LOCKS_PATH -from cmapi_server.process_dispatchers.base import BaseDispatcher - - -def parse_locks_state(cmd_output: str, logger: logging.Logger) -> List[Tuple[int, int, int]]: - """Parse per-lock state from mcs-shmem-locks output. 
- - Returns a list of tuples: (lock_id, readers, writers) - """ - locks: List[Tuple[int, int, int]] = [] - current_id = 0 - readers = None - writers = None - - for line in cmd_output.splitlines(): - if line.strip().endswith('RWLock'): - # flush previous section counts (if we have both) - if current_id > 0 and readers is not None and writers is not None: - locks.append((current_id, readers, writers)) - current_id += 1 - readers = None - writers = None - continue - - m = re.search(r'^\s*readers\s*=\s*(\d+)', line) - if m: - try: - readers = int(m.group(1)) - except ValueError: - logger.warning('Failed to parse readers count from line: %s', line) - readers = 0 - continue - - m = re.search(r'^\s*writers\s*=\s*(\d+)', line) - if m: - try: - writers = int(m.group(1)) - except ValueError: - logger.warning('Failed to parse writers count from line: %s', line) - writers = 0 - continue - - # flush the last parsed lock - if current_id > 0 and readers is not None and writers is not None: - locks.append((current_id, readers, writers)) - - return locks - - -def release_shmem_locks(logger: logging.Logger, max_iterations: int = 5) -> bool: - """Attempt to release active shmem locks. - - - Inspect all locks. - - Unlock writer lock (there can be only one) - - Unlock each reader lock sequentially - - Re-check and repeat up to max_iterations. - - Returns True on success (no active readers/writers remain), False otherwise. - """ - for attempt in range(1, max_iterations + 1): - success, out = BaseDispatcher.exec_command(f'{SHMEM_LOCKS_PATH} --lock-id 0') - if not success or not out: - logger.error('Failed to inspect shmem locks during unlock (attempt %d)', attempt) - return False - - locks = parse_locks_state(out, logger=logger) - - total_active = sum_active_locks(locks) - if total_active == 0: - logger.debug('Unlock attempt %d: no active locks', attempt) - return True - logger.debug('Unlock attempt %d: active total=%d; detail=%s', attempt, total_active, locks) - - # Issue unlocks per lock - for lock_id, readers, writers in locks: - # Unlock writer - if writers > 0: - cmd = f'{SHMEM_LOCKS_PATH} -i {lock_id} -w -u' - ok, _ = BaseDispatcher.exec_command(cmd) - if not ok: - logger.warning('Failed to unlock writer for lock-id=%d', lock_id) - - # Unlock all readers - if readers > 0: - for _ in range(readers): - cmd = f'{SHMEM_LOCKS_PATH} -i {lock_id} -r -u' - ok, _ = BaseDispatcher.exec_command(cmd) - if not ok: - logger.warning('Failed to unlock a reader for lock-id=%d', lock_id) - break - - # Wait some time for state to settle - sleep(1) - - logger.error('Failed to fully release shmem locks using mcs-shmem-locks after %d attempts', max_iterations) - return False - - -def sum_active_locks(locks: List[Tuple[int, int, int]]) -> int: - return sum(r + w for _, r, w in locks) diff --git a/cmapi/cmapi_server/test/test_shmem_locks_parsing.py b/cmapi/cmapi_server/test/test_shmem_locks_parsing.py index 9dcc86866..64937deb5 100644 --- a/cmapi/cmapi_server/test/test_shmem_locks_parsing.py +++ b/cmapi/cmapi_server/test/test_shmem_locks_parsing.py @@ -1,15 +1,17 @@ import unittest import logging -from cmapi_server.process_dispatchers.utils import ( - parse_locks_state, - sum_active_locks, +from cmapi_server.process_dispatchers.locks import ( + parse_locks_detail, + sum_active_from_states, + sum_waiting_from_states, ) logger = logging.getLogger(__name__) + class TestShmemLocksParsing(unittest.TestCase): - def test_parse_locks_state_basic(self): + def test_parse_locks_detail_basic(self): sample_output = """ VSS RWLock readers = 2 
@@ -20,28 +22,48 @@ class TestShmemLocksParsing(unittest.TestCase):
         ExtentMap RWLock
           readers = 0
           writers = 0
-          readers waiting = 0
-          writers waiting = 0
-          mutex locked = 0
+          readers waiting = 1
+          writers waiting = 2
+          mutex locked = 1
         """
-        state = parse_locks_state(sample_output, logger)
-        # Two sections => IDs 1 and 2
-        self.assertEqual(state, [(1, 2, 1), (2, 0, 0)])
+        states = parse_locks_detail(sample_output, logger)
+        self.assertEqual(len(states), 2)
 
-    def test_parse_locks_state_malformed_values(self):
+        # Check names
+        self.assertEqual(states[0].name, 'VSS')
+        self.assertEqual(states[1].name, 'ExtentMap')
+
+        self.assertEqual(states[0].id, 1)
+        self.assertEqual(states[0].readers, 2)
+        self.assertEqual(states[0].writers, 1)
+        self.assertEqual(states[0].readers_waiting, 0)
+        self.assertEqual(states[0].writers_waiting, 0)
+        self.assertFalse(states[0].mutex_locked)
+
+        self.assertEqual(states[1].id, 2)
+        self.assertEqual(states[1].readers, 0)
+        self.assertEqual(states[1].writers, 0)
+        self.assertEqual(states[1].readers_waiting, 1)
+        self.assertEqual(states[1].writers_waiting, 2)
+        self.assertTrue(states[1].mutex_locked)
+
+        self.assertEqual(sum_active_from_states(states), 3)
+        self.assertEqual(sum_waiting_from_states(states), 3)
+
+    def test_parse_locks_detail_malformed_values(self):
         sample_output = """
         VSS RWLock
           readers = blorg
           writers = 1
-          readers waiting = 0
+          readers waiting = nope
           writers waiting = 0
           mutex locked = 0
         ExtentMap RWLock
           readers = 3
           writers = one
           readers waiting = 0
-          writers waiting = 0
-          mutex locked = 0
+          writers waiting = two
+          mutex locked = 1
         FreeList RWLock
           readers = 1
           writers = 0
@@ -49,26 +71,30 @@ class TestShmemLocksParsing(unittest.TestCase):
           writers waiting = 0
           mutex locked = 0
         """
-        state = parse_locks_state(sample_output, logger)
-        self.assertEqual(state, [(3, 1, 0)])
+        states = parse_locks_detail(sample_output, logger)
+        # Malformed numeric fields are treated as 0, sections are retained
+        self.assertEqual(len(states), 3)
+        self.assertEqual([s.name for s in states], ['VSS', 'ExtentMap', 'FreeList'])
+        # Lock 1: readers -> 0, writers -> 1, readers_waiting -> 0
+        self.assertEqual((states[0].readers, states[0].writers, states[0].readers_waiting), (0, 1, 0))
+        # Lock 2: writers -> 0, writers_waiting -> 0 due to malformed
+        self.assertEqual((states[1].readers, states[1].writers, states[1].writers_waiting), (3, 0, 0))
+        # Lock 3 intact
+        self.assertEqual((states[2].readers, states[2].writers), (1, 0))
 
-    def test_parse_locks_state_partial_section_ignored(self):
+    def test_parse_locks_detail_mutex_absent_defaults_false(self):
         sample_output = """
         VSS RWLock
-          readers = 4
-          writers = 0
-          readers waiting = 0
-          writers waiting = 0
-          mutex locked = 0
-        ExtentMap RWLock
           readers = 1
-          readers waiting = 0
-          writers waiting = 0
-          mutex locked = 0
+          writers = 0
+        ExtentMap RWLock
+          readers = 0
+          writers = 0
         """
-        state = parse_locks_state(sample_output, logger)
-        # Second section missing writers, so we skip it
-        self.assertEqual(state, [(1, 4, 0)])
+        states = parse_locks_detail(sample_output, logger)
+        self.assertEqual(len(states), 2)
+        self.assertFalse(states[0].mutex_locked)
+        self.assertFalse(states[1].mutex_locked)
 
 
 if __name__ == "__main__":
diff --git a/cmapi/mcs_cluster_tool/README.md b/cmapi/mcs_cluster_tool/README.md
index 53a9ac8db..357e48a83 100644
--- a/cmapi/mcs_cluster_tool/README.md
+++ b/cmapi/mcs_cluster_tool/README.md
@@ -15,11 +15,11 @@ $ mcs [OPTIONS] COMMAND [ARGS]...
 
 **Commands**:
 
-* `backup`: Backup Columnstore and/or MariDB data.
+* `backup`: Backup Columnstore and/or MariaDB server data.
 * `dbrm_backup`: Columnstore DBRM Backup.
-* `restore`: Restore Columnstore (and/or MariaDB) data.
+* `restore`: Restore Columnstore (and/or MariaDB server) data.
 * `dbrm_restore`: Restore Columnstore DBRM data.
-* `cskeys`: Generates a random AES encryption key and init vector and writes them to disk.
+* `cskeys`: Generate a random AES encryption key and init vector and write them to disk.
 * `cspasswd`: Encrypt a Columnstore plaintext password.
 * `bootstrap-single-node`: Bootstrap a single node (localhost)...
 * `review`: Provides useful functions to review and troubleshoot the MCS cluster.
@@ -31,11 +31,11 @@ $ mcs [OPTIONS] COMMAND [ARGS]...
 * `node`: Cluster nodes management.
 * `set`: Set cluster parameters.
 * `cluster`: MariaDB Columnstore cluster management...
-* `cmapi`: CMAPI itself related commands.
+* `cmapi`: Commands related to CMAPI itself.
 
 ## `mcs backup`
 
-Backup Columnstore and/or MariDB data.
+Backup Columnstore and/or MariaDB data.
 
 **Usage**:
@@ -654,7 +654,7 @@ $ mcs cluster set log-level [OPTIONS]
 
 ## `mcs cmapi`
 
-CMAPI itself related commands.
+Commands related to CMAPI itself.
 
 **Usage**:
diff --git a/cmapi/mcs_cluster_tool/__main__.py b/cmapi/mcs_cluster_tool/__main__.py
index 09086d51a..83aac6eb7 100644
--- a/cmapi/mcs_cluster_tool/__main__.py
+++ b/cmapi/mcs_cluster_tool/__main__.py
@@ -39,7 +39,7 @@ app.command(
 app.command(
     'cskeys', rich_help_panel='Tools commands',
     short_help=(
-        'Generates a random AES encryption key and init vector and writes '
+        'Generate a random AES encryption key and init vector and write '
         'them to disk.'
     )
 )(tools_commands.cskeys)
diff --git a/cmapi/mcs_cluster_tool/backup_commands.py b/cmapi/mcs_cluster_tool/backup_commands.py
index 1b74e6bab..bf623cb2e 100644
--- a/cmapi/mcs_cluster_tool/backup_commands.py
+++ b/cmapi/mcs_cluster_tool/backup_commands.py
@@ -284,7 +284,7 @@ def backup(
         )
     ] = None,
 ):
-    """Backup Columnstore and/or MariDB data."""
+    """Backup Columnstore and/or MariaDB data."""
 
     # Local Storage Examples:
     #   ./$0 backup -bl /tmp/backups/ -bd Local -s LocalStorage
diff --git a/cmapi/mcs_cluster_tool/cmapi_app.py b/cmapi/mcs_cluster_tool/cmapi_app.py
index 06bbdf750..32cdcd184 100644
--- a/cmapi/mcs_cluster_tool/cmapi_app.py
+++ b/cmapi/mcs_cluster_tool/cmapi_app.py
@@ -14,7 +14,7 @@ from mcs_cluster_tool.decorators import handle_output
 logger = logging.getLogger('mcs_cli')
 app = typer.Typer(
-    help='CMAPI itself related commands.'
+    help='Commands related to CMAPI itself.'
 )

From 55241e6f0e8dd3df5878f8c5c6f85e8413c3b47e Mon Sep 17 00:00:00 2001
From: Alexander Presnyakov
Date: Mon, 8 Sep 2025 14:47:07 +0000
Subject: [PATCH 7/8] Fixed import problem

---
 cmapi/cmapi_server/process_dispatchers/container.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cmapi/cmapi_server/process_dispatchers/container.py b/cmapi/cmapi_server/process_dispatchers/container.py
index 62b6ea53c..f3a1fc42f 100644
--- a/cmapi/cmapi_server/process_dispatchers/container.py
+++ b/cmapi/cmapi_server/process_dispatchers/container.py
@@ -223,7 +223,7 @@ class ContainerDispatcher(BaseDispatcher):
             # Run pre-stop lock reset before saving BRM
             # These stale locks can occur if the controllernode couldn't stop correctly
             # and they cause mcs-savebrm.py to hang
-            locks.release_shmem_locks(logger)
+            release_shmem_locks(logger)
 
             # start mcs-savebrm.py before stopping workernode
             logger.debug('Waiting to save BRM.')

From 4f3ed2a6bd1731ca01f8f9689b3c64a60148bdee Mon Sep 17 00:00:00 2001
From: Leonid Fedorov
Date: Tue, 9 Sep 2025 12:36:38 +0000
Subject: [PATCH 8/8] fix(mtr): MCOL-5756, fix extended mtr tests

---
 .../columnstore/devregression/suite.opt       |  1 +
 mysql-test/columnstore/devregression/suite.pm | 29 +++++++++++++++
 .../t/mcs7238_regression_MCOL-830.test        |  6 ++++
 .../columnstore/include/cross_engine.inc      | 28 ++++++++++++++
 .../columnstore/include/drop_cross_engine.inc |  4 +++
 5 files changed, 68 insertions(+)
 create mode 100644 mysql-test/columnstore/devregression/suite.opt
 create mode 100644 mysql-test/columnstore/devregression/suite.pm
 create mode 100644 mysql-test/columnstore/include/cross_engine.inc
 create mode 100644 mysql-test/columnstore/include/drop_cross_engine.inc

diff --git a/mysql-test/columnstore/devregression/suite.opt b/mysql-test/columnstore/devregression/suite.opt
new file mode 100644
index 000000000..fbd322fdd
--- /dev/null
+++ b/mysql-test/columnstore/devregression/suite.opt
@@ -0,0 +1 @@
+--plugin-load-add=$HA_COLUMNSTORE_SO
diff --git a/mysql-test/columnstore/devregression/suite.pm b/mysql-test/columnstore/devregression/suite.pm
new file mode 100644
index 000000000..271dcc8a2
--- /dev/null
+++ b/mysql-test/columnstore/devregression/suite.pm
@@ -0,0 +1,29 @@
+package My::Suite::ColumnStore;
+
+@ISA = qw(My::Suite);
+
+my $mcs_bin_dir_compiled=$::bindir . '/storage/columnstore/columnstore/bin';
+my $mcs_ins_dir_installed=$::bindir . '/bin';
+
+if (-d $mcs_bin_dir_compiled)
+{
+  $ENV{MCS_MCSSETCONFIG}=$mcs_bin_dir_compiled . "/mcsSetConfig";
+  $ENV{MCS_CPIMPORT}=$mcs_bin_dir_compiled . "/cpimport";
+  $ENV{MCS_SYSCATALOG_MYSQL_SQL}=$::mysqld_variables{'basedir'} . "/storage/columnstore/columnstore/dbcon/mysql/syscatalog_mysql.sql";
+}
+elsif (-d $mcs_ins_dir_installed)
+{
+  $ENV{MCS_MCSSETCONFIG}=$mcs_ins_dir_installed . "/mcsSetConfig";
+  $ENV{MCS_CPIMPORT}=$mcs_ins_dir_installed . "/cpimport";
+  $ENV{MCS_SYSCATALOG_MYSQL_SQL}=$::mysqld_variables{'basedir'} . "/share/columnstore/syscatalog_mysql.sql";
+}
+
+sub is_default { 0 }
+
+sub start_test {
+  # we should guard this for --force-restart flag condition.
+  my ($self, $tinfo)= @_;
+  My::Suite::start_test(@_);
+}
+
+bless { };
diff --git a/mysql-test/columnstore/devregression/t/mcs7238_regression_MCOL-830.test b/mysql-test/columnstore/devregression/t/mcs7238_regression_MCOL-830.test
index f463bfa10..5237779c4 100644
--- a/mysql-test/columnstore/devregression/t/mcs7238_regression_MCOL-830.test
+++ b/mysql-test/columnstore/devregression/t/mcs7238_regression_MCOL-830.test
@@ -5,6 +5,9 @@
 # -------------------------------------------------------------- #
 #
 --source ../include/have_columnstore.inc
+
+--source ../include/cross_engine.inc
+
 #
 USE tpch1;
 #
@@ -32,3 +35,6 @@ DROP TABLE test.mcol830b;
 --enable_warnings
 
 #
+
+--source ../include/drop_cross_engine.inc
+
diff --git a/mysql-test/columnstore/include/cross_engine.inc b/mysql-test/columnstore/include/cross_engine.inc
new file mode 100644
index 000000000..3a31cdd11
--- /dev/null
+++ b/mysql-test/columnstore/include/cross_engine.inc
@@ -0,0 +1,28 @@
+# -------------------------------------------------------------- #
+# Enable cross engine join
+# Configure user and password in Columnstore.xml file
+# -------------------------------------------------------------- #
+
+--disable_query_log
+if (!$MASTER_MYPORT)
+{
+  # Running with --extern
+  let $MASTER_MYPORT=`SELECT @@port`;
+}
+
+--exec $MCS_MCSSETCONFIG CrossEngineSupport User 'cejuser'
+--exec $MCS_MCSSETCONFIG CrossEngineSupport Password 'Vagrant1|0000001'
+--exec $MCS_MCSSETCONFIG CrossEngineSupport Port $MASTER_MYPORT
+
+# -------------------------------------------------------------- #
+# Create the corresponding user in the server
+# -------------------------------------------------------------- #
+
+--disable_warnings
+CREATE USER IF NOT EXISTS 'cejuser'@'localhost' IDENTIFIED BY 'Vagrant1|0000001';
+--enable_warnings
+
+GRANT ALL PRIVILEGES ON *.* TO 'cejuser'@'localhost';
+FLUSH PRIVILEGES;
+--enable_query_log
+
diff --git a/mysql-test/columnstore/include/drop_cross_engine.inc b/mysql-test/columnstore/include/drop_cross_engine.inc
new file mode 100644
index 000000000..7921a4413
--- /dev/null
+++ b/mysql-test/columnstore/include/drop_cross_engine.inc
@@ -0,0 +1,4 @@
+--disable_query_log
+REVOKE ALL PRIVILEGES ON *.* FROM 'cejuser'@'localhost';
+DROP USER 'cejuser'@'localhost';
+--enable_query_log
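Taken together, the pre-stop path that patches 4 through 7 converge on is: inspect the BRM shmem locks with mcs-shmem-locks, release any active readers and writers, and only then let mcs-savebrm.py run. A condensed sketch of the intended call site; the wrapper function below is hypothetical, the real dispatchers call release_shmem_locks() inline from their stop paths:

    import logging
    from cmapi_server.process_dispatchers.locks import release_shmem_locks

    logger = logging.getLogger(__name__)

    def prestop_workernode() -> None:
        # Hypothetical helper mirroring what ContainerDispatcher.stop() and
        # SystemdDispatcher.stop() do just before mcs-savebrm.py runs: stale
        # locks left by a crashed controllernode would make savebrm hang.
        if not release_shmem_locks(logger):
            # Best effort only: log and proceed with the stop sequence
            # rather than aborting the shutdown.
            logger.error('BRM shmem locks still held; mcs-savebrm.py may stall.')

The release is deliberately non-fatal: a failed unlock degrades to the old behavior (savebrm may block), while a successful one removes the hang entirely.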