diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index c718deaa0..1bb79bfc0 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ b/dbcon/joblist/distributedenginecomm.cpp @@ -68,6 +68,7 @@ using namespace oam; using namespace joblist; #include "atomicops.h" +#include "threadnaming.h" namespace { @@ -131,6 +132,7 @@ struct EngineCommRunner uint32_t connIndex; void operator()() { + utils::setThreadName("DECRunner"); // cout << "Listening on client at 0x" << hex << (ptrdiff_t)client << dec << endl; try { diff --git a/dbcon/joblist/primitivestep.h b/dbcon/joblist/primitivestep.h index 13a44516e..4b94dd453 100644 --- a/dbcon/joblist/primitivestep.h +++ b/dbcon/joblist/primitivestep.h @@ -112,12 +112,12 @@ class pColStep : public JobStep * * Starts processing. Set at least the RID list before calling this. */ - virtual void run(){}; + virtual void run() {}; /** @brief Sync's the caller with the end of execution. * * Does nothing. Returns when this instance is finished. */ - virtual void join(){}; + virtual void join() {}; virtual const std::string toString() const; @@ -1459,6 +1459,8 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep void interleaveJobs(std::vector* jobs) const; void sendJobs(const std::vector& jobs); uint32_t numDBRoots; + // presumably there must be not more than 2^32 blocks per job as of 23.02. + uint32_t blocksPerJob; /* Pseudo column filter processing. Think about refactoring into a separate class. */ bool processPseudoColFilters(uint32_t extentIndex, boost::shared_ptr> dbRootPMMap) const; diff --git a/dbcon/joblist/tuple-bps.cpp b/dbcon/joblist/tuple-bps.cpp index 6b1efc6c0..5696a1376 100644 --- a/dbcon/joblist/tuple-bps.cpp +++ b/dbcon/joblist/tuple-bps.cpp @@ -19,7 +19,7 @@ // $Id: tuple-bps.cpp 9705 2013-07-17 20:06:07Z pleblanc $ #include -//#define NDEBUG +// #define NDEBUG #include #include #include @@ -77,7 +77,9 @@ using namespace querytele; #include "columnwidth.h" #include "pseudocolumn.h" -//#define DEBUG 1 +// #define DEBUG 1 + +// #include "poormanprofiler.inc" extern boost::mutex fileLock_g; @@ -396,15 +398,6 @@ void TupleBPS::initializeConfigParms() { string strVal; - //...Get the tuning parameters that throttle msgs sent to primproc - //...fFilterRowReqLimit puts a cap on how many rids we will request from - //... primproc, before pausing to let the consumer thread catch up. - //... Without this limit, there is a chance that PrimProc could flood - //... ExeMgr with thousands of messages that will consume massive - //... amounts of memory for a 100 gigabyte database. - //...fFilterRowReqThreshold is the level at which the number of outstanding - //... rids must fall below, before the producer can send more rids. - // These could go in constructor fRequestSize = fRm->getJlRequestSize(); fMaxOutstandingRequests = fRm->getJlMaxOutstandingRequests(); @@ -556,14 +549,14 @@ TupleBPS::TupleBPS(const pColScanStep& rhs, const JobInfo& jobInfo) : BatchPrimi throw runtime_error(oss.str()); } - catch(std::exception& ex) + catch (std::exception& ex) { std::ostringstream oss; oss << "Error getting AUX column OID for table " << tableName.toString(); oss << " due to: " << ex.what(); throw runtime_error(oss.str()); } - catch(...) + catch (...) { std::ostringstream oss; oss << "Error getting AUX column OID for table " << tableName.toString(); @@ -1684,7 +1677,8 @@ void TupleBPS::sendJobs(const vector& jobs) if (recvWaiting) condvar.notify_all(); - while ((msgsSent - msgsRecvd > fMaxOutstandingRequests << LOGICAL_EXTENT_CONVERTER) && !fDie) + // Send not more than fMaxOutstandingRequests jobs out. min(blocksPerJob) = 16 + while ((msgsSent - msgsRecvd > fMaxOutstandingRequests * (blocksPerJob >> 1)) && !fDie) { sendWaiting = true; condvarWakeupProducer.wait(tplLock); @@ -2007,7 +2001,6 @@ void TupleBPS::makeJobs(vector* jobs) uint32_t i; uint32_t lbidsToScan; uint32_t blocksToScan; - uint32_t blocksPerJob; LBID_t startingLBID; oam::OamCache* oamCache = oam::OamCache::makeOamCache(); boost::shared_ptr> dbRootConnectionMap = oamCache->getDBRootToConnectionMap(); @@ -2227,6 +2220,8 @@ void TupleBPS::processByteStreamVector(vectorpmSendsFinalResult()) { + utils::setThreadName("BSPJoin"); + data->joinedData = RGData(data->local_outputRG); data->local_outputRG.setData(&data->joinedData); data->local_outputRG.resetRowGroup(data->local_primRG.getBaseRid()); @@ -2340,6 +2335,8 @@ void TupleBPS::processByteStreamVector(vectorjoinedData); } + + utils::setThreadName("ByteStreamProcessor"); } else { @@ -2351,6 +2348,7 @@ void TupleBPS::processByteStreamVector(vector 0 && !cancelled()) { @@ -2358,6 +2356,8 @@ void TupleBPS::processByteStreamVector(vectorlocal_fe2Output, dlp); } + utils::setThreadName("ByteStreamProcessor"); + data->cachedIO_Thread += cachedIO; data->physIO_Thread += physIO; data->touchedBlocks_Thread += touchedBlocks; @@ -2777,8 +2777,7 @@ void TupleBPS::receiveMultiPrimitiveMessages() << totalBlockedReadCount << "/" << totalBlockedWriteCount << "; output size-" << ridsReturned << endl << "\tPartitionBlocksEliminated-" << fNumBlksSkipped << "; MsgBytesIn-" << msgBytesInKB << "KB" - << "; MsgBytesOut-" << msgBytesOutKB << "KB" - << "; TotalMsgs-" << totalMsgs << endl + << "; MsgBytesOut-" << msgBytesOutKB << "KB" << "; TotalMsgs-" << totalMsgs << endl << "\t1st read " << dlTimes.FirstReadTimeString() << "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-" << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << "s\n\tUUID " << uuids::to_string(fStepUuid) << "\n\tQuery UUID " @@ -3179,9 +3178,8 @@ bool TupleBPS::deliverStringTableRowGroup() const void TupleBPS::formatMiniStats() { ostringstream oss; - oss << "BPS " - << "PM " << alias() << " " << fTableOid << " " << fBPP->toMiniString() << " " << fPhysicalIO << " " - << fCacheIO << " " << fNumBlksSkipped << " " + oss << "BPS " << "PM " << alias() << " " << fTableOid << " " << fBPP->toMiniString() << " " << fPhysicalIO + << " " << fCacheIO << " " << fNumBlksSkipped << " " << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " " << ridsReturned << " "; diff --git a/dbcon/joblist/tupleannexstep.cpp b/dbcon/joblist/tupleannexstep.cpp index a62bac653..f0c9f3225 100644 --- a/dbcon/joblist/tupleannexstep.cpp +++ b/dbcon/joblist/tupleannexstep.cpp @@ -18,7 +18,7 @@ // $Id: tupleannexstep.cpp 9661 2013-07-01 20:33:05Z pleblanc $ -//#define NDEBUG +// #define NDEBUG #include #include #include @@ -251,10 +251,6 @@ void TupleAnnexStep::run() fRunnersList.resize(fMaxThreads); fInputIteratorsList.resize(fMaxThreads + 1); - // Activate stats collecting before CS spawns threads. - if (traceOn()) - dlTimes.setFirstReadTime(); - // *DRRTUY Make this block conditional StepTeleStats sts; sts.query_uuid = fQueryUuid; @@ -858,7 +854,7 @@ void TupleAnnexStep::finalizeParallelOrderByDistinct() break; } } // end of limit bound while loop - } // end of if-else + } // end of if-else if (fRowGroupOut.getRowCount() > 0) { @@ -1045,7 +1041,7 @@ void TupleAnnexStep::finalizeParallelOrderBy() break; } } // end of limit bound while loop - } // end of if-else + } // end of if-else if (fRowGroupOut.getRowCount() > 0) { @@ -1065,9 +1061,6 @@ void TupleAnnexStep::finalizeParallelOrderBy() if (traceOn()) { - if (dlTimes.FirstReadTime().tv_sec == 0) - dlTimes.setFirstReadTime(); - dlTimes.setLastReadTime(); dlTimes.setEndOfInputTime(); printCalTrace(); @@ -1102,6 +1095,13 @@ void TupleAnnexStep::executeParallelOrderBy(uint64_t id) try { more = fInputDL->next(fInputIteratorsList[id], &rgDataIn); + + // Stats collecting. + if (more && (id == 1) && traceOn()) + { + dlTimes.setFirstReadTime(); + } + if (more) dlOffset++; @@ -1241,14 +1241,9 @@ void TupleAnnexStep::formatMiniStats() { ostringstream oss; oss << "TNS "; - oss << "UM " - << "- " - << "- " - << "- " - << "- " - << "- " - << "- " << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " " - << fRowsReturned << " "; + oss << "UM " << "- " << "- " << "- " << "- " << "- " << "- " + << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " " << fRowsReturned + << " "; fMiniInfo += oss.str(); } diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp index 6e6a60c3d..d935a67dc 100644 --- a/dbcon/joblist/tuplehashjoin.cpp +++ b/dbcon/joblist/tuplehashjoin.cpp @@ -278,12 +278,12 @@ void TupleHashJoinStep::startSmallRunners(uint index) if (typelessJoin[index]) { joiner.reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index], largeSideKeys[index], jt, - &jobstepThreadPool)); + &jobstepThreadPool, numCores)); } else { joiner.reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index][0], largeSideKeys[index][0], - jt, &jobstepThreadPool)); + jt, &jobstepThreadPool, numCores)); } joiner->setUniqueLimit(uniqueLimit); @@ -1297,15 +1297,11 @@ void TupleHashJoinStep::formatMiniStats(uint32_t index) else oss << "- "; - oss << " " - << "- " - << "- " - << "- " + oss << " " << "- " << "- " << "- " << "- " // << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " " // dlTimes are not timed in this step, using '--------' instead. - << "-------- " - << "-\n"; + << "-------- " << "-\n"; fMiniInfo += oss.str(); } diff --git a/oam/etc/Columnstore.xml b/oam/etc/Columnstore.xml index 2901a87a6..0de06e10d 100644 --- a/oam/etc/Columnstore.xml +++ b/oam/etc/Columnstore.xml @@ -203,12 +203,11 @@ is 20 extents worth of work for the PMs to process at any given time. ProcessorThreadsPerScan * MaxOutstandingRequests should be at least as many threads are available across all PMs. --> - - - - 100 + + + 20000 + 1000 diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index 57b8111c9..dfe1527a7 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -1387,6 +1387,7 @@ void BatchPrimitiveProcessor::execute() #ifdef PRIMPROC_STOPWATCH stopwatch->start("BatchPrimitiveProcessor::execute first part"); #endif + utils::setThreadName("BPPFilt&Pr"); // if only one scan step which has no predicate, async load all columns if (filterCount == 1 && hasScan) @@ -1550,6 +1551,8 @@ void BatchPrimitiveProcessor::execute() #endif outputRG.resetRowGroup(baseRid); + utils::setThreadName("BPPFE1_1"); + if (fe1) { uint32_t newRidCount = 0; @@ -1616,6 +1619,8 @@ void BatchPrimitiveProcessor::execute() } if (fe2) { + utils::setThreadName("BPPFE2_1"); + /* functionize this -> processFE2() */ fe2Output.resetRowGroup(baseRid); fe2Output.getRow(0, &fe2Out); @@ -1646,6 +1651,8 @@ void BatchPrimitiveProcessor::execute() if (fAggregator) { + utils::setThreadName("BPPAgg_1"); + *serialized << (uint8_t)1; // the "count this msg" var // see TupleBPS::setFcnExpGroup2() and where it gets called. @@ -1662,17 +1669,17 @@ void BatchPrimitiveProcessor::execute() if ((currentBlockOffset + 1) == count) // @bug4507, 8k { - fAggregator->loadResult(*serialized); // @bug4507, 8k - } // @bug4507, 8k + fAggregator->loadResult(*serialized); // @bug4507, 8k + } // @bug4507, 8k else if (utils::MonitorProcMem::isMemAvailable()) // @bug4507, 8k { fAggregator->loadEmptySet(*serialized); // @bug4507, 8k - } // @bug4507, 8k - else // @bug4507, 8k + } // @bug4507, 8k + else // @bug4507, 8k { fAggregator->loadResult(*serialized); // @bug4507, 8k fAggregator->aggReset(); // @bug4507, 8k - } // @bug4507, 8k + } // @bug4507, 8k } if (!fAggregator && !fe2) @@ -1726,6 +1733,8 @@ void BatchPrimitiveProcessor::execute() do // while (startRid > 0) { + utils::setThreadName("BPPJoin_1"); + #ifdef PRIMPROC_STOPWATCH stopwatch->start("-- executeTupleJoin()"); startRid = executeTupleJoin(startRid, largeSideRowGroup); @@ -1777,6 +1786,8 @@ void BatchPrimitiveProcessor::execute() *serialized << sendCount; if (fe2) { + utils::setThreadName("BPPFE2_2"); + /* functionize this -> processFE2()*/ fe2Output.resetRowGroup(baseRid); fe2Output.setDBRoot(dbRoot); @@ -1800,21 +1811,23 @@ void BatchPrimitiveProcessor::execute() if (fAggregator) { + utils::setThreadName("BPPAgg_2"); + fAggregator->addRowGroup(&nextRG); if ((currentBlockOffset + 1) == count && moreRGs == false && startRid == 0) // @bug4507, 8k { - fAggregator->loadResult(*serialized); // @bug4507, 8k - } // @bug4507, 8k + fAggregator->loadResult(*serialized); // @bug4507, 8k + } // @bug4507, 8k else if (utils::MonitorProcMem::isMemAvailable()) // @bug4507, 8k { fAggregator->loadEmptySet(*serialized); // @bug4507, 8k - } // @bug4507, 8k - else // @bug4507, 8k + } // @bug4507, 8k + else // @bug4507, 8k { fAggregator->loadResult(*serialized); // @bug4507, 8k fAggregator->aggReset(); // @bug4507, 8k - } // @bug4507, 8k + } // @bug4507, 8k } else { @@ -1901,6 +1914,7 @@ void BatchPrimitiveProcessor::execute() // cout << "sent physIO=" << physIO << " cachedIO=" << cachedIO << // " touchedBlocks=" << touchedBlocks << endl; } + utils::setThreadName("BPPExecuteEnd"); #ifdef PRIMPROC_STOPWATCH stopwatch->stop("BatchPrimitiveProcessor::execute fourth part"); @@ -2751,7 +2765,6 @@ void BatchPrimitiveProcessor::buildVSSCache(uint32_t loopCount) if (rc == 0) for (i = 0; i < vssData.size(); i++) vssCache.insert(make_pair(lbidList[i], vssData[i])); - } } // namespace primitiveprocessor diff --git a/utils/joiner/tuplejoiner.cpp b/utils/joiner/tuplejoiner.cpp index bd7046efa..d7cea726f 100644 --- a/utils/joiner/tuplejoiner.cpp +++ b/utils/joiner/tuplejoiner.cpp @@ -39,7 +39,7 @@ namespace joiner // Typed joiner ctor TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput, uint32_t smallJoinColumn, uint32_t largeJoinColumn, JoinType jt, - threadpool::ThreadPool* jsThreadPool) + threadpool::ThreadPool* jsThreadPool, const uint64_t numCores) : smallRG(smallInput) , largeRG(largeInput) , joinAlg(INSERTING) @@ -49,6 +49,7 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R , bSignedUnsignedJoin(false) , uniqueLimit(100) , finished(false) + , numCores(numCores) , jobstepThreadPool(jsThreadPool) , _convertToDiskJoin(false) { @@ -145,7 +146,7 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R // Typeless joiner ctor TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput, const vector& smallJoinColumns, const vector& largeJoinColumns, - JoinType jt, threadpool::ThreadPool* jsThreadPool) + JoinType jt, threadpool::ThreadPool* jsThreadPool, const uint64_t numCores) : smallRG(smallInput) , largeRG(largeInput) , joinAlg(INSERTING) @@ -157,6 +158,7 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R , bSignedUnsignedJoin(false) , uniqueLimit(100) , finished(false) + , numCores(numCores) , jobstepThreadPool(jsThreadPool) , _convertToDiskJoin(false) { @@ -254,11 +256,6 @@ bool TupleJoiner::operator<(const TupleJoiner& tj) const void TupleJoiner::getBucketCount() { - // get the # of cores, round up to nearest power of 2 - // make the bucket mask - numCores = sysconf(_SC_NPROCESSORS_ONLN); - if (numCores <= 0) - numCores = 8; bucketCount = (numCores == 1 ? 1 : (1 << (32 - __builtin_clz(numCores - 1)))); bucketMask = bucketCount - 1; } diff --git a/utils/joiner/tuplejoiner.h b/utils/joiner/tuplejoiner.h index aa4aaf647..98af11b31 100644 --- a/utils/joiner/tuplejoiner.h +++ b/utils/joiner/tuplejoiner.h @@ -268,12 +268,12 @@ class TupleJoiner /* ctor to use for numeric join */ TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput, uint32_t smallJoinColumn, uint32_t largeJoinColumn, joblist::JoinType jt, - threadpool::ThreadPool* jsThreadPool); + threadpool::ThreadPool* jsThreadPool, const uint64_t numCores); /* ctor to use for string & compound join */ TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput, const std::vector& smallJoinColumns, const std::vector& largeJoinColumns, - joblist::JoinType jt, threadpool::ThreadPool* jsThreadPool); + joblist::JoinType jt, threadpool::ThreadPool* jsThreadPool, const uint64_t numCores); ~TupleJoiner();