From 3dfc8cd4542279c5e677e43b2a1bc857de1ecfa5 Mon Sep 17 00:00:00 2001
From: drrtuy
Date: Fri, 14 Feb 2025 13:32:02 +0000
Subject: [PATCH] feat(): first cleanup

---
 CMakeLists.txt                                |  3 +-
 dbcon/joblist/batchprimitiveprocessor-jl.cpp  |  1 +
 dbcon/joblist/tuplehashjoin.cpp               | 53 ------------------
 .../primproc/batchprimitiveprocessor.cpp      |  5 +-
 primitives/primproc/batchprimitiveprocessor.h | 11 ----
 utils/common/countingallocator.h              | 56 ++++---------------
 utils/common/fixedallocator.cpp               |  5 --
 utils/common/poolallocator.cpp                |  5 --
 utils/messageqcpp/inetstreamsocket.cpp        |  3 +-
 utils/rowgroup/rowgroup.cpp                   | 18 ------
 utils/rowgroup/rowgroup.h                     |  9 +--
 11 files changed, 18 insertions(+), 151 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5930aa378..a16c38332 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -262,7 +262,8 @@ ENDIF()
 
 MY_CHECK_AND_SET_COMPILER_FLAG("-Wno-deprecated-copy" DEBUG RELEASE RELWITHDEBINFO MINSIZEREL)
 MY_CHECK_AND_SET_COMPILER_FLAG("-Wno-deprecated-declarations" DEBUG RELEASE RELWITHDEBINFO MINSIZEREL)
-MY_CHECK_AND_SET_COMPILER_FLAG("-Wall -Wextra")
+
+MY_CHECK_AND_SET_COMPILER_FLAG("-Werror -Wall -Wextra")
 
 SET (ENGINE_LDFLAGS "-Wl,--no-as-needed -Wl,--add-needed")
 SET (ENGINE_DT_LIB datatypes)
 SET (ENGINE_COMMON_LIBS messageqcpp loggingcpp configcpp idbboot boost_thread xml2 pthread rt ${ENGINE_DT_LIB})
diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.cpp b/dbcon/joblist/batchprimitiveprocessor-jl.cpp
index 93e61faef..64ed49a31 100644
--- a/dbcon/joblist/batchprimitiveprocessor-jl.cpp
+++ b/dbcon/joblist/batchprimitiveprocessor-jl.cpp
@@ -1481,6 +1481,7 @@ bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)
 
     if (tJoiners[joinerNum]->isTypelessJoin())
     {
+      // TODO: change the RM ptr to a ref because its scope and lifetime last until the end of the program.
       auto alloc = rm_->getAllocator();
       utils::FixedAllocator fa(alloc, tlKeyLens[joinerNum], true);
diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp
index cdb7b0461..00d7e7c51 100644
--- a/dbcon/joblist/tuplehashjoin.cpp
+++ b/dbcon/joblist/tuplehashjoin.cpp
@@ -276,7 +276,6 @@ void TupleHashJoinStep::startSmallRunners(uint index)
   stopMemTracking = true;
   memTrackDone.notify_one();
   memTrackMutex.unlock();
-  // jobstepThreadPool.join(memMonitor);
 
   /* If there was an error or an abort, drain the input DL, do endOfInput on the output */
@@ -438,27 +437,6 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t*
          if disk join is enabled, use it.  else abort. */
-      // boost::unique_lock<boost::mutex> sl(saneErrMsg);
-
-      // if (cancelled())
-      //   return;
-      // if (!allowDJS || isDML || (fSessionId & 0x80000000) || (tableOid() < 3000 && tableOid() >= 1000))
-      // {
-      //   joinIsTooBig = true;
-      //   ostringstream oss;
-      //   oss << "(" << __LINE__ << ") "
-      //       << logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG);
-      //   fLogger->logMessage(logging::LOG_TYPE_INFO, oss.str());
-      //   errorMessage(oss.str());
-      //   status(logging::ERR_JOIN_TOO_BIG);
-      //   cout << "Join is too big, raise the UM join limit for now (small runner)" << endl;
-      //   abort();
-      // }
-      // else if (allowDJS)
-      // {
-      //   joiner->setConvertToDiskJoin();
-      //   // TODO RGData that triggers this path is lost. Need to store it to pass it future.
-      // }
       return outOfMemoryHandler(joiner);
     }
     joiner->insertRGData(smallRG, threadID);
@@ -480,37 +458,6 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t*
   }
   catch (std::bad_alloc& exc)
   {
-    // boost::unique_lock<boost::mutex> sl(saneErrMsg);
-
-    // if (!joinIsTooBig &&
-    //     (isDML || !allowDJS || (fSessionId & 0x80000000) || (tableOid() < 3000 && tableOid() >= 1000)))
-    // {
-    //   joinIsTooBig = true;
-    //   ostringstream oss;
-    //   oss << "(" << __LINE__ << ") "
-    //       << logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG);
-    //   fLogger->logMessage(logging::LOG_TYPE_INFO, oss.str());
-    //   errorMessage(oss.str());
-    //   status(logging::ERR_JOIN_TOO_BIG);
-    //   cout << "Join is too big, raise the UM join limit for now" << endl;
-    //   abort();
-    // }
-    // else if (allowDJS)
-    // {
-    //   joiner->setConvertToDiskJoin();
-    //   // RGData that triggers OOM is saved but the hash table is too big.
-    //   // TBD need to store RGData if counting allocator will be used for RGData.
-
-    //   // Need to clean hash tables and vec.
-    //   // joiner->clearData(); // `finished` flag can implicitly interfere.
-    // }
-    // return;
-
-    // RGData that triggers OOM is saved but the hash table is too big.
-    // TBD need to store RGData if counting allocator will be used for RGData.
-
-    // Need to clean hash tables and vec.
-    // joiner->clearData(); // `finished` flag can implicitly interfere.
     return outOfMemoryHandler(joiner);
   }
   catch (...)
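[editor's note] Both deleted recovery blocks above are now funneled through a single outOfMemoryHandler() call, whose body this patch does not show. Below is a minimal sketch of what such a handler plausibly does, reconstructed only from the deleted comments (allowDJS, isDML, joinIsTooBig, setConvertToDiskJoin and the ERR_JOIN_TOO_BIG path are the names visible above); the signature and return type are assumptions, not the actual implementation:

```cpp
// Hypothetical consolidation of the two deleted bad_alloc recovery paths.
// A sketch based on the removed commented-out code, not the real handler.
void TupleHashJoinStep::outOfMemoryHandler(const std::shared_ptr<joiner::TupleJoiner>& joiner)
{
  boost::unique_lock<boost::mutex> sl(saneErrMsg);

  if (cancelled())
    return;

  if (!allowDJS || isDML || (fSessionId & 0x80000000) || (tableOid() < 3000 && tableOid() >= 1000))
  {
    // Disk join is not an option here: report the join as too big and abort the query.
    joinIsTooBig = true;
    std::ostringstream oss;
    oss << "(" << __LINE__ << ") "
        << logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG);
    fLogger->logMessage(logging::LOG_TYPE_INFO, oss.str());
    errorMessage(oss.str());
    status(logging::ERR_JOIN_TOO_BIG);
    abort();
  }
  else
  {
    // Disk join is allowed: flag the joiner so the build side spills to disk.
    joiner->setConvertToDiskJoin();
  }
}
```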
diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp
index fcff86b61..9ced17195 100644
--- a/primitives/primproc/batchprimitiveprocessor.cpp
+++ b/primitives/primproc/batchprimitiveprocessor.cpp
@@ -326,12 +326,9 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
       typelessJoin.reset(new bool[joinerCount]);
       tlSmallSideKeyLengths.reset(new uint32_t[joinerCount]);
 
-      // storedKeyAllocators.reset(new PoolAllocator[joinerCount]);
+      auto alloc = exemgr::globServiceExeMgr->getRm().getAllocator();
       for (uint j = 0; j < joinerCount; ++j)
       {
-        // storedKeyAllocators[j].setUseLock(true);
-        // WIP use one copy of the allocator
-        auto alloc = exemgr::globServiceExeMgr->getRm().getAllocator();
         storedKeyAllocators.emplace_back(PoolAllocator(alloc, PoolAllocator::DEFAULT_WINDOW_SIZE, false, true));
       }
diff --git a/primitives/primproc/batchprimitiveprocessor.h b/primitives/primproc/batchprimitiveprocessor.h
index e11ddac2d..2aff39034 100644
--- a/primitives/primproc/batchprimitiveprocessor.h
+++ b/primitives/primproc/batchprimitiveprocessor.h
@@ -270,7 +270,6 @@ class BatchPrimitiveProcessor
   uint32_t physIO, cachedIO, touchedBlocks;
   SP_UM_IOSOCK sock;
-  // messageqcpp::SBS serialized;
   SP_UM_MUTEX writelock;
 
   // MCOL-744 using pthread mutex instead of Boost mutex because
@@ -309,19 +308,10 @@ class BatchPrimitiveProcessor
   bool hasRowGroup;
 
   /* Rowgroups + join */
-  // typedef std::unordered_multimap<uint64_t, uint32_t, hasher, std::equal_to<uint64_t>,
-  //                                 utils::STLPoolAllocator<std::pair<const uint64_t, uint32_t>>>
-  //     TJoiner;
   using TJoiner = std::unordered_multimap<uint64_t, uint32_t, hasher, std::equal_to<uint64_t>,
                                           allocators::CountingAllocator<std::pair<const uint64_t, uint32_t>>>;
 
-  // typedef std::unordered_multimap<
-  //     joiner::TypelessData, uint32_t, joiner::TupleJoiner::TypelessDataHasher,
-  //     joiner::TupleJoiner::TypelessDataComparator,
-  //     utils::STLPoolAllocator<std::pair<const joiner::TypelessData, uint32_t>>>
-  //     TLJoiner;
   using TLJoiner = std::unordered_multimap<
       joiner::TypelessData, uint32_t, joiner::TupleJoiner::TypelessDataHasher,
       joiner::TupleJoiner::TypelessDataComparator,
      allocators::CountingAllocator<std::pair<const joiner::TypelessData, uint32_t>>>;

   inline void getJoinResults(const rowgroup::Row& r, uint32_t jIndex, std::vector<uint32_t>& v);
 
   // these allocators hold the memory for the keys stored in tlJoiners
-  // WIP This was a shared vec of allocators originally but it might not be necessary.
   // This might give far memory allocations for keys used by JOIN hashmap.
   std::vector<PoolAllocator> storedKeyAllocators;
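[editor's note] The TJoiner/TLJoiner aliases above swap utils::STLPoolAllocator for allocators::CountingAllocator, which follows the standard STL allocator protocol, so the hash maps charge their node allocations against the shared budget transparently. A minimal usage sketch, assuming a constructor that takes a pointer to the shared atomic budget (the header below stores memoryLimit_ as such a pointer); the budget value and bucket count are purely illustrative:

```cpp
#include <atomic>
#include <cstdint>
#include <unordered_map>
#include "utils/common/countingallocator.h"  // header touched by this patch; path assumed

// Illustrative only: plug CountingAllocator into an unordered_multimap the way
// the TJoiner alias does. In practice the budget atomic lives in ResourceManager.
std::atomic<std::int64_t> globalBudget{4LL * 1024 * 1024 * 1024};  // 4 GB budget

using Pair = std::pair<const uint64_t, uint32_t>;
using JoinMap = std::unordered_multimap<uint64_t, uint32_t, std::hash<uint64_t>,
                                        std::equal_to<uint64_t>, allocators::CountingAllocator<Pair>>;

JoinMap makeJoinMap()
{
  allocators::CountingAllocator<Pair> alloc(&globalBudget);  // assumed ctor shape
  // Bucket count, hasher and comparator are spelled out because the allocator is stateful.
  return JoinMap(16, std::hash<uint64_t>(), std::equal_to<uint64_t>(), alloc);
}
```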
diff --git a/utils/common/countingallocator.h b/utils/common/countingallocator.h
index d0c140ee8..89c6176c1 100644
--- a/utils/common/countingallocator.h
+++ b/utils/common/countingallocator.h
@@ -23,15 +23,18 @@
 
 #include
 #include
-#include
-
 namespace allocators
 {
-// WIP placement
+// This is an aggregating custom allocator that tracks memory usage using
+// a globally unique atomic counter.
+// It is supposed to receive a ptr to that atomic from a singleton entity, e.g. ResourceManager.
+// NB The atomic stores the remaining memory budget, i.e. an upper hard limit, not the usage counter.
+// The allocator counts allocated bytes locally and syncs the accumulated difference with the global
+// counter every CheckPointStepSize (100MB by default), on both allocation and deallocation.
+// When a sync op hits MemoryLimitLowerBound while trying to allocate more memory, it throws
+// std::bad_alloc. SQL operators or the TBPS runtime must catch the exception and act accordingly.
-
-// const constexpr std::uint64_t CounterUpdateUnitSize = 4 * 1024 * 1024;
 const constexpr std::int64_t MemoryLimitLowerBound = 500 * 1024 * 1024;  // WIP
 const constexpr std::int64_t CheckPointStepSize = 100 * 1024 * 1024;     // WIP
@@ -57,38 +60,24 @@ class CountingAllocator
   void changeLocalAndGlobalMemoryLimits(const int64_t sizeChange)
   {
     // This routine must be used for mem allocation accounting path only!
-    // The case Current > last checkpoint(we deallocated mem since the last checkpoint), sizeIncrease is
-    // negative b/c we now move into the opposite direction. The case Last Checkpoint > Current (we allocated
+    // The case LastCheckpoint >= Current (we deallocated mem since the last checkpoint): sizeIncrease is
+    // made negative because we now move in the opposite direction. The case Current > LastCheckpoint (we allocated
     // mem since the last checkpoint), sizeIncrease is positive
     int64_t sizeChangeWDirection =
         (currentLocalMemoryUsage_ <= lastMemoryLimitCheckpoint_) ? -sizeChange : sizeChange;
     int64_t diffSinceLastCheckPoint = int_distance(currentLocalMemoryUsage_, lastMemoryLimitCheckpoint_);
     if (needCheckPoint(sizeChangeWDirection, diffSinceLastCheckPoint, checkPointStepSize_))
     {
-      // std::cout << "changeLocalAndGlobalMemoryLimits " << sizeChange << " bytes at "
-      //           << " diffSinceLastCheckPoint " << diffSinceLastCheckPoint << ". current timit: " << std::dec
-      //           << memoryLimit_->load() << std::hex << " bytes.\n";
-      // std::cout << std::dec;
-
-      // auto lastMemoryLimitCheckpointDiff = diffSinceLastCheckPoint + sizeChangeWDirection;
       int64_t lastMemoryLimitCheckpointDiff = (currentLocalMemoryUsage_ <= lastMemoryLimitCheckpoint_)
                                                   ? sizeChange - diffSinceLastCheckPoint
                                                   : sizeChange + diffSinceLastCheckPoint;
       assert(lastMemoryLimitCheckpointDiff > 0);
-      // {
-      //   std::cout << "[Allocate::changeLocalAndGlobalMemoryLimits!!!] lastMemoryLimitCheckpoint_ "
-      //             << lastMemoryLimitCheckpoint_ << " currentLocalMemoryUsage_ " << currentLocalMemoryUsage_
-      //             << " sizeChangeWDirection " << sizeChangeWDirection << " lastMemoryLimitCheckpointDiff " << lastMemoryLimitCheckpointDiff
-      //             << std::endl;
-      // }
-
       // lastMemoryLimitCheckpointDiff sign signifies a direction we move allocating memory.
       auto currentGlobalMemoryLimit =
           memoryLimit_->fetch_sub(lastMemoryLimitCheckpointDiff, std::memory_order_relaxed);
       if (currentGlobalMemoryLimit < memoryLimitLowerBound_)
       {
         memoryLimit_->fetch_add(lastMemoryLimitCheckpointDiff, std::memory_order_relaxed);
-        // ? what to do with local counters here
         throw std::bad_alloc();
       }
       lastMemoryLimitCheckpoint_ += lastMemoryLimitCheckpointDiff;
     }
@@ -123,9 +112,6 @@ class CountingAllocator
     changeLocalAndGlobalMemoryLimits(sizeAllocated);
 
     T* ptr = static_cast<T*>(::operator new(sizeAllocated));
-    // std::cout << "[Allocate] non-array " << n * sizeof(T) << " bytes at " << static_cast<void*>(ptr)
-    //           << ". current timit: " << std::dec << memoryLimit_->load() << std::hex << " bytes.\n";
-    // std::cout << std::dec;
     return ptr;
   }
@@ -138,8 +124,6 @@ class CountingAllocator
     changeLocalAndGlobalMemoryLimits(sizeAllocated);
 
     T* ptr = static_cast<T*>(::operator new[](n));
-    // std::cout << "[Allocate] array " << n * sizeof(T) << " bytes at " << static_cast<void*>(ptr)
-    //           << ". current timit: " << std::dec << memoryLimit_->load() << std::hex << " bytes.\n";
     return ptr;
   }
@@ -150,10 +134,6 @@ class CountingAllocator
     int64_t sizeToDeallocate = n * sizeof(T);
 
-    // std::cout << "[Deallocate start] " << sizeToDeallocate << " bytes from " << static_cast<void*>(ptr)
-    //           << ". current timit: " << std::dec << memoryLimit_->load() << std::hex << " bytes.\n";
-    // std::cout << std::dec;
-
     int64_t sizeChangeWDirection =
         (currentLocalMemoryUsage_ >= lastMemoryLimitCheckpoint_) ? -sizeToDeallocate : sizeToDeallocate;
     int64_t diffSinceLastCheckPoint = int_distance(currentLocalMemoryUsage_, lastMemoryLimitCheckpoint_);
@@ -161,33 +141,17 @@ class CountingAllocator
     if (needCheckPoint(sizeChangeWDirection, diffSinceLastCheckPoint, checkPointStepSize_))
     {
       // Invariant is lastMemoryLimitCheckpoint_ >= currentLocalMemoryUsage_ - sizeToDeallocate
-      // and lastMemoryLimitCheckpoint_ value must be negative.
-      // int64_t lastMemoryLimitCheckpointDiff =
-      //     labs(lastMemoryLimitCheckpoint_ - currentLocalMemoryUsage_ - sizeToDeallocate);
-      // auto lastMemoryLimitCheckpointDiff = diffSinceLastCheckPoint + sizeChangeWDirection;
       int64_t lastMemoryLimitCheckpointDiff =
           (currentLocalMemoryUsage_ >= lastMemoryLimitCheckpoint_)
              ? sizeToDeallocate - (currentLocalMemoryUsage_ - lastMemoryLimitCheckpoint_)
              : diffSinceLastCheckPoint + sizeToDeallocate;
       assert(lastMemoryLimitCheckpointDiff > 0);
-
-      // std::cout << "[Deallocate checkpoint!!!] lastMemoryLimitCheckpoint_ " << lastMemoryLimitCheckpoint_
-      //           << " currentLocalMemoryUsage_ " << currentLocalMemoryUsage_ << " sizeChangeWDirection "
-      //           << sizeChangeWDirection << " lastMemoryLimitCheckpointDiff " << lastMemoryLimitCheckpointDiff
-      //           << std::endl;
-
-      // assert(lastMemoryLimitCheckpointDiff < 0);
       memoryLimit_->fetch_add(lastMemoryLimitCheckpointDiff, std::memory_order_relaxed);
       lastMemoryLimitCheckpoint_ -= (lastMemoryLimitCheckpoint_ == 0) ? 0 : lastMemoryLimitCheckpointDiff;
     }
 
     currentLocalMemoryUsage_ = currentLocalMemoryUsage_ - sizeToDeallocate;
-
-    // std::cout << "[Deallocate end] " << n * sizeof(T) << " bytes from " << static_cast<void*>(ptr)
-    //           << ". current timit: " << std::dec << memoryLimit_->load() << std::hex << " bytes.\n";
-
-    // std::cout << std::dec;
   }
 
   // Equality operators (allocators are equal if they share the same counter)
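[editor's note] The checkpointing described in the new header comment is the core of this allocator: the shared atomic is only touched once per CheckPointStepSize of local drift, which keeps contention low when many allocators run concurrently. A standalone sketch of the allocation-side arithmetic follows; it is simplified (it omits the deallocation-direction handling the real changeLocalAndGlobalMemoryLimits performs) and the struct and names are inventions for illustration:

```cpp
#include <atomic>
#include <cstdint>
#include <new>

// Standalone illustration of the checkpoint model, not the class itself.
constexpr std::int64_t kStep = 100LL * 1024 * 1024;        // CheckPointStepSize
constexpr std::int64_t kLowerBound = 500LL * 1024 * 1024;  // MemoryLimitLowerBound

struct LocalAccounting
{
  std::atomic<std::int64_t>* globalBudget;  // shared, owned by e.g. ResourceManager
  std::int64_t localUsage = 0;
  std::int64_t lastCheckpoint = 0;

  void onAllocate(std::int64_t bytes)
  {
    localUsage += bytes;
    if (localUsage - lastCheckpoint >= kStep)  // time to sync with the global counter
    {
      const std::int64_t diff = localUsage - lastCheckpoint;
      // fetch_sub returns the previous value, mirroring the check in countingallocator.h.
      if (globalBudget->fetch_sub(diff, std::memory_order_relaxed) < kLowerBound)
      {
        globalBudget->fetch_add(diff, std::memory_order_relaxed);  // roll the claim back
        throw std::bad_alloc();                                    // caller must handle this
      }
      lastCheckpoint = localUsage;
    }
  }
};
```

Deallocation runs the same sync in reverse, returning the batched difference to the budget with fetch_add.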
diff --git a/utils/common/fixedallocator.cpp b/utils/common/fixedallocator.cpp
index cd91cc645..f0a5d8da3 100644
--- a/utils/common/fixedallocator.cpp
+++ b/utils/common/fixedallocator.cpp
@@ -69,8 +69,6 @@ void FixedAllocator::setAllocSize(uint allocSize)
 
 void FixedAllocator::newBlock()
 {
-  // boost::shared_ptr<uint8_t[]> next;
-
   capacityRemaining = elementCount * elementSize;
 
   if (!tmpSpace || mem.size() == 0)
@@ -83,9 +81,6 @@
     {
       mem.emplace_back(boost::make_shared<uint8_t[]>(elementCount * elementSize));
     }
-    // next.reset(new uint8_t[elementCount * elementSize]);
-    // mem.push_back(next);
-    // nextAlloc = next.get();
     nextAlloc = mem.back().get();
   }
   else
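[editor's note] FixedAllocator::newBlock() and PoolAllocator::newBlock() (next file) now share the same shape: reuse the last block when tmpSpace allows, otherwise allocate through boost::allocate_shared when a counting allocator is present, falling back to boost::make_shared. The point of allocate_shared is that the buffer and the shared_ptr control block land in one allocation that is charged to the budget. A reduced sketch of that branch; the optional-allocator parameter is an assumption standing in for the pools' member state:

```cpp
#include <boost/make_shared.hpp>
#include <boost/smart_ptr/allocate_shared_array.hpp>
#include <cstdint>
#include <optional>
#include "utils/common/countingallocator.h"  // header touched by this patch; path assumed

// Sketch of the accounted-vs-plain block allocation branch used by both pools.
boost::shared_ptr<uint8_t[]> allocateBlock(
    const std::optional<allocators::CountingAllocator<uint8_t>>& alloc, std::size_t blockSize)
{
  if (alloc)
    return boost::allocate_shared<uint8_t[]>(*alloc, blockSize);  // charged to the budget
  return boost::make_shared<uint8_t[]>(blockSize);                // unaccounted fallback
}
```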
diff --git a/utils/common/poolallocator.cpp b/utils/common/poolallocator.cpp
index d0a284d71..aa6449cc7 100644
--- a/utils/common/poolallocator.cpp
+++ b/utils/common/poolallocator.cpp
@@ -20,7 +20,6 @@
  *
  ******************************************************************************************/
 
-#include
 //#define NDEBUG
 #include
 #include
@@ -48,15 +47,12 @@ void PoolAllocator::deallocateAll()
   capacityRemaining = 0;
   nextAlloc = NULL;
   memUsage = 0;
-  // WIP double check the space is cleaned up.
   mem.clear();
   oob.clear();
 }
 
 void PoolAllocator::newBlock()
 {
-  // boost::shared_ptr<uint8_t[]> next;
-
   capacityRemaining = allocSize;
 
   if (!tmpSpace || mem.size() == 0)
@@ -69,7 +65,6 @@
     {
       mem.emplace_back(boost::make_shared<uint8_t[]>(allocSize));
     }
-    // mem.push_back(next);
     nextAlloc = mem.back().get();
   }
   else
diff --git a/utils/messageqcpp/inetstreamsocket.cpp b/utils/messageqcpp/inetstreamsocket.cpp
index 998a00448..08bfaefd2 100644
--- a/utils/messageqcpp/inetstreamsocket.cpp
+++ b/utils/messageqcpp/inetstreamsocket.cpp
@@ -520,7 +520,8 @@ const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeO
     return SBS(new ByteStream(0U));
 
   // Allocate new memory for the `long string`.
-  // WIP must account this allocation also.
+  // TODO: account for this allocation as well, even though BS allocations are insignificant
+  // compared with the structs used by SQL operators.
   rowgroup::StringStoreBufSPType longString(
       new uint8_t[sizeof(rowgroup::StringStore::MemChunk) + memChunk.currentSize]);
diff --git a/utils/rowgroup/rowgroup.cpp b/utils/rowgroup/rowgroup.cpp
index 756ad21ce..01bfca90f 100644
--- a/utils/rowgroup/rowgroup.cpp
+++ b/utils/rowgroup/rowgroup.cpp
@@ -63,7 +63,6 @@ uint64_t StringStore::storeString(const uint8_t* data, uint32_t len)
   uint64_t ret = 0;
   empty = false;  // At least a nullptr is being stored.
 
-  // Sometimes the caller actually wants "" to be returned.......  argggghhhh......
   // if (len == 0)
   //   return numeric_limits<uint64_t>::max();
@@ -80,18 +79,15 @@
   if (mem.size() > 0)
     lastMC = (MemChunk*)mem.back().get();
 
-  // std::cout << "StringStore::storeString len " << len << std::endl;
   if ((len + 4) >= CHUNK_SIZE)
   {
     auto allocSize = len + sizeof(MemChunk) + 4;
     if (alloc)
     {
-      // cout << "StringStore::storeString longStrings with alloc " << std::endl;
       longStrings.emplace_back(boost::allocate_shared<uint8_t[]>(*alloc, allocSize));
     }
     else
    {
-      // cout << "StringStore::storeString longStrings no alloc " << std::endl;
       longStrings.emplace_back(boost::make_shared<uint8_t[]>(allocSize));
     }
     // std::shared_ptr<uint8_t[]> newOne(new uint8_t[len + sizeof(MemChunk) + 4]);
@@ -107,22 +103,14 @@
   {
     if ((lastMC == nullptr) || (lastMC->capacity - lastMC->currentSize < (len + 4)))
     {
-      // mem usage debugging
-      // if (lastMC)
       if (alloc)
       {
-        // cout << "StringStore::storeString with alloc " << std::endl;
         mem.emplace_back(boost::allocate_shared<uint8_t[]>(*alloc, CHUNK_SIZE + sizeof(MemChunk)));
-        // boost::allocate_shared) newOne(new uint8_t[CHUNK_SIZE + sizeof(MemChunk)]);
       }
       else
       {
-        // cout << "StringStore::storeString no alloc " << std::endl;
         mem.emplace_back(boost::make_shared<uint8_t[]>(CHUNK_SIZE + sizeof(MemChunk)));
-        // mem.emplace_back(boost::allocate_shared<uint8_t[]>(*alloc, CHUNK_SIZE + sizeof(MemChunk)));
-        // std::shared_ptr<uint8_t[]> newOne(new uint8_t[CHUNK_SIZE + sizeof(MemChunk)]);
       }
-      // mem.push_back(newOne);
       lastMC = reinterpret_cast<MemChunk*>(mem.back().get());
       lastMC->currentSize = 0;
       lastMC->capacity = CHUNK_SIZE;
@@ -187,20 +175,16 @@ void StringStore::deserialize(ByteStream& bs)
   for (i = 0; i < count; i++)
   {
     bs >> size;
-    // cout << "deserializing " << size << " bytes\n";
     buf = bs.buf();
 
     if (alloc)
     {
-      // cout << "StringStore::deserialize with alloc " << std::endl;
       mem.emplace_back(boost::allocate_shared<uint8_t[]>(*alloc, size + sizeof(MemChunk)));
     }
     else
     {
-      // cout << "StringStore::deserialize no alloc " << std::endl;
       mem.emplace_back(boost::make_shared<uint8_t[]>(size + sizeof(MemChunk)));
     }
-    // mem[i].reset(new uint8_t[size + sizeof(MemChunk)]);
     mc = (MemChunk*)mem[i].get();
     mc->currentSize = size;
     mc->capacity = size;
@@ -365,12 +349,10 @@ void RGData::reinit(const RowGroup& rg, uint32_t rowCount)
 {
   if (alloc)
   {
-    // cout << "RGData::reinit with alloc " << std::endl;
     rowData = boost::allocate_shared<uint8_t[]>(*alloc, rg.getDataSize(rowCount));
   }
   else
   {
-    // cout << "RGData::reinit no alloc " << std::endl;
     rowData.reset(new uint8_t[rg.getDataSize(rowCount)]);
   }
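[editor's note] The size arithmetic above (len + sizeof(MemChunk) + 4, and CHUNK_SIZE + sizeof(MemChunk)) reflects the StringStore buffer layout: every buffer starts with a MemChunk header, and each stored string is preceded by a 4-byte length. A sketch of the implied layout follows; the field names mirror the usage above, but the real definition lives in rowgroup.h and may differ:

```cpp
#include <cstdint>

// Implied layout of a StringStore buffer. Illustrative, not the real definition.
struct MemChunkSketch
{
  uint32_t currentSize;  // bytes of data[] already in use
  uint32_t capacity;     // usable bytes in data[]
  uint8_t data[];        // [u32 len][len bytes][u32 len][len bytes]... packed back to back
};
```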
diff --git a/utils/rowgroup/rowgroup.h b/utils/rowgroup/rowgroup.h
index ac85a7ff4..aa028158c 100644
--- a/utils/rowgroup/rowgroup.h
+++ b/utils/rowgroup/rowgroup.h
@@ -1,6 +1,6 @@
 /* Copyright (C) 2014 InfiniDB, Inc.
-   Copyright (c) 2019 MariaDB Corporation
+   Copyright (c) 2016-2024 MariaDB Corporation
 
    This program is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License
@@ -132,10 +132,6 @@ inline T derefFromTwoVectorPtrs(const std::vector<T>* outer, const std::vector<T>* inner,
   return outer->operator[](outerIdx);
 }
 
-// using RGDataBufType = uint8_t[];
-// using StringStoreBufType = uint8_t[];
-// using StringStoreBufSPType = boost::shared_ptr<StringStoreBufType>;
-
 class StringStore
 {
  public:
@@ -1043,7 +1039,6 @@ inline void Row::setStringField(const utils::ConstString& str, uint32_t colIndex)
   if (inStringTable(colIndex))
   {
-    // std::cout << "setStringField storeString len " << length << std::endl;
     offset = strings->storeString((const uint8_t*)str.str(), length);
     *((uint64_t*)&data[offsets[colIndex]]) = offset;
     // cout << " -- stored offset " << *((uint32_t *) &data[offsets[colIndex]])
@@ -1912,7 +1907,7 @@ inline uint32_t RowGroup::getStringTableThreshold() const
   return sTableThreshold;
 }
 
-// WIP mb unused
+// TODO: this appears to be unused; remove it in the dev branch.
 inline void RowGroup::setStringStore(boost::shared_ptr<StringStore> ss)
 {
   if (useStringTable)