From 671b7301f3e2c372787f4f4736367e1c812bcea2 Mon Sep 17 00:00:00 2001 From: = <=> Date: Thu, 24 Apr 2025 13:45:33 +0000 Subject: [PATCH] fix(allocator,perf): performance degradation caused by lack of STLPoolAllocator replaced by CountingAllocator --- dbcon/joblist/tupleannexstep.cpp | 42 +++++++---- .../primproc/batchprimitiveprocessor.cpp | 38 ++++++---- primitives/primproc/batchprimitiveprocessor.h | 23 ++++-- utils/joiner/tuplejoiner.cpp | 74 +++++++++++-------- utils/joiner/tuplejoiner.h | 59 +++++++++++---- utils/windowfunction/idborderby.cpp | 9 +-- utils/windowfunction/idborderby.h | 29 +++++++- 7 files changed, 182 insertions(+), 92 deletions(-) diff --git a/dbcon/joblist/tupleannexstep.cpp b/dbcon/joblist/tupleannexstep.cpp index 2a72c0e3e..62037cc75 100644 --- a/dbcon/joblist/tupleannexstep.cpp +++ b/dbcon/joblist/tupleannexstep.cpp @@ -83,7 +83,10 @@ struct TAEq bool operator()(const rowgroup::Row::Pointer&, const rowgroup::Row::Pointer&) const; }; // TODO: Generalize these and put them back in utils/common/hasher.h -using TNSDistinctMap_t = std::unordered_set >; +// using TNSDistinctMap_t = std::unordered_set >; +using TNSDistinctMap_t = + std::unordered_set >; }; // namespace inline uint64_t TAHasher::operator()(const Row::Pointer& p) const @@ -164,7 +167,7 @@ void TupleAnnexStep::setOutputRowGroup(const rowgroup::RowGroup& rg) void TupleAnnexStep::initialize(const RowGroup& rgIn, const JobInfo& jobInfo) { - // Initialize ResourceManager to acount memory usage. + // Initialize ResourceManager to acount memory usage. fRm = jobInfo.rm; // Initialize structures used by separate workers uint64_t id = 1; @@ -456,8 +459,11 @@ void TupleAnnexStep::executeNoOrderByWithDistinct() Row rowSkip; bool more = false; - auto alloc = fRm->getAllocator(); - std::unique_ptr distinctMap(new TNSDistinctMap_t(10, TAHasher(this), TAEq(this), alloc)); + // auto alloc = fRm->getAllocator(); + // std::unique_ptr distinctMap(new TNSDistinctMap_t(10, TAHasher(this), TAEq(this), + // alloc)); + std::unique_ptr distinctMap( + new TNSDistinctMap_t(10, TAHasher(this), TAEq(this), STLPoolAllocator(fRm))); rgDataOut.reinit(fRowGroupOut); fRowGroupOut.setData(&rgDataOut); @@ -588,12 +594,13 @@ void TupleAnnexStep::executeNoOrderByWithDistinct() void TupleAnnexStep::checkAndAllocateMemory4RGData(const rowgroup::RowGroup& rowGroup) { - uint64_t size = rowGroup.getSizeWithStrings() - rowGroup.getHeaderSize(); - if (!fRm->getMemory(size, false)) - { - cerr << IDBErrorInfo::instance()->errorMsg(ERR_TNS_DISTINCT_IS_TOO_BIG) << " @" << __FILE__ << ":" << __LINE__; - throw IDBExcept(ERR_TNS_DISTINCT_IS_TOO_BIG); - } + uint64_t size = rowGroup.getSizeWithStrings() - rowGroup.getHeaderSize(); + if (!fRm->getMemory(size, false)) + { + cerr << IDBErrorInfo::instance()->errorMsg(ERR_TNS_DISTINCT_IS_TOO_BIG) << " @" << __FILE__ << ":" + << __LINE__; + throw IDBExcept(ERR_TNS_DISTINCT_IS_TOO_BIG); + } } void TupleAnnexStep::executeWithOrderBy() @@ -713,10 +720,12 @@ void TupleAnnexStep::finalizeParallelOrderByDistinct() // Calculate offset here fRowGroupOut.getRow(0, &fRowOut); - auto allocSorting = fRm->getAllocator(); - ordering::SortingPQ finalPQ(rowgroup::rgCommonSize, allocSorting); - auto allocDistinct = fRm->getAllocator(); - std::unique_ptr distinctMap(new TNSDistinctMap_t(10, TAHasher(this), TAEq(this), allocDistinct)); + // auto allocSorting = fRm->getAllocator(); + ordering::SortingPQ finalPQ(rowgroup::rgCommonSize, fRm->getAllocator()); + // ordering::SortingPQ finalPQ(rowgroup::rgCommonSize); + // auto allocDistinct = fRm->getAllocator(); + std::unique_ptr distinctMap( + new TNSDistinctMap_t(10, TAHasher(this), TAEq(this), STLPoolAllocator(fRm))); fRowGroupIn.initRow(&row1); fRowGroupIn.initRow(&row2); @@ -910,8 +919,9 @@ void TupleAnnexStep::finalizeParallelOrderBy() uint32_t rowSize = 0; rowgroup::RGData rgDataOut; - auto alloc = fRm->getAllocator(); - ordering::SortingPQ finalPQ(rowgroup::rgCommonSize, alloc); + // auto alloc = fRm->getAllocator(); + ordering::SortingPQ finalPQ(rowgroup::rgCommonSize, fRm->getAllocator()); + // ordering::SortingPQ finalPQ(rowgroup::rgCommonSize); rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize); fRowGroupOut.setData(&rgDataOut); fRowGroupOut.resetRowGroup(0); diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index 9ced17195..f934de6de 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -329,7 +329,9 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) auto alloc = exemgr::globServiceExeMgr->getRm().getAllocator(); for (uint j = 0; j < joinerCount; ++j) { - storedKeyAllocators.emplace_back(PoolAllocator(alloc, PoolAllocator::DEFAULT_WINDOW_SIZE, false, true)); + storedKeyAllocators.emplace_back(PoolAllocator(alloc, PoolAllocator::DEFAULT_WINDOW_SIZE, false, + true)); + // storedKeyAllocators.emplace_back(PoolAllocator(PoolAllocator::DEFAULT_WINDOW_SIZE, false, true)); } joinNullValues.reset(new uint64_t[joinerCount]); @@ -338,6 +340,7 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) hasJoinFEFilters = false; hasSmallOuterJoin = false; bool smallSideRGRecvd = false; + auto* resourceManager = &exemgr::globServiceExeMgr->getRm(); for (i = 0; i < joinerCount; i++) { @@ -361,16 +364,17 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) if (!typelessJoin[i]) { - auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator(); - + // auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator(); bs >> joinNullValues[i]; bs >> largeSideKeyColumns[i]; for (uint j = 0; j < processorThreads; ++j) - tJoiners[i][j].reset(new TJoiner(10, TupleJoiner::hasher(), allocator)); + // tJoiners[i][j].reset(new TJoiner(10, TupleJoiner::hasher(), allocator)); + tJoiners[i][j].reset(new TJoiner(10, TupleJoiner::hasher(), std::equal_to(), + utils::STLPoolAllocator(resourceManager))); } else { - auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator(); + // auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator(); deserializeVector(bs, tlLargeSideKeyColumns[i]); bs >> tlSmallSideKeyLengths[i]; @@ -393,7 +397,9 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) mSmallSideKeyColumnsPtr, mSmallSideRGPtr); auto tlComparator = TupleJoiner::TypelessDataComparator(&outputRG, &tlLargeSideKeyColumns[i], mSmallSideKeyColumnsPtr, mSmallSideRGPtr); - tlJoiners[i][j].reset(new TLJoiner(10, tlHasher, tlComparator, allocator)); + // tlJoiners[i][j].reset(new TLJoiner(10, tlHasher, tlComparator, allocator)); + tlJoiners[i][j].reset(new TLJoiner(10, tlHasher, tlComparator, + utils::STLPoolAllocator(resourceManager))); } } } @@ -2287,34 +2293,34 @@ int BatchPrimitiveProcessor::operator()() void BatchPrimitiveProcessor::allocLargeBuffers() { - auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator(); - + // auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator(); + if (ot == ROW_GROUP && !outRowGroupData) { // outputRG.setUseStringTable(true); - outRowGroupData.reset(new RGData(outputRG, allocator)); + // outRowGroupData.reset(new RGData(outputRG, allocator)); + outRowGroupData.reset(new RGData(outputRG)); outputRG.setData(outRowGroupData.get()); } if (fe1 && !fe1Data) { - // fe1Input.setUseStringTable(true); - fe1Data.reset(new RGData(fe1Input, allocator)); - // fe1Data.reset(new uint8_t[fe1Input.getMaxDataSize()]); + // fe1Data.reset(new RGData(fe1Input, allocator)); + fe1Data.reset(new RGData(fe1Input)); fe1Input.setData(fe1Data.get()); } if (fe2 && !fe2Data) { - // fe2Output.setUseStringTable(true); - fe2Data.reset(new RGData(fe2Output, allocator)); + // fe2Data.reset(new RGData(fe2Output, allocator)); + fe2Data.reset(new RGData(fe2Output)); fe2Output.setData(fe2Data.get()); } if (getTupleJoinRowGroupData && !joinedRGMem) { - // joinedRG.setUseStringTable(true); - joinedRGMem.reset(new RGData(joinedRG, allocator)); + // joinedRGMem.reset(new RGData(joinedRG, allocator)); + joinedRGMem.reset(new RGData(joinedRG)); joinedRG.setData(joinedRGMem.get()); } } diff --git a/primitives/primproc/batchprimitiveprocessor.h b/primitives/primproc/batchprimitiveprocessor.h index 2aff39034..ee817fdb5 100644 --- a/primitives/primproc/batchprimitiveprocessor.h +++ b/primitives/primproc/batchprimitiveprocessor.h @@ -308,14 +308,23 @@ class BatchPrimitiveProcessor bool hasRowGroup; /* Rowgroups + join */ - using TJoiner = - std::unordered_multimap, - allocators::CountingAllocator>>; + // using TJoiner = + // std::unordered_multimap, + // allocators::CountingAllocator>>; - using TLJoiner = - std::unordered_multimap>>; + using TJoiner = std::tr1::unordered_multimap, + utils::STLPoolAllocator>>; + + // using TLJoiner = + // std::unordered_multimap>>; + + using TLJoiner = std::tr1::unordered_multimap< + joiner::TypelessData, uint32_t, joiner::TupleJoiner::TypelessDataHasher, + joiner::TupleJoiner::TypelessDataComparator, + utils::STLPoolAllocator>>; bool generateJoinedRowGroup(rowgroup::Row& baseRow, const uint32_t depth = 0); /* generateJoinedRowGroup helper fcns & vars */ diff --git a/utils/joiner/tuplejoiner.cpp b/utils/joiner/tuplejoiner.cpp index 8b2c71164..12644abd6 100644 --- a/utils/joiner/tuplejoiner.cpp +++ b/utils/joiner/tuplejoiner.cpp @@ -40,42 +40,63 @@ namespace joiner { constexpr const size_t DEFAULT_BUCKET_COUNT = 10; +template +std::unique_ptr makeHashMap(size_t bucketCount, ResourceManager* resourceManager) +{ + // auto alloc = resourceManager->getAllocator(); + // return std::unique_ptr(new T(bucketCount, TupleJoiner::hasher(), typename T::key_equal(), alloc)); + return std::unique_ptr(new HashTable(bucketCount, TupleJoiner::hasher(), + typename HashTable::key_equal(), + utils::STLPoolAllocator(resourceManager))); +} + +void TupleJoiner::initRowsVector() +{ + // auto alloc = resourceManager_->getAllocator(); + // rows.reset(new RowPointersVec(alloc)); + rows.reset(new RowPointersVec(resourceManager_->getAllocator())); +} + void TupleJoiner::initHashMaps(uint32_t& smallJoinColumn) { if (typelessJoin) { for (size_t i = 0; i < bucketCount; i++) { - auto alloc = resourceManager_->getAllocator>(); - ht.emplace_back(std::unique_ptr( - new typelesshash_t(DEFAULT_BUCKET_COUNT, hasher(), typelesshash_t::key_equal(), alloc))); + // auto alloc = resourceManager_->getAllocator>(); + // ht.emplace_back(std::unique_ptr( + // new typelesshash_t(DEFAULT_BUCKET_COUNT, hasher(), typelesshash_t::key_equal(), alloc))); + ht.emplace_back(makeHashMap(DEFAULT_BUCKET_COUNT, resourceManager_)); } } else if (smallRG.getColTypes()[smallJoinColumn] == CalpontSystemCatalog::LONGDOUBLE) { for (size_t i = 0; i < bucketCount; i++) { - auto alloc = resourceManager_->getAllocator>(); - ld.emplace_back(std::unique_ptr( - new ldhash_t(DEFAULT_BUCKET_COUNT, hasher(), ldhash_t::key_equal(), alloc))); + // auto alloc = resourceManager_->getAllocator>(); + // ld.emplace_back(std::unique_ptr( + // new ldhash_t(DEFAULT_BUCKET_COUNT, hasher(), ldhash_t::key_equal(), alloc))); + ld.emplace_back(makeHashMap(DEFAULT_BUCKET_COUNT, resourceManager_)); } } else if (smallRG.usesStringTable()) { for (size_t i = 0; i < bucketCount; i++) { - auto alloc = resourceManager_->getAllocator>(); - sth.emplace_back(std::unique_ptr( - new sthash_t(DEFAULT_BUCKET_COUNT, hasher(), sthash_t::key_equal(), alloc))); + // auto alloc = resourceManager_->getAllocator>(); + // sth.emplace_back(std::unique_ptr( + // new sthash_t(DEFAULT_BUCKET_COUNT, hasher(), sthash_t::key_equal(), alloc))); + sth.emplace_back(makeHashMap(DEFAULT_BUCKET_COUNT, resourceManager_)); } } else { for (size_t i = 0; i < bucketCount; i++) { - auto alloc = resourceManager_->getAllocator>(); - h.emplace_back( - std::unique_ptr(new hash_t(DEFAULT_BUCKET_COUNT, hasher(), hash_t::key_equal(), alloc))); + // auto alloc = resourceManager_->getAllocator>(); + // h.emplace_back( + // std::unique_ptr(new hash_t(DEFAULT_BUCKET_COUNT, hasher(), hash_t::key_equal(), alloc))); + h.emplace_back(makeHashMap(DEFAULT_BUCKET_COUNT, resourceManager_)); } } } @@ -99,10 +120,9 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R , _convertToDiskJoin(false) , resourceManager_(rm) { - auto alloc = resourceManager_->getAllocator(); - rows.reset(new RowPointersVec(alloc)); - + initRowsVector(); getBucketCount(); + m_bucketLocks.reset(new boost::mutex[bucketCount]); initHashMaps(smallJoinColumn); @@ -161,7 +181,8 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R // Typeless joiner ctor TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput, const vector& smallJoinColumns, const vector& largeJoinColumns, - JoinType jt, threadpool::ThreadPool* jsThreadPool, joblist::ResourceManager* rm, const uint64_t numCores) + JoinType jt, threadpool::ThreadPool* jsThreadPool, joblist::ResourceManager* rm, + const uint64_t numCores) : smallRG(smallInput) , largeRG(largeInput) , joinAlg(INSERTING) @@ -180,9 +201,7 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R { uint i; - auto alloc = resourceManager_->getAllocator(); - rows.reset(new RowPointersVec(alloc)); - + initRowsVector(); getBucketCount(); uint32_t unused = 0; @@ -870,8 +889,7 @@ void TupleJoiner::setInUM() #ifdef TJ_DEBUG cout << "done\n"; #endif - auto alloc = resourceManager_->getAllocator(); - rows.reset(new RowPointersVec(alloc)); + initRowsVector(); if (typelessJoin) { @@ -904,10 +922,7 @@ void TupleJoiner::setInUM(vector& rgs) if (joinAlg == UM) return; - { // don't need rows anymore, free the mem - auto alloc = resourceManager_->getAllocator(); - rows.reset(new RowPointersVec(alloc)); - } + initRowsVector(); joinAlg = UM; size = rgs.size(); @@ -918,7 +933,8 @@ void TupleJoiner::setInUM(vector& rgs) i = 0; for (size_t firstRow = 0; i < (uint)numCores && firstRow < size; i++, firstRow += chunkSize) jobs[i] = jobstepThreadPool->invoke( - [this, firstRow, chunkSize, size, i, &rgs] { + [this, firstRow, chunkSize, size, i, &rgs] + { this->umJoinConvert(i, rgs, firstRow, (firstRow + chunkSize < size ? firstRow + chunkSize : size)); }); @@ -1291,7 +1307,7 @@ class WideDecimalKeyConverter if (value > AT(std::numeric_limits::max()) || value < AT(std::numeric_limits::min())) return true; - convertedValue = (uint64_t) static_cast(value); + convertedValue = (uint64_t)static_cast(value); return false; } // As of MCS 6.x there is an asumption MCS can't join having @@ -1801,9 +1817,7 @@ void TupleJoiner::clearData() // This loop calls dtors and deallocates mem. clearHashMaps(); initHashMaps(smallKeyColumns[0]); - - auto alloc = resourceManager_->getAllocator(); - rows.reset(new RowPointersVec(alloc)); + initRowsVector(); finished = false; } diff --git a/utils/joiner/tuplejoiner.h b/utils/joiner/tuplejoiner.h index 47c99390a..72ce17770 100644 --- a/utils/joiner/tuplejoiner.h +++ b/utils/joiner/tuplejoiner.h @@ -204,8 +204,11 @@ class TypelessDataStructure } }; + using RowPointersVec = std::vector>; + // using RowPointersVec = + // std::vector; using RowPointersVecUP = std::unique_ptr; class TupleJoiner { @@ -473,22 +476,47 @@ class TupleJoiner void initHashMaps(uint32_t& smallJoinColumn); void clearHashMaps(); private: - typedef std::unordered_multimap, - allocators::CountingAllocator>> - hash_t; - typedef std::unordered_multimap< - int64_t, rowgroup::Row::Pointer, hasher, std::equal_to, - allocators::CountingAllocator>> - sthash_t; - typedef std::unordered_multimap< - TypelessData, rowgroup::Row::Pointer, hasher, std::equal_to, - allocators::CountingAllocator>> - typelesshash_t; + // typedef std::unordered_multimap, + // allocators::CountingAllocator>> + // hash_t; + // typedef std::unordered_multimap< + // int64_t, rowgroup::Row::Pointer, hasher, std::equal_to, + // allocators::CountingAllocator>> + // sthash_t; + // typedef std::unordered_multimap< + // TypelessData, rowgroup::Row::Pointer, hasher, std::equal_to, + // allocators::CountingAllocator>> + // typelesshash_t; + // // MCOL-1822 Add support for Long Double AVG/SUM small side + // typedef std::unordered_multimap< + // long double, rowgroup::Row::Pointer, hasher, LongDoubleEq, + // allocators::CountingAllocator>> + // ldhash_t; + + + template + using HashMapTemplate = std::unordered_multimap, + utils::STLPoolAllocator>>; + using hash_t = HashMapTemplate; + using sthash_t = HashMapTemplate; + using typelesshash_t = HashMapTemplate; + using ldhash_t = HashMapTemplate; + // typedef std::unordered_multimap, + // utils::STLPoolAllocator>> + // hash_t; + // typedef std::unordered_multimap< + // int64_t, rowgroup::Row::Pointer, hasher, std::equal_to, + // utils::STLPoolAllocator>> + // sthash_t; + // typedef std::unordered_multimap< + // TypelessData, rowgroup::Row::Pointer, hasher, std::equal_to, + // utils::STLPoolAllocator>> + // typelesshash_t; // MCOL-1822 Add support for Long Double AVG/SUM small side - typedef std::unordered_multimap< - long double, rowgroup::Row::Pointer, hasher, LongDoubleEq, - allocators::CountingAllocator>> - ldhash_t; + // typedef std::unordered_multimap< + // long double, rowgroup::Row::Pointer, hasher, LongDoubleEq, + // utils::STLPoolAllocator>> + // ldhash_t; typedef hash_t::iterator iterator; typedef typelesshash_t::iterator thIterator; @@ -573,6 +601,7 @@ class TupleJoiner bool _convertToDiskJoin; joblist::ResourceManager* resourceManager_ = nullptr; bool wasAborted_ = false; + void initRowsVector(); }; } // namespace joiner diff --git a/utils/windowfunction/idborderby.cpp b/utils/windowfunction/idborderby.cpp index b94fe8794..b1b9849a4 100644 --- a/utils/windowfunction/idborderby.cpp +++ b/utils/windowfunction/idborderby.cpp @@ -774,15 +774,12 @@ void IdbOrderBy::initialize(const RowGroup& rg) fRowGroup.initRow(&row2); // These two blocks contain structs with memory accounting. - { - auto alloc = fRm->getAllocator(); - fOrderByQueue.reset(new SortingPQ(rowgroup::rgCommonSize, alloc)); - } + fOrderByQueue.reset(new SortingPQ(rowgroup::rgCommonSize, fRm->getAllocator())); if (fDistinct) { - auto alloc = fRm->getAllocator(); - fDistinctMap.reset(new DistinctMap_t(10, Hasher(this, getKeyLength()), Eq(this, getKeyLength()), alloc)); + fDistinctMap.reset(new DistinctMap_t(10, Hasher(this, getKeyLength()), Eq(this, getKeyLength()), + utils::STLPoolAllocator(fRm))); } } diff --git a/utils/windowfunction/idborderby.h b/utils/windowfunction/idborderby.h index 0babe962b..3c0893d11 100644 --- a/utils/windowfunction/idborderby.h +++ b/utils/windowfunction/idborderby.h @@ -36,7 +36,7 @@ #include "resourcemanager.h" #include "rowgroup.h" #include "hasher.h" -// #include "stlpoolallocator.h" +#include "stlpoolallocator.h" // forward reference namespace joblist @@ -81,6 +81,31 @@ class ReservablePQ : private std::priority_queue<_Tp, _Sequence, _Compare> using std::priority_queue<_Tp, _Sequence, _Compare>::empty; }; +// template , +// typename _Compare = std::less > +// class ReservablePQ : private std::priority_queue<_Tp, _Sequence, _Compare> +// { +// public: +// typedef typename std::priority_queue<_Tp, _Sequence, _Compare>::size_type size_type; +// explicit ReservablePQ(size_type capacity = 0) +// { +// reserve(capacity); +// }; +// void reserve(size_type capacity) +// { +// this->c.reserve(capacity); +// } +// size_type capacity() const +// { +// return this->c.capacity(); +// } +// using std::priority_queue<_Tp, _Sequence, _Compare>::size; +// using std::priority_queue<_Tp, _Sequence, _Compare>::top; +// using std::priority_queue<_Tp, _Sequence, _Compare>::pop; +// using std::priority_queue<_Tp, _Sequence, _Compare>::push; +// using std::priority_queue<_Tp, _Sequence, _Compare>::empty; +// }; + // forward reference class IdbCompare; class OrderByRow; @@ -484,7 +509,7 @@ class IdbOrderBy : public IdbCompare }; using DistinctMap_t = std::unordered_set>; + utils::STLPoolAllocator>; boost::scoped_ptr fDistinctMap; rowgroup::Row row1, row2; // scratch space for Hasher & Eq