diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.cpp b/dbcon/joblist/batchprimitiveprocessor-jl.cpp
index 6471a2d58..93e61faef 100644
--- a/dbcon/joblist/batchprimitiveprocessor-jl.cpp
+++ b/dbcon/joblist/batchprimitiveprocessor-jl.cpp
@@ -1397,7 +1397,7 @@ bool BatchPrimitiveProcessorJL::pickNextJoinerNum()
   for (i = 0; i < PMJoinerCount; i++)
   {
     joinerNum = (joinerNum + 1) % PMJoinerCount;
-    if (posByJoinerNum[joinerNum] != tJoiners[joinerNum]->getSmallSide()->size())
+    if (posByJoinerNum[joinerNum] != tJoiners[joinerNum]->getSmallSide().size())
      break;
   }
   if (i == PMJoinerCount)
@@ -1410,10 +1410,9 @@ bool BatchPrimitiveProcessorJL::pickNextJoinerNum()
 /* XXXPAT: Going to interleave across joiners to take advantage of the new locking env in PrimProc */
 bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)
 {
-  uint32_t size = 0, toSend, i, j;
+  uint32_t toSend, i, j;
   ISMPacketHeader ism;
   Row r;
-  vector<Row::Pointer>* tSmallSide;
   joiner::TypelessData tlData;
   uint32_t smallKeyCol;
   uint32_t largeKeyCol;
@@ -1436,8 +1435,8 @@ bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)
   }

   memset((void*)&ism, 0, sizeof(ism));
-  tSmallSide = tJoiners[joinerNum]->getSmallSide();
-  size = tSmallSide->size();
+  auto& tSmallSide = tJoiners[joinerNum]->getSmallSide();
+  auto size = tSmallSide.size();

 #if 0
   if (joinerNum == PMJoinerCount - 1 && pos == size)
@@ -1487,7 +1486,7 @@ bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)

     for (i = pos; i < pos + toSend; i++)
     {
-      r.setPointer((*tSmallSide)[i]);
+      r.setPointer(tSmallSide[i]);

       isNull = false;
       bSignedUnsigned = tJoiners[joinerNum]->isSignedUnsignedJoin();
@@ -1554,7 +1553,7 @@ bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)

     for (i = pos, j = 0; i < pos + toSend; ++i, ++j)
     {
-      r.setPointer((*tSmallSide)[i]);
+      r.setPointer(tSmallSide[i]);

       if (r.getColType(smallKeyCol) == CalpontSystemCatalog::LONGDOUBLE)
       {
@@ -1627,7 +1626,7 @@ bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)

     for (i = pos; i < pos + toSend; i++, tmpRow.nextRow())
     {
-      r.setPointer((*tSmallSide)[i]);
+      r.setPointer(tSmallSide[i]);
       copyRow(r, &tmpRow);
     }

diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp
index d5fbf133e..cdb7b0461 100644
--- a/dbcon/joblist/tuplehashjoin.cpp
+++ b/dbcon/joblist/tuplehashjoin.cpp
@@ -207,60 +207,6 @@ void TupleHashJoinStep::join()
   }
 }

-// simple sol'n. Poll mem usage of Joiner once per second. Request mem
-// increase after the fact. Failure to get mem will be detected and handled by
-// the threads inserting into Joiner.
-void TupleHashJoinStep::trackMem(uint index)
-{
-  auto joiner = joiners[index];
-  ssize_t memBefore = 0, memAfter = 0;
-  bool gotMem;
-
-  boost::unique_lock<boost::mutex> scoped(memTrackMutex);
-  while (!stopMemTracking)
-  {
-    memAfter = joiner->getMemUsage();
-    if (memAfter != memBefore)
-    {
-      gotMem = resourceManager->getMemory(memAfter - memBefore, sessionMemLimit, true);
-      if (gotMem)
-        atomicops::atomicAdd(&memUsedByEachJoin[index], memAfter - memBefore);
-      else
-        return;
-
-      memBefore = memAfter;
-    }
-    memTrackDone.timed_wait(scoped, boost::posix_time::seconds(1));
-  }
-
-  // one more iteration to capture mem usage since last poll, for this one
-  // raise an error if mem went over the limit
-  memAfter = joiner->getMemUsage();
-  if (memAfter == memBefore)
-    return;
-  gotMem = resourceManager->getMemory(memAfter - memBefore, sessionMemLimit, true);
-  if (gotMem)
-  {
-    atomicops::atomicAdd(&memUsedByEachJoin[index], memAfter - memBefore);
-  }
-  else
-  {
-    if (!joinIsTooBig &&
-        (isDML || !allowDJS || (fSessionId & 0x80000000) || (tableOid() < 3000 && tableOid() >= 1000)))
-    {
-      joinIsTooBig = true;
-      ostringstream oss;
-      oss << "(" << __LINE__ << ") "
-          << logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG);
-      fLogger->logMessage(logging::LOG_TYPE_INFO, oss.str());
-      errorMessage(oss.str());
-      status(logging::ERR_JOIN_TOO_BIG);
-      cout << "Join is too big, raise the UM join limit for now (monitor thread)" << endl;
-      abort();
-    }
-  }
-}
-
 void TupleHashJoinStep::startSmallRunners(uint index)
 {
   utils::setThreadName("HJSStartSmall");
@@ -302,7 +248,6 @@ void TupleHashJoinStep::startSmallRunners(uint index)
   stopMemTracking = false;
   utils::VLArray<uint64_t> jobs(numCores);
-  // uint64_t memMonitor = jobstepThreadPool.invoke([this, index] { this->trackMem(index); });

   // starting 1 thread when in PM mode, since it's only inserting into a
   // vector of rows.  The rest will be started when converted to UM mode.
   if (joiners[index]->inUM())
diff --git a/utils/common/fixedallocator.cpp b/utils/common/fixedallocator.cpp
index 8d55ac7f8..cd91cc645 100644
--- a/utils/common/fixedallocator.cpp
+++ b/utils/common/fixedallocator.cpp
@@ -42,6 +42,7 @@ FixedAllocator::FixedAllocator(const FixedAllocator& f)
   currentlyStored = 0;
   useLock = f.useLock;
   lock = false;
+  alloc = f.alloc;
 }

 FixedAllocator& FixedAllocator::operator=(const FixedAllocator& f)
@@ -51,6 +52,7 @@ FixedAllocator& FixedAllocator::operator=(const FixedAllocator& f)
   tmpSpace = f.tmpSpace;
   useLock = f.useLock;
   lock = false;
+  alloc = f.alloc;
   deallocateAll();
   return *this;
 }
diff --git a/utils/common/stlpoolallocator.h b/utils/common/stlpoolallocator.h
index 8a10c6e93..851d7af7d 100644
--- a/utils/common/stlpoolallocator.h
+++ b/utils/common/stlpoolallocator.h
@@ -116,12 +116,6 @@ STLPoolAllocator<T>::STLPoolAllocator(const STLPoolAllocator& s) throw()
   pa = s.pa;
 }

-template <class T>
-STLPoolAllocator<T>::STLPoolAllocator(uint32_t capacity) throw()
-{
-  pa.reset(new PoolAllocator(capacity));
-}
-
 template <class T>
 template <class U>
 STLPoolAllocator<T>::STLPoolAllocator(const STLPoolAllocator<U>& s) throw()
@@ -134,17 +128,6 @@ STLPoolAllocator<T>::~STLPoolAllocator()
 {
 }

-template <class T>
-void STLPoolAllocator<T>::usePoolAllocator(boost::shared_ptr<PoolAllocator> p)
-{
-  pa = p;
-}
-template <class T>
-boost::shared_ptr<PoolAllocator> STLPoolAllocator<T>::getPoolAllocator()
-{
-  return pa;
-}
-
 template <class T>
 typename STLPoolAllocator<T>::pointer STLPoolAllocator<T>::allocate(
     typename STLPoolAllocator<T>::size_type s, typename STLPoolAllocator<T>::const_pointer hint)
diff --git a/utils/joiner/tuplejoiner.cpp b/utils/joiner/tuplejoiner.cpp
index c76bcfc0a..d4e52d64a 100644
--- a/utils/joiner/tuplejoiner.cpp
+++ b/utils/joiner/tuplejoiner.cpp
@@ -58,43 +58,34 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R
 {
   uint i;

+  auto alloc = resourceManager_->getAllocator();
+  rows.reset(new RowPointersVec(alloc));
+
   getBucketCount();

   m_bucketLocks.reset(new boost::mutex[bucketCount]);

   if (smallRG.getColTypes()[smallJoinColumn] == CalpontSystemCatalog::LONGDOUBLE)
   {
-    ld.reset(new boost::scoped_ptr<ldhash_t>[bucketCount]);
-    // _pool.reset(new boost::shared_ptr<utils::PoolAllocator>[bucketCount]);
     for (i = 0; i < bucketCount; i++)
     {
-      // STLPoolAllocator<pair<const long double, Row::Pointer>> alloc(resourceManager_);
-      // _pool[i] = alloc.getPoolAllocator();
       auto alloc = resourceManager_->getAllocator<std::pair<const long double, Row::Pointer>>();
-      ld[i].reset(new ldhash_t(10, hasher(), ldhash_t::key_equal(), alloc));
+      ld.emplace_back(std::unique_ptr<ldhash_t>(new ldhash_t(10, hasher(), ldhash_t::key_equal(), alloc)));
     }
   }
   else if (smallRG.usesStringTable())
   {
-    sth.reset(new boost::scoped_ptr<sthash_t>[bucketCount]);
-    _pool.reset(new boost::shared_ptr<utils::PoolAllocator>[bucketCount]);
     for (i = 0; i < bucketCount; i++)
     {
-      // STLPoolAllocator<pair<const int64_t, Row::Pointer>> alloc(resourceManager_);
-      // _pool[i] = alloc.getPoolAllocator();
       auto alloc = resourceManager_->getAllocator<std::pair<const int64_t, Row::Pointer>>();
-      sth[i].reset(new sthash_t(10, hasher(), sthash_t::key_equal(), alloc));
+      sth.emplace_back(std::unique_ptr<sthash_t>(new sthash_t(10, hasher(), sthash_t::key_equal(), alloc)));
     }
   }
   else
   {
-    h.reset(new boost::scoped_ptr<hash_t>[bucketCount]);
-    _pool.reset(new boost::shared_ptr<utils::PoolAllocator>[bucketCount]);
     for (i = 0; i < bucketCount; i++)
     {
-      // STLPoolAllocator<pair<const int64_t, uint8_t*>> alloc(resourceManager_);
-      // _pool[i] = alloc.getPoolAllocator();
       auto alloc = resourceManager_->getAllocator<std::pair<const int64_t, uint8_t*>>();
-      h[i].reset(new hash_t(10, hasher(), hash_t::key_equal(), alloc));
+      h.emplace_back(std::unique_ptr<hash_t>(new hash_t(10, hasher(), hash_t::key_equal(), alloc)));
     }
   }

@@ -149,13 +140,6 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R
   nullValueForJoinColumn = smallNullRow.getSignedNullValue(smallJoinColumn);
 }

-// TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput,
-//                          const vector<uint32_t>& smallJoinColumns, const vector<uint32_t>& largeJoinColumns,
-//                          JoinType jt, threadpool::ThreadPool* jsThreadPool)
-//   : TupleJoiner(smallInput, largeInput, smallJoinColumns, largeJoinColumns, jt, jsThreadPool, nullptr)
-// {
-// }
-
 // Typeless joiner ctor
 TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput,
                          const vector<uint32_t>& smallJoinColumns, const vector<uint32_t>& largeJoinColumns,
@@ -180,14 +164,11 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R

   getBucketCount();

-  _pool.reset(new boost::shared_ptr<utils::PoolAllocator>[bucketCount]);
-  ht.reset(new boost::scoped_ptr<typelesshash_t>[bucketCount]);
   for (i = 0; i < bucketCount; i++)
   {
-    // STLPoolAllocator<pair<const TypelessData, Row::Pointer>> alloc(resourceManager_);
-    // _pool[i] = alloc.getPoolAllocator();
     auto alloc = resourceManager_->getAllocator<std::pair<const TypelessData, Row::Pointer>>();
-    ht[i].reset(new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc));
+    ht.emplace_back(std::unique_ptr<typelesshash_t>(
+        new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc)));
   }

   m_bucketLocks.reset(new boost::mutex[bucketCount]);
@@ -239,11 +220,10 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R

   // note, 'numcores' is implied by tuplehashjoin on calls to insertRGData().
   // TODO: make it explicit to avoid future confusion.
-  storedKeyAlloc.reset(new FixedAllocator[numCores]);
   for (i = 0; i < (uint)numCores; i++)
   {
     auto alloc = resourceManager_->getAllocator();
-    storedKeyAlloc[i] = FixedAllocator(alloc, keyLength);
+    storedKeyAlloc.emplace_back(FixedAllocator(alloc, keyLength));
   }
 }

@@ -279,7 +259,7 @@ void TupleJoiner::getBucketCount()
 }

 template <typename buckets_t, typename hash_table_t>
-void TupleJoiner::bucketsToTables(buckets_t* buckets, hash_table_t* tables)
+void TupleJoiner::bucketsToTables(buckets_t* buckets, hash_table_t& tables)
 {
   uint i;

@@ -301,7 +281,7 @@ void TupleJoiner::bucketsToTables(buckets_t* buckets, hash_table_t* tables)
       }
       tables[i]->insert(buckets[i].begin(), buckets[i].end());
     }
-
+
     wasProductive = true;
     buckets[i].clear();
   }
@@ -326,7 +306,7 @@ void TupleJoiner::um_insertTypeless(uint threadID, uint rowCount, Row& r)
     uint bucket = bucketPicker((char*)td[i].data, td[i].len, bpSeed) & bucketMask;
     v[bucket].emplace_back(pair<TypelessData, Row::Pointer>(td[i], r.getPointer()));
   }
-  bucketsToTables(&v[0], ht.get());
+  bucketsToTables(&v[0], ht);
 }

 void TupleJoiner::um_insertLongDouble(uint rowCount, Row& r)
@@ -345,7 +325,7 @@ void TupleJoiner::um_insertLongDouble(uint rowCount, Row& r)
     else
       v[bucket].emplace_back(pair<long double, Row::Pointer>(smallKey, r.getPointer()));
   }
-  bucketsToTables(&v[0], ld.get());
+  bucketsToTables(&v[0], ld);
 }

 void TupleJoiner::um_insertInlineRows(uint rowCount, Row& r)
@@ -367,7 +347,7 @@ void TupleJoiner::um_insertInlineRows(uint rowCount, Row& r)
     else
       v[bucket].emplace_back(pair<int64_t, uint8_t*>(smallKey, r.getData()));
   }
-  bucketsToTables(&v[0], h.get());
+  bucketsToTables(&v[0], h);
 }

 void TupleJoiner::um_insertStringTable(uint rowCount, Row& r)
@@ -389,7 +369,7 @@ void TupleJoiner::um_insertStringTable(uint rowCount, Row& r)
     else
       v[bucket].emplace_back(pair<int64_t, Row::Pointer>(smallKey, r.getPointer()));
   }
-  bucketsToTables(&v[0], sth.get());
+  bucketsToTables(&v[0], sth);
 }

 void TupleJoiner::insertRGData(RowGroup& rg, uint threadID)
@@ -409,7 +389,7 @@ void TupleJoiner::insertRGData(RowGroup& rg, uint threadID)
       r.zeroRid();
     }
   }
-
+
   rg.getRow(0, &r);

   if (joinAlg == UM)
@@ -427,7 +407,7 @@ void TupleJoiner::insertRGData(RowGroup& rg, uint threadID)
   {
     // while in PM-join mode, inserting is single-threaded
     for (i = 0; i < rowCount; i++, r.nextRow())
-      rows.push_back(r.getPointer());
+      rows->push_back(r.getPointer());
   }
 }

@@ -492,7 +472,7 @@ void TupleJoiner::insert(Row& r, bool zeroTheRid)
     }
   }
   else
-    rows.push_back(r.getPointer());
+    rows->push_back(r.getPointer());
 }

 void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uint32_t threadID,
@@ -508,8 +488,8 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin
     uint32_t size = v.size();

     for (i = 0; i < size; i++)
-      if (v[i] < rows.size())
-        matches->push_back(rows[v[i]]);
+      if (v[i] < rows->size())
+        matches->push_back((*rows)[v[i]]);

     if (UNLIKELY((semiJoin() || antiJoin()) && matches->size() == 0))
       matches->push_back(smallNullRow.getPointer());
@@ -536,7 +516,7 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin
     for (; range.first != range.second; ++range.first)
       matches->push_back(range.first->second);
   }
-  else if (largeSideRow.getColType(largeKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE && ld)
+  else if (largeSideRow.getColType(largeKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE && !ld.empty())
   {
     // This is a compare of two long double
     long double largeKey;
@@ -572,7 +552,7 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin
       largeKey = largeSideRow.getIntField(largeKeyColumns[0]);
     }

-    if (ld)
+    if (!ld.empty())
     {
       // Compare against long double
       long double ldKey = largeKey;
@@ -619,7 +599,7 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin

   if (UNLIKELY(inUM() && (joinType & MATCHNULLS) && !isNull && !typelessJoin))
   {
-    if (largeRG.getColType(largeKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE && ld)
+    if (largeRG.getColType(largeKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE && !ld.empty())
     {
       uint bucket = bucketPicker((char*)&(joblist::LONGDOUBLENULL), sizeof(joblist::LONGDOUBLENULL), bpSeed) &
                     bucketMask;
@@ -749,7 +729,7 @@ void TupleJoiner::doneInserting()
     for (i = 0; i < rowCount; i++)
     {
       if (joinAlg == PM)
-        smallRow.setPointer(rows[pmpos++]);
+        smallRow.setPointer((*rows)[pmpos++]);
       else if (typelessJoin)
       {
         while (thit == ht[bucket]->end())
@@ -839,14 +819,13 @@ void TupleJoiner::umJoinConvert(size_t begin, size_t end)

   while (begin < end)
   {
-    smallRow.setPointer(rows[begin++]);
+    smallRow.setPointer((*rows)[begin++]);
     insert(smallRow);
   }
 }

 void TupleJoiner::setInUM()
 {
-  vector<Row::Pointer> empty;
   Row smallRow;
   uint32_t i, size;

@@ -854,7 +833,7 @@ void TupleJoiner::setInUM()
     return;

   joinAlg = UM;
-  size = rows.size();
+  size = rows->size();
   size_t chunkSize = ((size / numCores) + 1 < 50000 ?
                          50000 : (size / numCores) + 1);  // don't start a thread to process < 50k rows

@@ -872,17 +851,15 @@ void TupleJoiner::setInUM()
 #ifdef TJ_DEBUG
   cout << "done\n";
 #endif
-  rows.swap(empty);
+  auto alloc = resourceManager_->getAllocator();
+  rows.reset(new RowPointersVec(alloc));

   if (typelessJoin)
   {
-    tmpKeyAlloc.reset(new FixedAllocator[threadCount]);
-
     for (i = 0; i < threadCount; i++)
     {
       auto alloc = resourceManager_->getAllocator();
-      tmpKeyAlloc[i] = FixedAllocator(alloc, keyLength, true);
-
+      tmpKeyAlloc.emplace_back(FixedAllocator(alloc, keyLength, true));
     }
   }
 }
@@ -909,8 +886,8 @@ void TupleJoiner::setInUM(vector<RGData>& rgs)
     return;

   {  // don't need rows anymore, free the mem
-    vector<Row::Pointer> empty;
-    rows.swap(empty);
+    auto alloc = resourceManager_->getAllocator();
+    rows.reset(new RowPointersVec(alloc));
   }

   joinAlg = UM;
@@ -935,12 +912,10 @@ void TupleJoiner::setInUM(vector<RGData>& rgs)

   if (typelessJoin)
   {
-    tmpKeyAlloc.reset(new FixedAllocator[threadCount]);
-
     for (i = 0; i < threadCount; i++)
     {
       auto alloc = resourceManager_->getAllocator();
-      tmpKeyAlloc[i] = FixedAllocator(alloc, keyLength, true);
+      tmpKeyAlloc.emplace_back(FixedAllocator(alloc, keyLength, true));
     }
   }
 }
@@ -958,9 +933,9 @@ void TupleJoiner::markMatches(uint32_t threadID, uint32_t rowCount)
   for (i = 0; i < rowCount; i++)
     for (j = 0; j < matches[i].size(); j++)
     {
-      if (matches[i][j] < rows.size())
+      if (matches[i][j] < rows->size())
       {
-        smallRow[threadID].setPointer(rows[matches[i][j]]);
+        smallRow[threadID].setPointer((*rows)[matches[i][j]]);
         smallRow[threadID].markRow();
       }
     }
@@ -994,12 +969,10 @@ void TupleJoiner::setThreadCount(uint32_t cnt)

   if (typelessJoin)
   {
-    tmpKeyAlloc.reset(new FixedAllocator[threadCount]);
-
     for (uint32_t i = 0; i < threadCount; i++)
     {
       auto alloc = resourceManager_->getAllocator();
-      tmpKeyAlloc[i] = FixedAllocator(alloc, keyLength, true);
+      tmpKeyAlloc.emplace_back(FixedAllocator(alloc, keyLength, true));
     }
   }

@@ -1023,14 +996,14 @@ void TupleJoiner::getUnmarkedRows(vector<Row::Pointer>* out)
   {
     uint32_t i, size;

-    size = rows.size();
+    size = rows->size();

     for (i = 0; i < size; i++)
     {
-      smallR.setPointer(rows[i]);
+      smallR.setPointer((*rows)[i]);

       if (!smallR.isMarked())
-        out->push_back(rows[i]);
+        out->push_back((*rows)[i]);
     }
   }
   else
@@ -1090,28 +1063,6 @@ void TupleJoiner::getUnmarkedRows(vector<Row::Pointer>* out)
   }
 }

-uint64_t TupleJoiner::getMemUsage() const
-{
-  if (inUM() && typelessJoin)
-  {
-    size_t ret = 0;
-    for (uint i = 0; i < bucketCount; i++)
-      ret += _pool[i]->getMemUsage();
-    for (int i = 0; i < numCores; i++)
-      ret += storedKeyAlloc[i].getMemUsage();
-    return ret;
-  }
-  else if (inUM())
-  {
-    size_t ret = 0;
-    for (uint i = 0; i < bucketCount; i++)
-      ret += _pool[i]->getMemUsage();
-    return ret;
-  }
-  else
-    return (rows.size() * sizeof(Row::Pointer));
-}
-
 void TupleJoiner::setFcnExpFilter(boost::shared_ptr<funcexp::FuncExpWrapper> pt)
 {
   fe = pt;
@@ -1250,7 +1201,7 @@ size_t TupleJoiner::size() const
     return ret;
   }

-  return rows.size();
+  return rows->size();
 }

 class TypelessDataStringEncoder
@@ -1821,20 +1772,9 @@ void TupleJoiner::setTableName(const string& tname)

 void TupleJoiner::clearData()
 {
-  _pool.reset(new boost::shared_ptr<utils::PoolAllocator>[bucketCount]);
-  if (typelessJoin)
-    ht.reset(new boost::scoped_ptr<typelesshash_t>[bucketCount]);
-  else if (smallRG.getColTypes()[smallKeyColumns[0]] == CalpontSystemCatalog::LONGDOUBLE)
-    ld.reset(new boost::scoped_ptr<ldhash_t>[bucketCount]);
-  else if (smallRG.usesStringTable())
-    sth.reset(new boost::scoped_ptr<sthash_t>[bucketCount]);
-  else
-    h.reset(new boost::scoped_ptr<hash_t>[bucketCount]);
-
+  // This loop calls dtors and deallocates mem.
   for (uint i = 0; i < bucketCount; i++)
   {
-    // STLPoolAllocator<pair<const TypelessData, Row::Pointer>> alloc(resourceManager_);
-    // _pool[i] = alloc.getPoolAllocator();
     auto alloc = resourceManager_->getAllocator<std::pair<const TypelessData, Row::Pointer>>();
     if (typelessJoin)
       ht[i].reset(new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc));
@@ -1843,11 +1783,13 @@ void TupleJoiner::clearData()
     else if (smallRG.getColTypes()[smallKeyColumns[0]] == CalpontSystemCatalog::LONGDOUBLE)
       ld[i].reset(new ldhash_t(10, hasher(), ldhash_t::key_equal(), alloc));
     else if (smallRG.usesStringTable())
       sth[i].reset(new sthash_t(10, hasher(), sthash_t::key_equal(), alloc));
     else
+    {
       h[i].reset(new hash_t(10, hasher(), hash_t::key_equal(), alloc));
+    }
   }

-  std::vector<Row::Pointer> empty;
-  rows.swap(empty);
+  auto alloc = resourceManager_->getAllocator();
+  rows.reset(new RowPointersVec(alloc));

   finished = false;
 }
@@ -1910,11 +1852,10 @@ std::shared_ptr<TupleJoiner> TupleJoiner::copyForDiskJoin()

   if (typelessJoin)
   {
-    ret->storedKeyAlloc.reset(new FixedAllocator[numCores]);
     for (int i = 0; i < numCores; i++)
     {
       auto alloc = resourceManager_->getAllocator();
-      storedKeyAlloc[i] = FixedAllocator(alloc, keyLength);
+      storedKeyAlloc.emplace_back(FixedAllocator(alloc, keyLength));
     }
   }

diff --git a/utils/joiner/tuplejoiner.h b/utils/joiner/tuplejoiner.h
index c8cb8e498..d2ab743db 100644
--- a/utils/joiner/tuplejoiner.h
+++ b/utils/joiner/tuplejoiner.h
@@ -204,6 +204,9 @@ class TypelessDataStructure
   }
 };

+using RowPointersVec =
+    std::vector<rowgroup::Row::Pointer, allocators::CountingAllocator<rowgroup::Row::Pointer>>;
+using RowPointersVecUP = std::unique_ptr<RowPointersVec>;
 class TupleJoiner
 {
 public:
@@ -268,20 +271,12 @@ class TupleJoiner
   };

   /* ctor to use for numeric join */
-  // TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput,
-  //             uint32_t smallJoinColumn, uint32_t largeJoinColumn, joblist::JoinType jt,
-  //             threadpool::ThreadPool* jsThreadPool);
-
   TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput,
               uint32_t smallJoinColumn, uint32_t largeJoinColumn, joblist::JoinType jt,
               threadpool::ThreadPool* jsThreadPool, joblist::ResourceManager* rm, const uint64_t numCores);

   /* ctor to use for string & compound join */
-  // TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput,
-  //             const std::vector<uint32_t>& smallJoinColumns, const std::vector<uint32_t>& largeJoinColumns,
-  //             joblist::JoinType jt, threadpool::ThreadPool* jsThreadPool);
-
-  TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput,
+  TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput,
               const std::vector<uint32_t>& smallJoinColumns, const std::vector<uint32_t>& largeJoinColumns,
               joblist::JoinType jt, threadpool::ThreadPool* jsThreadPool, joblist::ResourceManager* rm,
               const uint64_t numCores);
@@ -333,9 +328,9 @@ class TupleJoiner
   void setThreadCount(uint32_t cnt);
   void setPMJoinResults(std::shared_ptr<std::vector<uint32_t>[]>, uint32_t threadID);
   std::shared_ptr<std::vector<uint32_t>[]> getPMJoinArrays(uint32_t threadID);
-  std::vector<rowgroup::Row::Pointer>* getSmallSide()
+  RowPointersVec& getSmallSide()
   {
-    return &rows;
+    return *rows;
   }
   inline bool smallOuterJoin()
   {
@@ -381,8 +376,6 @@ class TupleJoiner
   /* To allow sorting */
   bool operator<(const TupleJoiner&) const;

-  uint64_t getMemUsage() const;
-
   /* Typeless join interface */
   inline bool isTypelessJoin()
   {
@@ -410,7 +403,7 @@ class TupleJoiner
   {
     return discreteValues;
   }
-  inline const boost::scoped_array<std::vector<int128_t> >& getCPData()
+  inline const boost::scoped_array<std::vector<int128_t>>& getCPData()
   {
     return cpValues;
   }
@@ -478,37 +471,22 @@ class TupleJoiner
   }

 private:
-  // typedef std::unordered_multimap<int64_t, uint8_t*, hasher, std::equal_to<int64_t>,
-  //                                 utils::STLPoolAllocator<std::pair<const int64_t, uint8_t*> > >
-  //     hash_t;
-  // typedef std::unordered_multimap<int64_t, rowgroup::Row::Pointer, hasher, std::equal_to<int64_t>,
-  //                                 utils::STLPoolAllocator<std::pair<const int64_t, rowgroup::Row::Pointer> > >
-  //     sthash_t;
-  // typedef std::unordered_multimap<
-  //     TypelessData, rowgroup::Row::Pointer, hasher, std::equal_to<TypelessData>,
-  //     utils::STLPoolAllocator<std::pair<const TypelessData, rowgroup::Row::Pointer> > >
-  //     typelesshash_t;
-  // // MCOL-1822 Add support for Long Double AVG/SUM small side
-  // typedef std::unordered_multimap<
-  //     long double, rowgroup::Row::Pointer, hasher, LongDoubleEq,
-  //     utils::STLPoolAllocator<std::pair<const long double, rowgroup::Row::Pointer> > >
-  //     ldhash_t;
-
-  typedef std::unordered_multimap<int64_t, uint8_t*, hasher, std::equal_to<int64_t>,
-                                  allocators::CountingAllocator<std::pair<const int64_t, uint8_t*> > >
-      hash_t;
-  typedef std::unordered_multimap<int64_t, rowgroup::Row::Pointer, hasher, std::equal_to<int64_t>,
-                                  allocators::CountingAllocator<std::pair<const int64_t, rowgroup::Row::Pointer> > >
-      sthash_t;
-  typedef std::unordered_multimap<
-      TypelessData, rowgroup::Row::Pointer, hasher, std::equal_to<TypelessData>,
-      allocators::CountingAllocator<std::pair<const TypelessData, rowgroup::Row::Pointer> > >
-      typelesshash_t;
-  // MCOL-1822 Add support for Long Double AVG/SUM small side
-  typedef std::unordered_multimap<
-      long double, rowgroup::Row::Pointer, hasher, LongDoubleEq,
-      allocators::CountingAllocator<std::pair<const long double, rowgroup::Row::Pointer> > >
-      ldhash_t;
+  typedef std::unordered_multimap<int64_t, uint8_t*, hasher, std::equal_to<int64_t>,
+                                  allocators::CountingAllocator<std::pair<const int64_t, uint8_t*>>>
+      hash_t;
+  typedef std::unordered_multimap<
+      int64_t, rowgroup::Row::Pointer, hasher, std::equal_to<int64_t>,
+      allocators::CountingAllocator<std::pair<const int64_t, rowgroup::Row::Pointer>>>
+      sthash_t;
+  typedef std::unordered_multimap<
+      TypelessData, rowgroup::Row::Pointer, hasher, std::equal_to<TypelessData>,
+      allocators::CountingAllocator<std::pair<const TypelessData, rowgroup::Row::Pointer>>>
+      typelesshash_t;
+  // MCOL-1822 Add support for Long Double AVG/SUM small side
+  typedef std::unordered_multimap<
+      long double, rowgroup::Row::Pointer, hasher, LongDoubleEq,
+      allocators::CountingAllocator<std::pair<const long double, rowgroup::Row::Pointer>>>
+      ldhash_t;

   typedef hash_t::iterator iterator;
   typedef typelesshash_t::iterator thIterator;
@@ -521,11 +499,11 @@ class TupleJoiner

   rowgroup::RGData smallNullMemory;

-  boost::scoped_array<boost::scoped_ptr<hash_t> > h;  // used for UM joins on ints
-  boost::scoped_array<boost::scoped_ptr<sthash_t> >
+  std::vector<std::unique_ptr<hash_t>> h;  // used for UM joins on ints
+  std::vector<std::unique_ptr<sthash_t>>
       sth;  // used for UM join on ints where the backing table uses a string table
-  boost::scoped_array<boost::scoped_ptr<ldhash_t> > ld;  // used for UM join on long double
-  std::vector<rowgroup::Row::Pointer> rows;  // used for PM join
+  std::vector<std::unique_ptr<ldhash_t>> ld;  // used for UM join on long double
+  RowPointersVecUP rows;                      // used for PM join

   /* This struct is rough.  The BPP-JL stores the parsed results for the
      logical block being processed.  There are X threads at once, so
@@ -546,18 +524,16 @@ class TupleJoiner
   };
   JoinAlg joinAlg;
   joblist::JoinType joinType;
-  // WIP
-  std::shared_ptr<boost::shared_ptr<utils::PoolAllocator>[]> _pool;  // pools for the table and nodes
   uint32_t threadCount;
   std::string tableName;

   /* vars, & fcns for typeless join */
   bool typelessJoin;
   std::vector<uint32_t> smallKeyColumns, largeKeyColumns;
-  boost::scoped_array<boost::scoped_ptr<typelesshash_t> > ht;  // used for UM join on strings
+  std::vector<std::unique_ptr<typelesshash_t>> ht;  // used for UM join on strings
   uint32_t keyLength;
-  boost::scoped_array<FixedAllocator> storedKeyAlloc;
-  boost::scoped_array<FixedAllocator> tmpKeyAlloc;
+  std::vector<FixedAllocator> storedKeyAlloc;
+  std::vector<FixedAllocator> tmpKeyAlloc;
   bool bSignedUnsignedJoin;  // Set if we have a signed vs unsigned compare in a join. When not set, we can
                              // save checking for the signed bit.
@@ -571,7 +547,7 @@ class TupleJoiner
   /* Runtime casual partitioning support */
   void updateCPData(const rowgroup::Row& r);
   boost::scoped_array<bool> discreteValues;
-  boost::scoped_array<std::vector<int128_t> > cpValues;  // if !discreteValues, [0] has min, [1] has max
+  boost::scoped_array<std::vector<int128_t>> cpValues;  // if !discreteValues, [0] has min, [1] has max
   uint32_t uniqueLimit;
   bool finished;
@@ -590,7 +566,7 @@ class TupleJoiner
   void um_insertStringTable(uint rowcount, rowgroup::Row& r);

   template <typename buckets_t, typename hash_table_t>
-  void bucketsToTables(buckets_t*, hash_table_t*);
+  void bucketsToTables(buckets_t*, hash_table_t&);

   bool _convertToDiskJoin;
   joblist::ResourceManager* resourceManager_ = nullptr;
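
Note (not part of the patch above): the diff drops the polling-based accounting (TupleHashJoinStep::trackMem() and TupleJoiner::getMemUsage()) and instead hands every joiner container an allocator obtained from the ResourceManager, so memory is charged at allocation time. Below is a minimal, self-contained sketch of that counting-allocator pattern; MemCounter and CountingAllocatorSketch are illustrative stand-ins and do not reflect the real allocators::CountingAllocator or ResourceManager::getAllocator API.

#include <atomic>
#include <cstddef>
#include <iostream>
#include <vector>

// Shared byte counter: every container wired to the allocator charges it.
struct MemCounter
{
  std::atomic<std::ptrdiff_t> bytes{0};
};

template <class T>
class CountingAllocatorSketch
{
 public:
  using value_type = T;

  explicit CountingAllocatorSketch(MemCounter* counter) : counter_(counter)
  {
  }
  template <class U>
  CountingAllocatorSketch(const CountingAllocatorSketch<U>& other) : counter_(other.counter_)
  {
  }

  T* allocate(std::size_t n)
  {
    // Charge the bytes before handing out memory; a real implementation could
    // reject the request or trigger a disk-based join when a limit is exceeded.
    counter_->bytes += static_cast<std::ptrdiff_t>(n * sizeof(T));
    return static_cast<T*>(::operator new(n * sizeof(T)));
  }
  void deallocate(T* p, std::size_t n) noexcept
  {
    counter_->bytes -= static_cast<std::ptrdiff_t>(n * sizeof(T));
    ::operator delete(p);
  }

  MemCounter* counter_;
};

template <class T, class U>
bool operator==(const CountingAllocatorSketch<T>& a, const CountingAllocatorSketch<U>& b)
{
  return a.counter_ == b.counter_;
}
template <class T, class U>
bool operator!=(const CountingAllocatorSketch<T>& a, const CountingAllocatorSketch<U>& b)
{
  return !(a == b);
}

int main()
{
  MemCounter counter;
  CountingAllocatorSketch<void*> alloc(&counter);

  // Analogous to RowPointersVec: the vector's growth shows up in `counter`
  // the moment it happens, so no monitor thread has to poll for usage.
  std::vector<void*, CountingAllocatorSketch<void*>> rows(alloc);
  rows.reserve(100000);
  std::cout << "tracked bytes: " << counter.bytes.load() << std::endl;
  return 0;
}

The same idea appears to motivate RowPointersVec, the CountingAllocator-backed hash-table typedefs, and the FixedAllocator copy fix: containers report their footprint through the allocator they are constructed with, instead of being polled once per second.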