From f594d2768585bd57315d9f60e43f9b2093280f35 Mon Sep 17 00:00:00 2001 From: drrtuy Date: Fri, 17 Jan 2025 20:47:27 +0000 Subject: [PATCH] feat(): accounts hash tables RAM allocations/removes STLPoolAllocator --- dbcon/joblist/tuplehashjoin.cpp | 23 +++++++++++- utils/joiner/tuplejoiner.cpp | 65 ++++++++++++++++++--------------- utils/joiner/tuplejoiner.h | 18 ++++++--- 3 files changed, 68 insertions(+), 38 deletions(-) diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp index e05010a58..639054c06 100644 --- a/dbcon/joblist/tuplehashjoin.cpp +++ b/dbcon/joblist/tuplehashjoin.cpp @@ -302,7 +302,7 @@ void TupleHashJoinStep::startSmallRunners(uint index) stopMemTracking = false; utils::VLArray jobs(numCores); - uint64_t memMonitor = jobstepThreadPool.invoke([this, index] { this->trackMem(index); }); + // uint64_t memMonitor = jobstepThreadPool.invoke([this, index] { this->trackMem(index); }); // starting 1 thread when in PM mode, since it's only inserting into a // vector of rows. The rest will be started when converted to UM mode. if (joiners[index]->inUM()) @@ -331,7 +331,7 @@ void TupleHashJoinStep::startSmallRunners(uint index) stopMemTracking = true; memTrackDone.notify_one(); memTrackMutex.unlock(); - jobstepThreadPool.join(memMonitor); + // jobstepThreadPool.join(memMonitor); /* If there was an error or an abort, drain the input DL, do endOfInput on the output */ @@ -481,6 +481,22 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t* dlMutex.unlock(); } } + catch (std::bad_alloc& exc) + { + if (!joinIsTooBig && + (isDML || !allowDJS || (fSessionId & 0x80000000) || (tableOid() < 3000 && tableOid() >= 1000))) + { + joinIsTooBig = true; + ostringstream oss; + oss << "(" << __LINE__ << ") " + << logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG); + fLogger->logMessage(logging::LOG_TYPE_INFO, oss.str()); + errorMessage(oss.str()); + status(logging::ERR_JOIN_TOO_BIG); + cout << "Join is too big, raise the UM join limit for now" << endl; + abort(); + } + } catch (...) { handleException(std::current_exception(), logging::ERR_EXEMGR_MALFUNCTION, logging::ERR_JOIN_TOO_BIG, @@ -2036,6 +2052,9 @@ void TupleHashJoinStep::abort() JobStep::abort(); boost::mutex::scoped_lock sl(djsLock); + for (auto& joiner : joiners) + joiner->abort(); + if (djs.size()) { for (uint32_t i = 0, e = djs.size(); e < i; i++) diff --git a/utils/joiner/tuplejoiner.cpp b/utils/joiner/tuplejoiner.cpp index 01b7cf1c3..1818a5373 100644 --- a/utils/joiner/tuplejoiner.cpp +++ b/utils/joiner/tuplejoiner.cpp @@ -18,6 +18,7 @@ #include "tuplejoiner.h" #include +#include #include #include #include @@ -37,13 +38,6 @@ using namespace joblist; namespace joiner { -// TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput, -// uint32_t smallJoinColumn, uint32_t largeJoinColumn, JoinType jt, -// threadpool::ThreadPool* jsThreadPool) -// : TupleJoiner(smallInput, largeInput, smallJoinColumn, largeJoinColumn, jt, jsThreadPool, nullptr) -// { -// } - // Typed joiner ctor TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput, uint32_t smallJoinColumn, uint32_t largeJoinColumn, JoinType jt, @@ -60,6 +54,7 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R , numCores(numCores) , jobstepThreadPool(jsThreadPool) , _convertToDiskJoin(false) + , resourceManager_(rm) { uint i; @@ -69,11 +64,12 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R if (smallRG.getColTypes()[smallJoinColumn] == CalpontSystemCatalog::LONGDOUBLE) { ld.reset(new boost::scoped_ptr[bucketCount]); - _pool.reset(new boost::shared_ptr[bucketCount]); + // _pool.reset(new boost::shared_ptr[bucketCount]); for (i = 0; i < bucketCount; i++) { - STLPoolAllocator> alloc(resourceManager_); - _pool[i] = alloc.getPoolAllocator(); + // STLPoolAllocator> alloc(resourceManager_); + // _pool[i] = alloc.getPoolAllocator(); + auto alloc = resourceManager_->getAllocator>(); ld[i].reset(new ldhash_t(10, hasher(), ldhash_t::key_equal(), alloc)); } } @@ -83,8 +79,9 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R _pool.reset(new boost::shared_ptr[bucketCount]); for (i = 0; i < bucketCount; i++) { - STLPoolAllocator> alloc(resourceManager_); - _pool[i] = alloc.getPoolAllocator(); + // STLPoolAllocator> alloc(resourceManager_); + // _pool[i] = alloc.getPoolAllocator(); + auto alloc = resourceManager_->getAllocator>(); sth[i].reset(new sthash_t(10, hasher(), sthash_t::key_equal(), alloc)); } } @@ -94,8 +91,9 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R _pool.reset(new boost::shared_ptr[bucketCount]); for (i = 0; i < bucketCount; i++) { - STLPoolAllocator> alloc(resourceManager_); - _pool[i] = alloc.getPoolAllocator(); + // STLPoolAllocator> alloc(resourceManager_); + // _pool[i] = alloc.getPoolAllocator(); + auto alloc = resourceManager_->getAllocator>(); h[i].reset(new hash_t(10, hasher(), hash_t::key_equal(), alloc)); } } @@ -176,6 +174,7 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R , numCores(numCores) , jobstepThreadPool(jsThreadPool) , _convertToDiskJoin(false) + , resourceManager_(rm) { uint i; @@ -185,8 +184,9 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R ht.reset(new boost::scoped_ptr[bucketCount]); for (i = 0; i < bucketCount; i++) { - STLPoolAllocator> alloc(resourceManager_); - _pool[i] = alloc.getPoolAllocator(); + // STLPoolAllocator> alloc(resourceManager_); + // _pool[i] = alloc.getPoolAllocator(); + auto alloc = resourceManager_->getAllocator>(); ht[i].reset(new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc)); } m_bucketLocks.reset(new boost::mutex[bucketCount]); @@ -284,7 +284,7 @@ void TupleJoiner::bucketsToTables(buckets_t* buckets, hash_table_t* tables) uint i; bool done = false, wasProductive; - while (!done) + while (!done && !wasAborted_) { done = true; wasProductive = false; @@ -292,14 +292,16 @@ void TupleJoiner::bucketsToTables(buckets_t* buckets, hash_table_t* tables) { if (buckets[i].empty()) continue; - bool gotIt = m_bucketLocks[i].try_lock(); - if (!gotIt) { - done = false; - continue; + boost::unique_lock lock(m_bucketLocks[i], boost::try_to_lock); + if (!lock.owns_lock()) + { + done = false; + continue; + } + tables[i]->insert(buckets[i].begin(), buckets[i].end()); } - tables[i]->insert(buckets[i].begin(), buckets[i].end()); - m_bucketLocks[i].unlock(); + wasProductive = true; buckets[i].clear(); } @@ -398,13 +400,15 @@ void TupleJoiner::insertRGData(RowGroup& rg, uint threadID) rowCount = rg.getRowCount(); rg.getRow(0, &r); - m_cpValuesLock.lock(); - for (i = 0; i < rowCount; i++, r.nextRow()) { - updateCPData(r); - r.zeroRid(); + boost::unique_lock lock(m_cpValuesLock); + for (i = 0; i < rowCount; i++, r.nextRow()) + { + updateCPData(r); + r.zeroRid(); + } } - m_cpValuesLock.unlock(); + rg.getRow(0, &r); if (joinAlg == UM) @@ -1828,8 +1832,9 @@ void TupleJoiner::clearData() for (uint i = 0; i < bucketCount; i++) { - STLPoolAllocator> alloc; - _pool[i] = alloc.getPoolAllocator(); + // STLPoolAllocator> alloc; + // _pool[i] = alloc.getPoolAllocator(); + auto alloc = resourceManager_->getAllocator>(); if (typelessJoin) ht[i].reset(new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc)); else if (smallRG.getColTypes()[smallKeyColumns[0]] == CalpontSystemCatalog::LONGDOUBLE) diff --git a/utils/joiner/tuplejoiner.h b/utils/joiner/tuplejoiner.h index 82e5c9fb3..afb580cb6 100644 --- a/utils/joiner/tuplejoiner.h +++ b/utils/joiner/tuplejoiner.h @@ -26,6 +26,7 @@ #include #include +#include "countingallocator.h" #include "resourcemanager.h" #include "rowgroup.h" #include "joiner.h" @@ -471,22 +472,26 @@ class TupleJoiner return finished; } void setConvertToDiskJoin(); + void abort() + { + wasAborted_ = true; + } private: typedef std::unordered_multimap, - utils::STLPoolAllocator > > + allocators::CountingAllocator > > hash_t; typedef std::unordered_multimap, - utils::STLPoolAllocator > > + allocators::CountingAllocator > > sthash_t; typedef std::unordered_multimap< TypelessData, rowgroup::Row::Pointer, hasher, std::equal_to, - utils::STLPoolAllocator > > + allocators::CountingAllocator > > typelesshash_t; // MCOL-1822 Add support for Long Double AVG/SUM small side typedef std::unordered_multimap< long double, rowgroup::Row::Pointer, hasher, LongDoubleEq, - utils::STLPoolAllocator > > + allocators::CountingAllocator > > ldhash_t; typedef hash_t::iterator iterator; @@ -525,6 +530,7 @@ class TupleJoiner }; JoinAlg joinAlg; joblist::JoinType joinType; + // WIP std::shared_ptr[]> _pool; // pools for the table and nodes uint32_t threadCount; std::string tableName; @@ -558,7 +564,7 @@ class TupleJoiner uint bucketCount; uint bucketMask; boost::scoped_array m_bucketLocks; - boost::mutex m_typelessLock, m_cpValuesLock; + boost::mutex m_cpValuesLock; utils::Hasher_r bucketPicker; const uint32_t bpSeed = 0x4545e1d7; // an arbitrary random # threadpool::ThreadPool* jobstepThreadPool; @@ -572,7 +578,7 @@ class TupleJoiner bool _convertToDiskJoin; joblist::ResourceManager* resourceManager_ = nullptr; - + bool wasAborted_ = false; }; } // namespace joiner