diff --git a/dbcon/joblist/diskjoinstep.cpp b/dbcon/joblist/diskjoinstep.cpp
index 50a50bacf..956aa56ee 100644
--- a/dbcon/joblist/diskjoinstep.cpp
+++ b/dbcon/joblist/diskjoinstep.cpp
@@ -98,7 +98,6 @@ DiskJoinStep::DiskJoinStep(TupleHashJoinStep* t, int djsIndex, int joinIndex, bo
   {  // drain inputs, close output
     smallReader();  // only small input is supplying input at this point
-    // largeReader();
     outputDL->endOfInput();
     closedOutput = true;
   }
@@ -178,7 +177,6 @@ void DiskJoinStep::smallReader()
       memUsage = jp->insertSmallSideRGData(rgData);
       combinedMemUsage = atomicops::atomicAdd(smallUsage.get(), memUsage);
-      // cout << "memusage = " << memUsage << " total = " << combinedMemUsage << endl;
 
       if (combinedMemUsage > smallLimit)
       {
         errorMessage(IDBErrorInfo::instance()->errorMsg(ERR_DBJ_DISK_USAGE_LIMIT));
@@ -189,14 +187,11 @@ void DiskJoinStep::smallReader()
     }
   }
 
-  // cout << "(" << joinerIndex << ") read the small side data, combined mem usage= " << combinedMemUsage <<
-  // " rowcount = " << rowCount << endl;
-  if (!cancelled())
+  if (LIKELY(!cancelled()))
   {
     memUsage = jp->doneInsertingSmallData();
     combinedMemUsage = atomicops::atomicAdd(smallUsage.get(), memUsage);
-    // cout << "2memusage = " << memUsage << " total = " << combinedMemUsage << endl;
 
     if (combinedMemUsage > smallLimit)
     {
       errorMessage(IDBErrorInfo::instance()->errorMsg(ERR_DBJ_DISK_USAGE_LIMIT));
@@ -226,7 +221,6 @@ void DiskJoinStep::largeReader()
   RowGroup l_largeRG = largeRG;
 
   largeIterationCount++;
-  // cout << "iteration " << largeIterationCount << " largeLimit=" << largeLimit << endl;
 
   try
   {
@@ -237,7 +231,6 @@ void DiskJoinStep::largeReader()
       if (more)
       {
         l_largeRG.setData(&rgData);
-        // cout << "large side raw data: " << largeRG.toString() << endl;
         largeSize += jp->insertLargeSideRGData(rgData);
       }
 
@@ -245,7 +238,6 @@ void DiskJoinStep::largeReader()
 
     jp->doneInsertingLargeData();
 
-    // cout << "(" << joinerIndex << ") read the large side data rowcount = " << rowCount << endl;
     if (!more)
       lastLargeIteration = true;
   }
@@ -257,29 +249,21 @@ void DiskJoinStep::largeReader()
     abort();
   }
 
-  if (cancelled())
+  if (UNLIKELY(cancelled()))
     while (more)
       more = largeDL->next(largeIt, &rgData);
 }
 
-void DiskJoinStep::loadFcn()
+void DiskJoinStep::loadFcn(const uint32_t threadID, const uint32_t smallSideSizeLimit,
+                           const std::vector<joiner::JoinPartition*>& joinPartitions)
 {
   boost::shared_ptr<LoaderOutput> out;
-  std::vector<joiner::JoinPartition*> joinPartitions;
-  // Collect all join partitions.
-  jp->collectJoinPartitions(joinPartitions);
-
-#ifdef DEBUG_DJS
-  cout << "Collected join partitions: " << endl;
-  for (uint32_t i = 0; i < joinPartitions.size(); ++i)
-    cout << joinPartitions[i]->getUniqueID() << ", ";
-  cout << endl;
-#endif
 
   try
   {
     uint32_t partitionIndex = 0;
     bool partitionDone = true;
+    RowGroup& rowGroup = smallRG;
 
     // Iterate over partitions.
     while (partitionIndex < joinPartitions.size() && !cancelled())
@@ -294,7 +278,6 @@ void DiskJoinStep::loadFcn()
       while (true)
       {
         messageqcpp::ByteStream bs;
-        RowGroup rowGroup;
         RGData rgData;
 
         joinPartition->readByteStream(0, &bs);
@@ -314,10 +297,10 @@ void DiskJoinStep::loadFcn()
           break;
         }
 
-        currentSize += rowGroup.getRowCount() * rowGroup.getColumnCount() * 64;
+        currentSize += rowGroup.getDataSize();
         out->smallData.push_back(rgData);
 
-        if (currentSize > partitionSize)
+        if (currentSize > smallSideSizeLimit)
         {
 #ifdef DEBUG_DJS
           cout << "Memory limit exceeded for the partition: " << joinPartition->getUniqueID() << endl;
@@ -339,7 +322,7 @@ void DiskJoinStep::loadFcn()
       // Initialize `LoaderOutput` and add it to `FIFO`.
       out->partitionID = joinPartition->getUniqueID();
       out->jp = joinPartition;
-      loadFIFO->insert(out);
+      loadFIFO[threadID]->insert(out);
 
       // If this partition is done - take a next one.
       if (partitionDone)
@@ -354,25 +337,24 @@ void DiskJoinStep::loadFcn()
     abort();
   }
 
-  loadFIFO->endOfInput();
+  loadFIFO[threadID]->endOfInput();
 }
 
-void DiskJoinStep::buildFcn()
+void DiskJoinStep::buildFcn(const uint32_t threadID)
 {
   boost::shared_ptr<LoaderOutput> in;
   boost::shared_ptr<BuilderOutput> out;
   bool more = true;
-  int it = loadFIFO->getIterator();
+  int it = loadFIFO[threadID]->getIterator();
   int i, j;
   Row smallRow;
   RowGroup l_smallRG = smallRG;
 
   l_smallRG.initRow(&smallRow);
 
-  while (1)
+  while (true)
   {
-    // cout << "getting a partition from the loader" << endl;
-    more = loadFIFO->next(it, &in);
+    more = loadFIFO[threadID]->next(it, &in);
 
     if (!more || cancelled())
       goto out;
@@ -383,7 +365,6 @@ void DiskJoinStep::buildFcn()
     out->jp = in->jp;
     out->tupleJoiner = joiner->copyForDiskJoin();
 
-    // cout << "building a tuplejoiner" << endl;
     for (i = 0; i < (int)in->smallData.size(); i++)
     {
       l_smallRG.setData(&in->smallData[i]);
@@ -394,26 +375,25 @@ void DiskJoinStep::buildFcn()
     }
 
     out->tupleJoiner->doneInserting();
-    buildFIFO->insert(out);
+    buildFIFO[threadID]->insert(out);
   }
 
 out:
   while (more)
-    more = loadFIFO->next(it, &in);
+    more = loadFIFO[threadID]->next(it, &in);
 
-  buildFIFO->endOfInput();
+  buildFIFO[threadID]->endOfInput();
 }
 
-void DiskJoinStep::joinFcn()
+void DiskJoinStep::joinFcn(const uint32_t threadID)
 {
-  /* This function mostly serves as an adapter between the
-  input data and the joinOneRG() fcn in THJS. */
-
+  // This function mostly serves as an adapter between the
+  // input data and the joinOneRG() fcn in THJS.
   boost::shared_ptr<BuilderOutput> in;
   bool more = true;
-  int it = buildFIFO->getIterator();
-  int i, j;
+  int it = buildFIFO[threadID]->getIterator();
+  int i;
   vector<RGData> joinResults;
   RowGroup l_largeRG = largeRG, l_smallRG = smallRG;
   RowGroup l_outputRG = outputRG;
@@ -467,9 +447,9 @@ void DiskJoinStep::joinFcn()
 
   try
   {
-    while (1)
+    while (true)
     {
-      more = buildFIFO->next(it, &in);
+      more = buildFIFO[threadID]->next(it, &in);
 
       if (!more || cancelled())
         goto out;
@@ -485,12 +465,9 @@ void DiskJoinStep::joinFcn()
                           joinMatches, smallRowTemplates, outputDL.get(), &joiners, &colMappings,
                           &fergMappings, &smallNullMem);
 
-        for (j = 0; j < (int)joinResults.size(); j++)
-        {
-          // l_outputRG.setData(&joinResults[j]);
-          // cout << "got joined output " << l_outputRG.toString() << endl;
-          outputDL->insert(joinResults[j]);
-        }
+        if (joinResults.size())
+          outputResult(joinResults);
+
         thjs->returnMemory();
         joinResults.clear();
         largeData = in->jp->getNextLargeRGData();
@@ -565,15 +542,10 @@ void DiskJoinStep::joinFcn()
             outputRow.nextRow();
           }
 
-          if (l_outputRG.getRowCount() > 0)
-          {
-            // cout << "inserting an rg with " << l_outputRG.getRowCount() << endl;
-            outputDL->insert(rgData);
-          }
+          if (l_outputRG.getRowCount())
+            outputResult({rgData});
 
           if (thjs)
-          {
             thjs->returnMemory();
-          }
         }
       }
     }
@@ -589,9 +561,9 @@ void DiskJoinStep::joinFcn()
 
 out:
   while (more)
-    more = buildFIFO->next(it, &in);
+    more = buildFIFO[threadID]->next(it, &in);
 
-  if (lastLargeIteration || cancelled())
+  if (cancelled())
   {
     reportStats();
     outputDL->endOfInput();
@@ -599,39 +571,107 @@ out:
   }
 }
 
+void DiskJoinStep::initializeFIFO(const uint32_t threadCount)
+{
+  loadFIFO.clear();
+  buildFIFO.clear();
+
+  for (uint32_t i = 0; i < threadCount; ++i)
+  {
+    boost::shared_ptr<LoaderOutputFIFO> lFIFO(new LoaderOutputFIFO(1, 1));
+    boost::shared_ptr<BuilderOutputFIFO> bFIFO(new BuilderOutputFIFO(1, 1));
+
+    loadFIFO.push_back(lFIFO);
+    buildFIFO.push_back(bFIFO);
+  }
+}
+
+void DiskJoinStep::processJoinPartitions(const uint32_t threadID, const uint32_t smallSideSizeLimitPerThread,
+                                         const std::vector<joiner::JoinPartition*>& joinPartitions)
+{
+  std::vector<uint64_t> pipelineThreads;
+  pipelineThreads.reserve(3);
+  pipelineThreads.push_back(
+      jobstepThreadPool.invoke(Loader(this, threadID, smallSideSizeLimitPerThread, joinPartitions)));
+  pipelineThreads.push_back(jobstepThreadPool.invoke(Builder(this, threadID)));
+  pipelineThreads.push_back(jobstepThreadPool.invoke(Joiner(this, threadID)));
+  jobstepThreadPool.join(pipelineThreads);
+}
+
+void DiskJoinStep::prepareJobs(const std::vector<joiner::JoinPartition*>& joinPartitions,
+                               JoinPartitionJobs& joinPartitionsJobs)
+{
+  const uint32_t issuedThreads = jobstepThreadPool.getIssuedThreads();
+  const uint32_t maxNumOfThreads = jobstepThreadPool.getMaxThreads();
+  const uint32_t numOfThreads =
+      std::min(std::min(maxNumOfJoinThreads, std::max(maxNumOfThreads - issuedThreads, (uint32_t)1)),
+               (uint32_t)joinPartitions.size());
+  const uint32_t workSize = joinPartitions.size() / numOfThreads;
+
+  uint32_t offset = 0;
+  joinPartitionsJobs.reserve(numOfThreads);
+  for (uint32_t threadNum = 0; threadNum < numOfThreads; ++threadNum, offset += workSize)
+  {
+    auto start = joinPartitions.begin() + offset;
+    auto end = start + workSize;
+    std::vector<joiner::JoinPartition*> joinPartitionJob(start, end);
+    joinPartitionsJobs.push_back(std::move(joinPartitionJob));
+  }
+
+  for (uint32_t i = 0, e = joinPartitions.size() % numOfThreads; i < e; ++i, ++offset)
+    joinPartitionsJobs[i].push_back(joinPartitions[offset]);
+}
+
+void DiskJoinStep::outputResult(const std::vector<RGData>& result)
+{
+  std::lock_guard<std::mutex> lk(outputMutex);
+  for (const auto& rgData : result)
+    outputDL->insert(rgData);
+}
+
+void DiskJoinStep::spawnJobs(const std::vector<std::vector<joiner::JoinPartition*>>& joinPartitionsJobs,
+                             const uint32_t smallSideSizeLimitPerThread)
+{
+  const uint32_t threadsCount = joinPartitionsJobs.size();
+  std::vector<uint64_t> processorThreadsId;
+  processorThreadsId.reserve(threadsCount);
+  for (uint32_t threadID = 0; threadID < threadsCount; ++threadID)
+  {
+    processorThreadsId.push_back(jobstepThreadPool.invoke(
+        JoinPartitionsProcessor(this, threadID, smallSideSizeLimitPerThread, joinPartitionsJobs[threadID])));
+  }
+
+  jobstepThreadPool.join(processorThreadsId);
+}
+
 void DiskJoinStep::mainRunner()
 {
-  /*
-  Read from smallDL, insert into small side
-  Read from largeDL, insert into large side
-  Start the processing threads
-  */
   try
   {
     smallReader();
 
     while (!lastLargeIteration && !cancelled())
     {
-      // cout << "large iteration " << largeIterationCount << endl;
       jp->initForLargeSideFeed();
       largeReader();
-      // cout << "done reading iteration " << largeIterationCount-1 << endl;
 
       jp->initForProcessing();
 
-      if (cancelled())
-        break;
+      // Collect all join partitions.
+      std::vector<joiner::JoinPartition*> joinPartitions;
+      jp->collectJoinPartitions(joinPartitions);
 
-      loadFIFO.reset(
-          new FIFO<boost::shared_ptr<LoaderOutput> >(1, 1));  // double buffering should be good enough
-      buildFIFO.reset(new FIFO<boost::shared_ptr<BuilderOutput> >(1, 1));
+      // Split partitions across threads.
+      JoinPartitionJobs joinPartitionsJobs;
+      prepareJobs(joinPartitions, joinPartitionsJobs);
 
-      std::vector<uint64_t> thrds;
-      thrds.reserve(3);
-      thrds.push_back(jobstepThreadPool.invoke(Loader(this)));
-      thrds.push_back(jobstepThreadPool.invoke(Builder(this)));
-      thrds.push_back(jobstepThreadPool.invoke(Joiner(this)));
-      jobstepThreadPool.join(thrds);
+      // Initialize data lists.
+      const uint32_t numOfThreads = joinPartitionsJobs.size();
+      initializeFIFO(numOfThreads);
+
+      // Spawn jobs.
+      const uint32_t smallSideSizeLimitPerThread = partitionSize / numOfThreads;
+      spawnJobs(joinPartitionsJobs, smallSideSizeLimitPerThread);
     }
   }
   catch (...)
@@ -643,7 +683,7 @@ void DiskJoinStep::mainRunner()
   }
 
   // make sure all inputs were drained & output closed
-  if (cancelled())
+  if (UNLIKELY(cancelled()))
   {
     try
     {
@@ -661,6 +701,12 @@ void DiskJoinStep::mainRunner()
       closedOutput = true;
     }
   }
+
+  if (LIKELY(!closedOutput))
+  {
+    outputDL->endOfInput();
+    closedOutput = true;
+  }
 }
 
 const string DiskJoinStep::toString() const
diff --git a/dbcon/joblist/diskjoinstep.h b/dbcon/joblist/diskjoinstep.h
index e719acba1..55079e348 100644
--- a/dbcon/joblist/diskjoinstep.h
+++ b/dbcon/joblist/diskjoinstep.h
@@ -25,6 +25,8 @@ namespace joblist
 {
+using JoinPartitionJobs = std::vector<std::vector<joiner::JoinPartition*>>;
+
 class DiskJoinStep : public JobStep
 {
  public:
@@ -44,6 +46,14 @@ class DiskJoinStep : public JobStep
  protected:
  private:
+  void initializeFIFO(uint32_t threadCount);
+  void processJoinPartitions(const uint32_t threadID, const uint32_t smallSideSizeLimitPerThread,
+                             const vector<joiner::JoinPartition*>& joinPartitions);
+  void prepareJobs(const std::vector<joiner::JoinPartition*>& joinPartitions,
+                   JoinPartitionJobs& joinPartitionsJobs);
+  void outputResult(const std::vector<rowgroup::RGData>& result);
+  void spawnJobs(const std::vector<std::vector<joiner::JoinPartition*>>& joinPartitionsJobs,
+                 const uint32_t smallSideSizeLimitPerThread);
   boost::shared_ptr<joiner::JoinPartition> jp;
   rowgroup::RowGroup largeRG, smallRG, outputRG, joinFERG;
   std::vector<uint32_t> largeKeyCols, smallKeyCols;
@@ -80,6 +90,26 @@ class DiskJoinStep : public JobStep
 
   uint64_t mainThread;  // thread handle from thread pool
 
+  struct JoinPartitionsProcessor
+  {
+    JoinPartitionsProcessor(DiskJoinStep* djs, const uint32_t threadID, const uint32_t smallSideSizeLimit,
+                            const std::vector<joiner::JoinPartition*>& joinPartitions)
+     : djs(djs), threadID(threadID), smallSideSizeLimit(smallSideSizeLimit), joinPartitions(joinPartitions)
+    {
+    }
+
+    void operator()()
+    {
+      utils::setThreadName("DJSJoinPartitionsProcessor");
+      djs->processJoinPartitions(threadID, smallSideSizeLimit, joinPartitions);
+    }
+
+    DiskJoinStep* djs;
+    uint32_t threadID;
+    uint32_t smallSideSizeLimit;
+    std::vector<joiner::JoinPartition*> joinPartitions;
+  };
+
   /* Loader structs */
   struct LoaderOutput
   {
@@ -87,21 +117,30 @@ class DiskJoinStep : public JobStep
     uint64_t partitionID;
     joiner::JoinPartition* jp;
   };
-  boost::shared_ptr<joblist::FIFO<boost::shared_ptr<LoaderOutput> > > loadFIFO;
+
+  using LoaderOutputFIFO = joblist::FIFO<boost::shared_ptr<LoaderOutput>>;
+  std::vector<boost::shared_ptr<LoaderOutputFIFO>> loadFIFO;
 
   struct Loader
   {
-    Loader(DiskJoinStep* d) : djs(d)
+    Loader(DiskJoinStep* d, const uint32_t threadID, const uint32_t smallSideSizeLimit,
+           const std::vector<joiner::JoinPartition*>& joinPartitions)
+     : djs(d), threadID(threadID), smallSideSizeLimit(smallSideSizeLimit), joinPartitions(joinPartitions)
     {
     }
     void operator()()
     {
      utils::setThreadName("DJSLoader");
-      djs->loadFcn();
+      djs->loadFcn(threadID, smallSideSizeLimit, joinPartitions);
     }
+
     DiskJoinStep* djs;
+    uint32_t threadID;
+    uint32_t smallSideSizeLimit;
+    std::vector<joiner::JoinPartition*> joinPartitions;
   };
-  void loadFcn();
+  void loadFcn(const uint32_t threadID, const uint32_t smallSideSizeLimit,
+               const std::vector<joiner::JoinPartition*>& joinPartitions);
 
   /* Builder structs */
   struct BuilderOutput
   {
@@ -112,36 +151,39 @@ class DiskJoinStep : public JobStep
     joiner::JoinPartition* jp;
   };
 
-  boost::shared_ptr<joblist::FIFO<boost::shared_ptr<BuilderOutput> > > buildFIFO;
+  using BuilderOutputFIFO = joblist::FIFO<boost::shared_ptr<BuilderOutput>>;
+  std::vector<boost::shared_ptr<BuilderOutputFIFO>> buildFIFO;
 
   struct Builder
   {
-    Builder(DiskJoinStep* d) : djs(d)
+    Builder(DiskJoinStep* d, const uint32_t threadID) : djs(d), threadID(threadID)
     {
     }
     void operator()()
     {
      utils::setThreadName("DJSBuilder");
-      djs->buildFcn();
+      djs->buildFcn(threadID);
     }
     DiskJoinStep* djs;
+    uint32_t threadID;
   };
-  void buildFcn();
+  void buildFcn(const uint32_t threadID);
 
   /* Joining structs */
   struct Joiner
   {
-    Joiner(DiskJoinStep* d) : djs(d)
+    Joiner(DiskJoinStep* d, const uint32_t threadID) : djs(d), threadID(threadID)
     {
     }
     void operator()()
     {
      utils::setThreadName("DJSJoiner");
-      djs->joinFcn();
+      djs->joinFcn(threadID);
     }
     DiskJoinStep* djs;
+    uint32_t threadID;
   };
-  void joinFcn();
+  void joinFcn(const uint32_t threadID);
 
   // limits & usage
   boost::shared_ptr<int64_t> smallUsage;
@@ -154,6 +196,9 @@ class DiskJoinStep : public JobStep
 
   uint32_t joinerIndex;
   bool closedOutput;
+
+  std::mutex outputMutex;
+  const uint32_t maxNumOfJoinThreads = 32;
 };
 
 }  // namespace joblist
diff --git a/dbcon/joblist/jlf_tuplejoblist.cpp b/dbcon/joblist/jlf_tuplejoblist.cpp
index 56a3ac2ee..6cdf66c1d 100644
--- a/dbcon/joblist/jlf_tuplejoblist.cpp
+++ b/dbcon/joblist/jlf_tuplejoblist.cpp
@@ -1838,10 +1838,10 @@ void CircularJoinGraphTransformer::chooseEdgeToTransform(Cycle& cycle,
   }
 
   if (jobInfo.trace)
-    std::cout << "FK FK key not found, removing the first one inner join edge" << std::endl;
+    std::cout << "FK FK key not found, removing the last inner join edge" << std::endl;
 
-  // Take just a first.
-  resultEdge = std::make_pair(cycle.front(), 0 /*Dummy weight*/);
+  // Take just the last one.
+  resultEdge = std::make_pair(cycle.back(), 0 /*Dummy weight*/);
 }
 
 void CircularJoinGraphTransformer::removeAssociatedHashJoinStepFromJoinSteps(const JoinEdge& joinEdge)
diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp
index e1c3e1443..3428756ed 100644
--- a/dbcon/joblist/tuplehashjoin.cpp
+++ b/dbcon/joblist/tuplehashjoin.cpp
@@ -200,10 +200,10 @@ void TupleHashJoinStep::join()
     joinRan = true;
     jobstepThreadPool.join(mainRunner);
 
-    if (djs)
+    if (djs.size())
     {
-      for (int i = 0; i < (int)djsJoiners.size(); i++)
-        djs[i].join();
+      for (auto& diskJoinStep : djs)
+        diskJoinStep->join();
 
       jobstepThreadPool.join(djsReader);
       jobstepThreadPool.join(djsRelay);
@@ -631,10 +631,10 @@ void TupleHashJoinStep::djsReaderFcn(int index)
   while (more)
     more = fifos[index]->next(it, &rgData);
 
-  for (int i = 0; i < (int)djsJoiners.size(); i++)
+  for (auto& diskJoinStep : djs)
   {
-    fExtendedInfo += djs[i].extendedInfo();
-    fMiniInfo += djs[i].miniInfo();
+    fExtendedInfo += diskJoinStep->extendedInfo();
+    fMiniInfo += diskJoinStep->miniInfo();
   }
 
   outputDL->endOfInput();
@@ -750,7 +750,6 @@ void TupleHashJoinStep::hjRunner()
       outputIt = outputDL->getIterator();
   }
 
-  djs.reset(new DiskJoinStep[smallSideCount]);
   fifos.reset(new boost::shared_ptr<RowGroupDL>[smallSideCount + 1]);
 
   for (i = 0; i <= smallSideCount; i++)
@@ -762,7 +761,8 @@ void TupleHashJoinStep::hjRunner()
   {
     // these link themselves fifos[0]->DSJ[0]->fifos[1]->DSJ[1] ... ->fifos[smallSideCount],
     // THJS puts data into fifos[0], reads it from fifos[smallSideCount]
-    djs[i] = DiskJoinStep(this, i, djsJoinerMap[i], (i == smallSideCount - 1));
+    djs.push_back(std::shared_ptr<DiskJoinStep>(
+        new DiskJoinStep(this, i, djsJoinerMap[i], (i == smallSideCount - 1))));
   }
 
   sl.unlock();
@@ -774,7 +774,7 @@ void TupleHashJoinStep::hjRunner()
       vector<RGData> empty;
       resourceManager->returnMemory(memUsedByEachJoin[djsJoinerMap[i]], sessionMemLimit);
       atomicops::atomicZero(&memUsedByEachJoin[i]);
-      djs[i].loadExistingData(rgData[djsJoinerMap[i]]);
+      djs[i]->loadExistingData(rgData[djsJoinerMap[i]]);
       rgData[djsJoinerMap[i]].swap(empty);
     }
   }
@@ -801,7 +801,7 @@ void TupleHashJoinStep::hjRunner()
     reader = true;
 
     for (i = 0; i < smallSideCount; i++)
-      djs[i].run();
+      djs[i]->run();
   }
   catch (thread_resource_error&)
   {
@@ -883,7 +883,7 @@ void TupleHashJoinStep::hjRunner()
   }
 
   // todo: forwardCPData needs to grab data from djs
-  if (!djs)
+  if (djs.empty())
     forwardCPData();  // this fcn has its own exclusion list
 
   // decide if perform aggregation on PM
@@ -933,7 +933,7 @@ void TupleHashJoinStep::hjRunner()
   {
     largeBPS->useJoiners(tbpsJoiners);
 
-    if (djs)
+    if (djs.size())
      largeBPS->setJoinedResultRG(largeRG + outputRG);
    else
      largeBPS->setJoinedResultRG(outputRG);
@@ -948,7 +948,7 @@ void TupleHashJoinStep::hjRunner()
    For now, the alg is "assume if any joins are done on the UM, fe2 has to go on the UM."
    The structs and logic aren't in place yet to track all of the tables
    through a joblist. */
-    if (fe2 && !djs)
+    if (fe2 && !djs.size())
    {
      /* Can't do a small outer join when the PM sends back joined rows */
      runFE2onPM = true;
@@ -974,7 +974,7 @@ void TupleHashJoinStep::hjRunner()
   else if (fe2)
     runFE2onPM = false;
 
-  if (!fDelivery && !djs)
+  if (!fDelivery && !djs.size())
   {
     /* connect the largeBPS directly to the next step */
     JobStepAssociation newJsa;
@@ -993,7 +993,7 @@ void TupleHashJoinStep::hjRunner()
     // there are no in-mem UM or PM joins, only disk-joins
     startAdjoiningSteps();
   }
-  else if (!djs)
+  else if (!djs.size())
    // if there's no largeBPS, all joins are either done by DJS or join threads,
    // this clause starts the THJS join threads.
    startJoinThreads();
@@ -1026,7 +1026,7 @@ uint32_t TupleHashJoinStep::nextBand(messageqcpp::ByteStream& bs)
     else
       deliveredRG = &outputRG;
 
-    if (largeBPS && !djs)
+    if (largeBPS && !djs.size())
     {
       dl = largeDL;
       it = largeIt;
@@ -2044,10 +2044,10 @@ void TupleHashJoinStep::abort()
   JobStep::abort();
   boost::mutex::scoped_lock sl(djsLock);
 
-  if (djs)
+  if (djs.size())
   {
-    for (uint32_t i = 0; i < djsJoiners.size(); i++)
-      djs[i].abort();
+    for (uint32_t i = 0, e = djs.size(); i < e; i++)
+      djs[i]->abort();
   }
 }
diff --git a/dbcon/joblist/tuplehashjoin.h b/dbcon/joblist/tuplehashjoin.h
index a1e2fb2a1..519beba6f 100644
--- a/dbcon/joblist/tuplehashjoin.h
+++ b/dbcon/joblist/tuplehashjoin.h
@@ -584,7 +584,7 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep
   std::set<uint32_t> fFunctionJoinKeys;  // for skipping CP forward
 
   /* Disk-based join support */
-  boost::scoped_array<DiskJoinStep> djs;
+  std::vector<std::shared_ptr<DiskJoinStep>> djs;
   boost::scoped_array<boost::shared_ptr<RowGroupDL> > fifos;
   void djsReaderFcn(int index);
   uint64_t djsReader;  // thread handle from thread pool
diff --git a/utils/joiner/joinpartition.cpp b/utils/joiner/joinpartition.cpp
index 7108a9407..6db3ade42 100644
--- a/utils/joiner/joinpartition.cpp
+++ b/utils/joiner/joinpartition.cpp
@@ -481,7 +481,8 @@ int64_t JoinPartition::processSmallBuffer(RGData& rgData)
 
   ret = writeByteStream(0, bs);
 
-  htSizeEstimate += rg.getRowCount() * rg.getColumnCount() * 64;
+  if (rg.getRowCount())
+    htSizeEstimate += rg.getDataSize();
 
   // Check whether this partition is now too big -> convert to split mode.
   if (htTargetSize < htSizeEstimate && canConvertToSplitMode())
     ret += convertToSplitMode();
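
For review context, the core scheduling idea of the patch is in DiskJoinStep::prepareJobs(): every worker thread gets an even share of the collected join partitions (size / threads), and the remainder (size % threads) is handed out one extra partition at a time to the first threads; each resulting job is then driven by its own Loader -> Builder -> Joiner pipeline over per-thread FIFOs. Below is a minimal standalone sketch of only that splitting arithmetic. It is illustrative, not code from the patch: the name splitWork and the use of plain int partition IDs are assumptions made for the example.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

// Illustrative only: split `partitions` into `numOfThreads` jobs the same way
// DiskJoinStep::prepareJobs() does - an even share per thread, with the
// remainder distributed one partition at a time to the first threads.
std::vector<std::vector<int>> splitWork(const std::vector<int>& partitions, uint32_t numOfThreads)
{
  if (partitions.empty())
    return {};

  numOfThreads = std::min(numOfThreads, static_cast<uint32_t>(partitions.size()));
  const uint32_t workSize = partitions.size() / numOfThreads;

  std::vector<std::vector<int>> jobs;
  jobs.reserve(numOfThreads);

  uint32_t offset = 0;
  for (uint32_t threadNum = 0; threadNum < numOfThreads; ++threadNum, offset += workSize)
    jobs.emplace_back(partitions.begin() + offset, partitions.begin() + offset + workSize);

  // Hand the remainder out: one extra partition to each of the first threads.
  for (uint32_t i = 0, e = partitions.size() % numOfThreads; i < e; ++i, ++offset)
    jobs[i].push_back(partitions[offset]);

  return jobs;
}

int main()
{
  // 10 partitions over 4 threads -> job sizes 3, 3, 2, 2.
  auto jobs = splitWork({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, 4);

  for (size_t i = 0; i < jobs.size(); ++i)
    std::cout << "thread " << i << " gets " << jobs[i].size() << " partitions\n";
  return 0;
}

With this split, the per-thread small-side memory budget used by loadFcn() is simply partitionSize divided by the number of jobs, which is what mainRunner() passes to spawnJobs() as smallSideSizeLimitPerThread.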