From 47d01b2d2fb291c6feb99fc14eef728758555bc5 Mon Sep 17 00:00:00 2001 From: drrtuy Date: Wed, 28 Aug 2024 18:18:57 +0100 Subject: [PATCH] fix(join, UM, perf): UM join is multi-threaded now (#3286) * chore: UM join is multi-threaded now * fix(UMjoin): replace TR1 maps with stdlib versions --- dbcon/joblist/tuplehashjoin.cpp | 58 +++++++++++++++------------------ dbcon/joblist/tuplehashjoin.h | 49 ++++++++++++++-------------- utils/joiner/tuplejoiner.cpp | 27 ++++++++------- utils/joiner/tuplejoiner.h | 15 ++++----- 4 files changed, 75 insertions(+), 74 deletions(-) diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp index 3428756ed..e6c6b9796 100644 --- a/dbcon/joblist/tuplehashjoin.cpp +++ b/dbcon/joblist/tuplehashjoin.cpp @@ -29,7 +29,7 @@ #include #include #include -//#define NDEBUG +// #define NDEBUG #include #include using namespace std; @@ -119,12 +119,9 @@ TupleHashJoinStep::TupleHashJoinStep(const JobInfo& jobInfo) allowDJS = false; numCores = resourceManager->numCores(); + if (numCores <= 0) numCores = 8; - /* Debugging, rand() is used to simulate failures - time_t t = time(NULL); - srand(t); - */ } TupleHashJoinStep::~TupleHashJoinStep() @@ -139,8 +136,8 @@ TupleHashJoinStep::~TupleHashJoinStep() for (uint i = 0; i < smallDLs.size(); i++) { if (memUsedByEachJoin[i]) - resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit); - } + resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit); + } } returnMemory(); // cout << "deallocated THJS, UM memory available: " << resourceManager.availableMemory() << endl; @@ -207,7 +204,6 @@ void TupleHashJoinStep::join() jobstepThreadPool.join(djsReader); jobstepThreadPool.join(djsRelay); - // cout << "THJS: joined all DJS threads, shared usage = " << *djsSmallUsage << endl; } } @@ -228,7 +224,7 @@ void TupleHashJoinStep::trackMem(uint index) { gotMem = resourceManager->getMemory(memAfter - memBefore, sessionMemLimit, true); if (gotMem) - atomicops::atomicAdd(&memUsedByEachJoin[index], memAfter - memBefore); + atomicops::atomicAdd(&memUsedByEachJoin[index], memAfter - memBefore); else return; @@ -245,7 +241,7 @@ void TupleHashJoinStep::trackMem(uint index) gotMem = resourceManager->getMemory(memAfter - memBefore, sessionMemLimit, true); if (gotMem) { - atomicops::atomicAdd(&memUsedByEachJoin[index], memAfter - memBefore); + atomicops::atomicAdd(&memUsedByEachJoin[index], memAfter - memBefore); } else { @@ -305,28 +301,32 @@ void TupleHashJoinStep::startSmallRunners(uint index) handle abort, out of memory, etc */ - /* To measure wall-time spent constructing the small-side tables... - boost::posix_time::ptime end_time, start_time = - boost::posix_time::microsec_clock::universal_time(); - */ - stopMemTracking = false; utils::VLArray jobs(numCores); uint64_t memMonitor = jobstepThreadPool.invoke([this, index] { this->trackMem(index); }); // starting 1 thread when in PM mode, since it's only inserting into a // vector of rows. The rest will be started when converted to UM mode. if (joiner->inUM()) + { for (int i = 0; i < numCores; i++) + { jobs[i] = jobstepThreadPool.invoke([this, i, index, &jobs] { this->smallRunnerFcn(index, i, jobs); }); + } + } else + { jobs[0] = jobstepThreadPool.invoke([this, index, &jobs] { this->smallRunnerFcn(index, 0, jobs); }); + } // wait for the first thread to join, then decide whether the others exist and need joining jobstepThreadPool.join(jobs[0]); if (joiner->inUM()) + { for (int i = 1; i < numCores; i++) + { jobstepThreadPool.join(jobs[i]); - + } + } // stop the monitor thread memTrackMutex.lock(); stopMemTracking = true; @@ -435,7 +435,7 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t* gotMem = resourceManager->getMemory(rgSize, sessionMemLimit, true); if (gotMem) { - atomicops::atomicAdd(&memUsedByEachJoin[index], rgSize); + atomicops::atomicAdd(&memUsedByEachJoin[index], rgSize); } else { @@ -468,9 +468,12 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t* if (!joiner->inUM() && (memUsedByEachJoin[index] > pmMemLimit)) { joiner->setInUM(rgData[index]); + for (int i = 1; i < numCores; i++) + { jobs[i] = jobstepThreadPool.invoke([this, i, index, jobs] { this->smallRunnerFcn(index, i, jobs); }); + } } next: dlMutex.lock(); @@ -1727,8 +1730,8 @@ void TupleHashJoinStep::joinOneRG( std::shared_ptr& smallRowTemplates, RowGroupDL* outputDL, // disk-join support vars. This param list is insane; refactor attempt would be nice at some point. vector >* tjoiners, - std::shared_ptr[] >* rgMappings, - std::shared_ptr[] >* feMappings, + std::shared_ptr[]>* rgMappings, + std::shared_ptr[]>* feMappings, boost::scoped_array >* smallNullMem) { /* Disk-join support. @@ -1765,15 +1768,7 @@ void TupleHashJoinStep::joinOneRG( for (j = 0; j < smallSideCount; j++) { (*tjoiners)[j]->match(largeSideRow, k, threadID, &joinMatches[j]); - /* Debugging code to print the matches - Row r; - smallRGs[j].initRow(&r); - cout << joinMatches[j].size() << " matches: \n"; - for (uint32_t z = 0; z < joinMatches[j].size(); z++) { - r.setData(joinMatches[j][z]); - cout << " " << r.toString() << endl; - } - */ + matchCount = joinMatches[j].size(); if ((*tjoiners)[j]->hasFEFilter() && matchCount > 0) @@ -1859,10 +1854,11 @@ void TupleHashJoinStep::joinOneRG( } void TupleHashJoinStep::generateJoinResultSet(const vector >& joinerOutput, Row& baseRow, - const std::shared_ptr[] >& mappings, + const std::shared_ptr[]>& mappings, const uint32_t depth, RowGroup& l_outputRG, RGData& rgData, - vector& outputData, const std::shared_ptr& smallRows, - Row& joinedRow, RowGroupDL* dlp) + vector& outputData, + const std::shared_ptr& smallRows, Row& joinedRow, + RowGroupDL* dlp) { uint32_t i; Row& smallRow = smallRows[depth]; diff --git a/dbcon/joblist/tuplehashjoin.h b/dbcon/joblist/tuplehashjoin.h index 519beba6f..1636ccd18 100644 --- a/dbcon/joblist/tuplehashjoin.h +++ b/dbcon/joblist/tuplehashjoin.h @@ -74,8 +74,10 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep void tableOid1(execplan::CalpontSystemCatalog::OID tableOid1) { fTableOID1 = tableOid1; - if (fTableOID1 < 3000) + if (fTableOID1 >= 1000 && fTableOID1 < 3000) + { numCores = 1; // syscat query, no need for more than 1 thread + } } void tableOid2(execplan::CalpontSystemCatalog::OID tableOid2) { @@ -199,16 +201,16 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep void addSmallSideRG(const std::vector& rgs, const std::vector& tableNames); void addJoinKeyIndex(const std::vector& jt, const std::vector& typeless, - const std::vector >& smallkeys, - const std::vector >& largekeys); + const std::vector>& smallkeys, + const std::vector>& largekeys); void configSmallSideRG(const std::vector& rgs, const std::vector& tableNames); void configLargeSideRG(const rowgroup::RowGroup& rg); void configJoinKeyIndex(const std::vector& jt, const std::vector& typeless, - const std::vector >& smallkeys, - const std::vector >& largekeys); + const std::vector>& smallkeys, + const std::vector>& largekeys); void setOutputRowGroup(const rowgroup::RowGroup& rg); @@ -234,11 +236,11 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep { return smallSideKeys[0][0]; } - const std::vector >& getSmallKeys() const + const std::vector>& getSmallKeys() const { return smallSideKeys; } - const std::vector >& getLargeKeys() const + const std::vector>& getLargeKeys() const { return largeSideKeys; } @@ -434,8 +436,8 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep int fCorrelatedSide; std::vector typelessJoin; // the size of the vector is # of small side - std::vector > largeSideKeys; - std::vector > smallSideKeys; + std::vector> largeSideKeys; + std::vector> smallSideKeys; ResourceManager* resourceManager; uint64_t fMemSizeForOutputRG; @@ -448,8 +450,8 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep return *j1 < *j2; } }; - std::vector > joiners; - boost::scoped_array > rgData; + std::vector> joiners; + boost::scoped_array> rgData; TupleBPS* largeBPS; rowgroup::RowGroup largeRG, outputRG; std::vector smallRGs; @@ -511,7 +513,7 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep /* Semi-join support */ std::vector feIndexes; - std::vector > fe; + std::vector> fe; rowgroup::RowGroup joinFilterRG; /* Casual Partitioning forwarding */ @@ -534,10 +536,10 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep }; void joinRunnerFcn(uint32_t index); void startJoinThreads(); - void generateJoinResultSet(const std::vector >& joinerOutput, + void generateJoinResultSet(const std::vector>& joinerOutput, rowgroup::Row& baseRow, - const std::shared_ptr[] >& mappings, - const uint32_t depth, rowgroup::RowGroup& outputRG, rowgroup::RGData& rgData, + const std::shared_ptr[]>& mappings, const uint32_t depth, + rowgroup::RowGroup& outputRG, rowgroup::RGData& rgData, std::vector& outputData, const std::shared_ptr& smallRows, rowgroup::Row& joinedRow, RowGroupDL* outputDL); @@ -549,11 +551,11 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep void joinOneRG(uint32_t threadID, std::vector& out, rowgroup::RowGroup& inputRG, rowgroup::RowGroup& joinOutput, rowgroup::Row& largeSideRow, rowgroup::Row& joinFERow, rowgroup::Row& joinedRow, rowgroup::Row& baseRow, - std::vector >& joinMatches, + std::vector>& joinMatches, std::shared_ptr& smallRowTemplates, RowGroupDL* outputDL, - std::vector >* joiners = NULL, - std::shared_ptr[] >* rgMappings = NULL, - std::shared_ptr[] >* feMappings = NULL, + std::vector>* joiners = NULL, + std::shared_ptr[]>* rgMappings = NULL, + std::shared_ptr[]>* feMappings = NULL, boost::scoped_array>* smallNullMem = NULL); void finishSmallOuterJoin(); void makeDupList(const rowgroup::RowGroup& rg); @@ -564,10 +566,10 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep std::shared_ptr[]> columnMappings, fergMappings; std::shared_ptr fe2Mapping; uint32_t joinThreadCount; - boost::scoped_array > smallNullMemory; + boost::scoped_array> smallNullMemory; uint64_t outputIt; bool moreInput; - std::vector > dupList; + std::vector> dupList; boost::scoped_array dupRows; std::vector smallTableNames; bool isExeMgr; @@ -633,8 +635,8 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep bool ownsOutputDL; void segregateJoiners(); - std::vector > tbpsJoiners; - std::vector > djsJoiners; + std::vector> tbpsJoiners; + std::vector> djsJoiners; std::vector djsJoinerMap; boost::scoped_array memUsedByEachJoin; boost::mutex djsLock; @@ -653,4 +655,3 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep }; } // namespace joblist - diff --git a/utils/joiner/tuplejoiner.cpp b/utils/joiner/tuplejoiner.cpp index 3dea44b56..0db8f88db 100644 --- a/utils/joiner/tuplejoiner.cpp +++ b/utils/joiner/tuplejoiner.cpp @@ -20,12 +20,13 @@ #include #include #include -#include +#include #include "hasher.h" #include "lbidlist.h" #include "spinlock.h" #include "vlarray.h" +#include "threadnaming.h" using namespace std; using namespace rowgroup; @@ -282,8 +283,7 @@ void TupleJoiner::bucketsToTables(buckets_t* buckets, hash_table_t* tables) done = false; continue; } - for (auto& element : buckets[i]) - tables[i]->insert(element); + tables[i]->insert(buckets[i].begin(), buckets[i].end()); m_bucketLocks[i].unlock(); wasProductive = true; buckets[i].clear(); @@ -306,7 +306,7 @@ void TupleJoiner::um_insertTypeless(uint threadID, uint rowCount, Row& r) if (td[i].len == 0) continue; uint bucket = bucketPicker((char*)td[i].data, td[i].len, bpSeed) & bucketMask; - v[bucket].push_back(pair(td[i], r.getPointer())); + v[bucket].emplace_back(pair(td[i], r.getPointer())); } bucketsToTables(&v[0], ht.get()); } @@ -323,9 +323,9 @@ void TupleJoiner::um_insertLongDouble(uint rowCount, Row& r) uint bucket = bucketPicker((char*)&smallKey, 10, bpSeed) & bucketMask; // change if we decide to support windows again if (UNLIKELY(smallKey == joblist::LONGDOUBLENULL)) - v[bucket].push_back(pair(joblist::LONGDOUBLENULL, r.getPointer())); + v[bucket].emplace_back(pair(joblist::LONGDOUBLENULL, r.getPointer())); else - v[bucket].push_back(pair(smallKey, r.getPointer())); + v[bucket].emplace_back(pair(smallKey, r.getPointer())); } bucketsToTables(&v[0], ld.get()); } @@ -345,9 +345,9 @@ void TupleJoiner::um_insertInlineRows(uint rowCount, Row& r) smallKey = (int64_t)r.getUintField(smallKeyColumn); uint bucket = bucketPicker((char*)&smallKey, sizeof(smallKey), bpSeed) & bucketMask; if (UNLIKELY(smallKey == nullValueForJoinColumn)) - v[bucket].push_back(pair(getJoinNullValue(), r.getData())); + v[bucket].emplace_back(pair(getJoinNullValue(), r.getData())); else - v[bucket].push_back(pair(smallKey, r.getData())); + v[bucket].emplace_back(pair(smallKey, r.getData())); } bucketsToTables(&v[0], h.get()); } @@ -367,9 +367,9 @@ void TupleJoiner::um_insertStringTable(uint rowCount, Row& r) smallKey = (int64_t)r.getUintField(smallKeyColumn); uint bucket = bucketPicker((char*)&smallKey, sizeof(smallKey), bpSeed) & bucketMask; if (UNLIKELY(smallKey == nullValueForJoinColumn)) - v[bucket].push_back(pair(getJoinNullValue(), r.getPointer())); + v[bucket].emplace_back(pair(getJoinNullValue(), r.getPointer())); else - v[bucket].push_back(pair(smallKey, r.getPointer())); + v[bucket].emplace_back(pair(smallKey, r.getPointer())); } bucketsToTables(&v[0], sth.get()); } @@ -670,6 +670,8 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin } } +using unordered_set_int128 = std::unordered_set; + void TupleJoiner::doneInserting() { // a minor textual cleanup @@ -694,7 +696,6 @@ void TupleJoiner::doneInserting() for (col = 0; col < smallKeyColumns.size(); col++) { - typedef std::tr1::unordered_set unordered_set_int128; unordered_set_int128 uniquer; unordered_set_int128::iterator uit; sthash_t::iterator sthit; @@ -811,6 +812,8 @@ void TupleJoiner::setInPM() void TupleJoiner::umJoinConvert(size_t begin, size_t end) { + utils::setThreadName("TJUMJoinConvert1"); + Row smallRow; smallRG.initRow(&smallRow); @@ -862,6 +865,8 @@ void TupleJoiner::setInUM() void TupleJoiner::umJoinConvert(uint threadID, vector& rgs, size_t begin, size_t end) { + utils::setThreadName("TJUMJoinConvert2"); + RowGroup l_smallRG(smallRG); while (begin < end) diff --git a/utils/joiner/tuplejoiner.h b/utils/joiner/tuplejoiner.h index f97045351..aa4aaf647 100644 --- a/utils/joiner/tuplejoiner.h +++ b/utils/joiner/tuplejoiner.h @@ -24,7 +24,7 @@ #include #include -#include +#include #include "rowgroup.h" #include "joiner.h" @@ -464,19 +464,18 @@ class TupleJoiner void setConvertToDiskJoin(); private: - typedef std::tr1::unordered_multimap, - utils::STLPoolAllocator > > + typedef std::unordered_multimap, + utils::STLPoolAllocator > > hash_t; - typedef std::tr1::unordered_multimap< - int64_t, rowgroup::Row::Pointer, hasher, std::equal_to, - utils::STLPoolAllocator > > + typedef std::unordered_multimap, + utils::STLPoolAllocator > > sthash_t; - typedef std::tr1::unordered_multimap< + typedef std::unordered_multimap< TypelessData, rowgroup::Row::Pointer, hasher, std::equal_to, utils::STLPoolAllocator > > typelesshash_t; // MCOL-1822 Add support for Long Double AVG/SUM small side - typedef std::tr1::unordered_multimap< + typedef std::unordered_multimap< long double, rowgroup::Row::Pointer, hasher, LongDoubleEq, utils::STLPoolAllocator > > ldhash_t;