From a0b3424603c653d32490a592c4b711edba2caea3 Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Wed, 13 Mar 2019 10:19:43 +0300 Subject: [PATCH] MCOL-2244 Columnstore execution threads now have names describe the threads operation. This should simplify CPU bottlenecks troubleshooting. --- dbcon/joblist/crossenginestep.h | 2 + dbcon/joblist/diskjoinstep.h | 6 ++ dbcon/joblist/pdictionaryscan.cpp | 4 + dbcon/joblist/subquerystep.h | 2 + dbcon/joblist/tuple-bps.cpp | 92 ++----------------- dbcon/joblist/tupleaggregatestep.h | 5 + dbcon/joblist/tupleannexstep.cpp | 4 + dbcon/joblist/tupleconstantstep.h | 2 + dbcon/joblist/tuplehashjoin.h | 4 + dbcon/joblist/tuplehavingstep.h | 2 + dbcon/joblist/tupleunion.h | 2 + dbcon/joblist/windowfunctionstep.h | 2 + .../primproc/batchprimitiveprocessor.cpp | 3 +- primitives/primproc/bppseeder.cpp | 1 + primitives/primproc/primitiveserver.cpp | 12 +++ utils/common/CMakeLists.txt | 3 +- utils/common/threadnaming.cpp | 26 ++++++ utils/common/threadnaming.h | 24 +++++ 18 files changed, 109 insertions(+), 87 deletions(-) create mode 100644 utils/common/threadnaming.cpp create mode 100644 utils/common/threadnaming.h diff --git a/dbcon/joblist/crossenginestep.h b/dbcon/joblist/crossenginestep.h index 242d00cdf..4ebf2ac9d 100644 --- a/dbcon/joblist/crossenginestep.h +++ b/dbcon/joblist/crossenginestep.h @@ -28,6 +28,7 @@ #include "jobstep.h" #include "primitivestep.h" +#include "threadnaming.h" using namespace std; @@ -192,6 +193,7 @@ protected: Runner(CrossEngineStep* step) : fStep(step) { } void operator()() { + utils::setThreadName("CESRunner"); fStep->execute(); } diff --git a/dbcon/joblist/diskjoinstep.h b/dbcon/joblist/diskjoinstep.h index 154571cdc..fbdda890a 100644 --- a/dbcon/joblist/diskjoinstep.h +++ b/dbcon/joblist/diskjoinstep.h @@ -1,4 +1,5 @@ /* Copyright (C) 2014 InfiniDB, Inc. + Copyright (C) 2019 MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -18,6 +19,7 @@ #include "jobstep.h" #include "tuplehashjoin.h" #include "joinpartition.h" +#include "threadnaming.h" #include "../../utils/threadpool/prioritythreadpool.h" #ifndef DISKJOINSTEP_H @@ -65,6 +67,7 @@ private: Runner(DiskJoinStep* d) : djs(d) { } void operator()() { + utils::setThreadName("DJSMainRunner"); djs->mainRunner(); } DiskJoinStep* djs; @@ -92,6 +95,7 @@ private: Loader(DiskJoinStep* d) : djs(d) { } void operator()() { + utils::setThreadName("DJSLoader"); djs->loadFcn(); } DiskJoinStep* djs; @@ -114,6 +118,7 @@ private: Builder(DiskJoinStep* d) : djs(d) { } void operator()() { + utils::setThreadName("DJSBuilder"); djs->buildFcn(); } DiskJoinStep* djs; @@ -126,6 +131,7 @@ private: Joiner(DiskJoinStep* d) : djs(d) { } void operator()() { + utils::setThreadName("DJSJoiner"); djs->joinFcn(); } DiskJoinStep* djs; diff --git a/dbcon/joblist/pdictionaryscan.cpp b/dbcon/joblist/pdictionaryscan.cpp index 47f469723..6c1b7cecd 100644 --- a/dbcon/joblist/pdictionaryscan.cpp +++ b/dbcon/joblist/pdictionaryscan.cpp @@ -63,6 +63,8 @@ using namespace rowgroup; #include "querytele.h" using namespace querytele; +#include "threadnaming.h" + namespace joblist { @@ -75,6 +77,7 @@ struct pDictionaryScanPrimitive { try { + utils::setThreadName("DSSScan"); fPDictScan->sendPrimitiveMessages(); } catch (runtime_error& re) @@ -99,6 +102,7 @@ struct pDictionaryScanAggregator { try { + utils::setThreadName("DSSAgg"); fPDictScan->receivePrimitiveMessages(); } catch (runtime_error& re) diff --git a/dbcon/joblist/subquerystep.h b/dbcon/joblist/subquerystep.h index 2ce6ec579..d69863df5 100644 --- a/dbcon/joblist/subquerystep.h +++ b/dbcon/joblist/subquerystep.h @@ -31,6 +31,7 @@ #include "jobstep.h" #include "joblist.h" #include "funcexpwrapper.h" +#include "threadnaming.h" namespace joblist { @@ -264,6 +265,7 @@ protected: Runner(SubAdapterStep* step) : fStep(step) { } void operator()() { + utils::setThreadName("SQSRunner"); fStep->execute(); } diff --git a/dbcon/joblist/tuple-bps.cpp b/dbcon/joblist/tuple-bps.cpp index b42533763..de46f85d0 100644 --- a/dbcon/joblist/tuple-bps.cpp +++ b/dbcon/joblist/tuple-bps.cpp @@ -70,6 +70,8 @@ using namespace BRM; #include "rowgroup.h" using namespace rowgroup; +#include "threadnaming.h" + #include "querytele.h" using namespace querytele; @@ -106,6 +108,7 @@ struct TupleBPSPrimitive { try { + utils::setThreadName("BPSPrimitive"); fBatchPrimitiveStep->sendPrimitiveMessages(); } catch (std::exception& re) @@ -135,6 +138,7 @@ struct TupleBPSAggregators { try { + utils::setThreadName("BPSAggregator"); fBatchPrimitiveStepCols->receiveMultiPrimitiveMessages(fThreadId); } catch (std::exception& re) @@ -276,7 +280,6 @@ TupleBPS::TupleBPS(const pColStep& rhs, const JobInfo& jobInfo) : hasPCFilter = hasPMFilter = hasRIDFilter = hasSegmentFilter = hasDBRootFilter = hasSegmentDirFilter = hasPartitionFilter = hasMaxFilter = hasMinFilter = hasLBIDFilter = hasExtentIDFilter = false; -// cout << "TBPSCount = " << ++TBPSCount << endl; } TupleBPS::TupleBPS(const pColScanStep& rhs, const JobInfo& jobInfo) : @@ -336,8 +339,6 @@ TupleBPS::TupleBPS(const pColScanStep& rhs, const JobInfo& jobInfo) : fBPP->setTxnID(fTxnId); fTraceFlags = rhs.fTraceFlags; fBPP->setTraceFlags(fTraceFlags); -// if (fOid>=3000) -// cout << "BPS:initalized from pColScanStep. fSessionId=" << fSessionId << endl; fBPP->setStepID(fStepId); fBPP->setOutputType(ROW_GROUP); fPhysicalIO = 0; @@ -352,9 +353,6 @@ TupleBPS::TupleBPS(const pColScanStep& rhs, const JobInfo& jobInfo) : hasUMJoin = false; fRunExecuted = false; smallOuterJoiner = -1; - // @1098 initialize scanFlags to be true - //scanFlags.assign(numExtents, true); - //runtimeCPFlags.assign(numExtents, true); bop = BOP_AND; runRan = joinRan = false; @@ -405,9 +403,6 @@ TupleBPS::TupleBPS(const PassThruStep& rhs, const JobInfo& jobInfo) : fTraceFlags = rhs.fTraceFlags; fBPP->setTraceFlags(fTraceFlags); fBPP->setOutputType(ROW_GROUP); -// if (fOid>=3000) -// cout << "BPS:initalized from PassThruStep. fSessionId=" << fSessionId << endl; - finishedSending = sendWaiting = false; fSwallowRows = false; fNumBlksSkipped = 0; @@ -437,7 +432,6 @@ TupleBPS::TupleBPS(const PassThruStep& rhs, const JobInfo& jobInfo) : hasPCFilter = hasPMFilter = hasRIDFilter = hasSegmentFilter = hasDBRootFilter = hasSegmentDirFilter = hasPartitionFilter = hasMaxFilter = hasMinFilter = hasLBIDFilter = hasExtentIDFilter = false; -// cout << "TBPSCount = " << ++TBPSCount << endl; } TupleBPS::TupleBPS(const pDictionaryStep& rhs, const JobInfo& jobInfo) : @@ -463,7 +457,6 @@ TupleBPS::TupleBPS(const pDictionaryStep& rhs, const JobInfo& jobInfo) : fStepCount = 1; fCPEvaluated = false; fEstimatedRows = 0; - //fColType = rhs.colType(); alias(rhs.alias()); view(rhs.view()); name(rhs.name()); @@ -472,8 +465,6 @@ TupleBPS::TupleBPS(const pDictionaryStep& rhs, const JobInfo& jobInfo) : fBPP.reset(new BatchPrimitiveProcessorJL(fRm)); initializeConfigParms(); fBPP->setSessionID(fSessionId); -// if (fOid>=3000) -// cout << "BPS:initalized from DictionaryStep. fSessionId=" << fSessionId << endl; fBPP->setStepID(fStepId); fBPP->setQueryContext(fVerId); fBPP->setTxnID(fTxnId); @@ -506,7 +497,6 @@ TupleBPS::TupleBPS(const pDictionaryStep& rhs, const JobInfo& jobInfo) : hasPCFilter = hasPMFilter = hasRIDFilter = hasSegmentFilter = hasDBRootFilter = hasSegmentDirFilter = hasPartitionFilter = hasMaxFilter = hasMinFilter = hasLBIDFilter = hasExtentIDFilter = false; -// cout << "TBPSCount = " << ++TBPSCount << endl; } TupleBPS::~TupleBPS() @@ -541,7 +531,6 @@ TupleBPS::~TupleBPS() fDec->removeQueue(uniqueID); } -// cout << "~TBPSCount = " << --TBPSCount << endl; } void TupleBPS::setBPP(JobStep* jobStep) @@ -558,7 +547,6 @@ void TupleBPS::setBPP(JobStep* jobStep) if (pseudo) { - //cout << "adding a pseudo col filter" << endl; fBPP->addFilterStep(*pseudo); if (pseudo->filterCount() > 0) @@ -690,8 +678,6 @@ void TupleBPS::setProjectBPP(JobStep* jobStep1, JobStep* jobStep2) colWidth = (pcsp->colType()).colWidth; projectOids.push_back(jobStep1->oid()); -// if (fOid>=3000) -// cout << "Adding project step pColStep and pDictionaryStep to BPS" << endl; } else { @@ -708,8 +694,6 @@ void TupleBPS::setProjectBPP(JobStep* jobStep1, JobStep* jobStep2) projectOids.push_back(jobStep1->oid()); colWidth = (psth->colType()).colWidth; -// if (fOid>=3000) -// cout << "Adding project step PassThruStep and pDictionaryStep to BPS" << endl; } } } @@ -723,7 +707,6 @@ void TupleBPS::setProjectBPP(JobStep* jobStep1, JobStep* jobStep2) if (pseudo) { - //cout << "adding a pseudo col projection" << endl; fBPP->addProjectStep(*pseudo); } else @@ -835,7 +818,6 @@ void TupleBPS::storeCasualPartitionInfo(const bool estimateRowCounts) } } - //cout << "cp column number=" << cpColVec.size() << " 1st col extents size= " << scanFlags.size() << endl; if (cpColVec.size() == 0) return; @@ -900,12 +882,8 @@ void TupleBPS::startAggregationThread() fProducerThreads.push_back(jobstepThreadPool.invoke(TupleBPSAggregators(this, fNumThreads - 1))); } -//#include "boost/date_time/posix_time/posix_time.hpp" - void TupleBPS::serializeJoiner() { -// boost::posix_time::ptime start, stop; -// start = boost::posix_time::microsec_clock::local_time(); ByteStream bs; bool more = true; @@ -925,8 +903,6 @@ void TupleBPS::serializeJoiner() bs.restart(); } -// stop = boost::posix_time::microsec_clock::local_time(); -// cout << "serializing took " << stop-start << endl; } void TupleBPS::serializeJoiner(uint32_t conn) @@ -957,8 +933,6 @@ void TupleBPS::prepCasualPartitioning() { if (fOid >= 3000) { - //if (scanFlags[i] && !runtimeCPFlags[i]) - // cout << "runtime flags eliminated an extent!\n"; scanFlags[i] = scanFlags[i] && runtimeCPFlags[i]; if (scanFlags[i] && lbidList->CasualPartitionDataType(fColType.colDataType, @@ -1209,22 +1183,10 @@ void TupleBPS::run() if (fe2) { - //if (fDelivery) { - // fe2Data.reinit(fe2Output); - // fe2Output.setData(&fe2Data); - //} primRowGroup.initRow(&fe2InRow); fe2Output.initRow(&fe2OutRow); } - /* - if (doJoin) { - for (uint32_t z = 0; z < smallSideCount; z++) - cout << "join #" << z << " " << "0x" << hex << tjoiners[z]->getJoinType() - << std::dec << " typeless: " << (uint32_t) tjoiners[z]->isTypelessJoin() << endl; - } - */ - try { fDec->addDECEventListener(this); @@ -1330,7 +1292,6 @@ void TupleBPS::sendError(uint16_t status) } fBPP->reset(); -// msgsSent++; // not expecting a response from this msg finishedSending = true; condvar.notify_all(); condvarWakeupProducer.notify_all(); @@ -1441,7 +1402,6 @@ void TupleBPS::sendJobs(const vector& jobs) for (i = 0; i < jobs.size() && !cancelled(); i++) { - //cout << "sending a job for dbroot " << jobs[i].dbroot << ", PM " << jobs[i].connectionNum << endl; fDec->write(uniqueID, *(jobs[i].msg)); tplLock.lock(); msgsSent += jobs[i].expectedResponses; @@ -1785,20 +1745,17 @@ void TupleBPS::makeJobs(vector* jobs) if (!inBounds) { - //cout << "out of bounds" << endl; continue; } if (!scanFlags[i]) { - //cout << "CP elimination" << endl; fNumBlksSkipped += lbidsToScan; continue; } if (!processPseudoColFilters(i, dbRootPMMap)) { - //cout << "Skipping an extent due to pseudo-column filter elimination" << endl; fNumBlksSkipped += lbidsToScan; continue; } @@ -1840,8 +1797,6 @@ void TupleBPS::makeJobs(vector* jobs) } } -// cout << " session " << fSessionId << " idx = " << i << " HWM = " << scannedExtents[i].HWM -// << " ... will scan " << lbidsToScan << " lbids\n"; // the # of logical blocks in this extent if (lbidsToScan % fColType.colWidth) @@ -1857,22 +1812,17 @@ void TupleBPS::makeJobs(vector* jobs) #else blocksPerJob = max(blocksToScan / fProcessorThreadsPerScan, 16U); #endif - //cout << "blocks to scan = " << blocksToScan << " blocks per job = " << blocksPerJob << - // " HWM == " << scannedExtents[i].HWM << endl; startingLBID = scannedExtents[i].range.start; while (blocksToScan > 0) { uint32_t blocksThisJob = min(blocksToScan, blocksPerJob); - //cout << "starting LBID = " << startingLBID << " count = " << blocksThisJob << - // " dbroot = " << scannedExtents[i].dbRoot << endl; fBPP->setLBID(startingLBID, scannedExtents[i]); fBPP->setCount(blocksThisJob); bs.reset(new ByteStream()); fBPP->runBPP(*bs, (*dbRootConnectionMap)[scannedExtents[i].dbRoot]); - //cout << "making job for connection # " << (*dbRootConnectionMap)[scannedExtents[i].dbRoot] << endl; jobs->push_back(Job(scannedExtents[i].dbRoot, (*dbRootConnectionMap)[scannedExtents[i].dbRoot], blocksThisJob, bs)); blocksToScan -= blocksThisJob; @@ -1881,7 +1831,6 @@ void TupleBPS::makeJobs(vector* jobs) } } -// cout << "session " << fSessionId << " sees " << extentCounter << " extents" << endl; } void TupleBPS::sendPrimitiveMessages() @@ -1901,19 +1850,16 @@ void TupleBPS::sendPrimitiveMessages() } catch (const IDBExcept& e) { - //cout << "Caught IDBExcept" << e.what() << e.errorCode() << endl; sendError(e.errorCode()); processError(e.what(), e.errorCode(), "TupleBPS::sendPrimitiveMessages()"); } catch (const std::exception& ex) { - //cout << "Caught exception" << endl; sendError(ERR_TUPLE_BPS); processError(ex.what(), ERR_TUPLE_BPS, "TupleBPS::sendPrimitiveMessages()"); } catch (...) { - //cout << "Caught ..." << endl; sendError(ERR_TUPLE_BPS); processError("unknown", ERR_TUPLE_BPS, "TupleBPS::sendPrimitiveMessages()"); } @@ -2189,7 +2135,6 @@ void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID) tplLock.unlock(); -// cout << "thread " << threadID << " has " << size << " Bytestreams\n"; for (i = 0; i < size && !cancelled(); i++) { ByteStream* bs = bsv[i].get(); @@ -2244,18 +2189,16 @@ void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID) local_outputRG.resetRowGroup(local_primRG.getBaseRid()); local_outputRG.setDBRoot(local_primRG.getDBRoot()); local_primRG.getRow(0, &largeSideRow); - //cout << "large-side raw data: " << local_primRG.toString() << endl; - //cout << "jointype = " << tjoiners[0]->getJoinType() << endl; for (k = 0; k < local_primRG.getRowCount() && !cancelled(); k++, largeSideRow.nextRow()) { - //cout << "TBPS: Large side row: " << largeSideRow.toString() << endl; matchCount = 0; for (j = 0; j < smallSideCount; j++) { tjoiners[j]->match(largeSideRow, k, threadID, &joinerOutput[j]); - /* Debugging code to print the matches +#ifdef JLF_DEBUG + // Debugging code to print the matches Row r; joinerMatchesRGs[j].initRow(&r); cout << joinerOutput[j].size() << " matches: \n"; @@ -2263,7 +2206,7 @@ void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID) r.setPointer(joinerOutput[j][z]); cout << " " << r.toString() << endl; } - */ +#endif matchCount = joinerOutput[j].size(); if (tjoiners[j]->inUM()) @@ -2271,7 +2214,6 @@ void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID) /* Count the # of rows that pass the join filter */ if (tjoiners[j]->hasFEFilter() && matchCount > 0) { - //cout << "doing FE filter" << endl; vector newJoinerOutput; applyMapping(fergMappings[smallSideCount], largeSideRow, &joinFERow); @@ -2311,10 +2253,6 @@ void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID) if (tjoiners[j]->antiJoin()) { matchCount = (matchCount ? 0 : 1); - // if (matchCount) - // cout << "in the result\n"; - // else - // cout << "not in the result\n"; } if (matchCount == 0) @@ -2380,7 +2318,6 @@ void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID) } else { -// cout << "TBPS: sending unjoined data\n"; rgDatav.push_back(rgData); } @@ -2488,7 +2425,6 @@ out: { smallSideRows[i].setPointer(unmatched[j]); -// cout << "small side Row: " << smallSideRows[i].toString() << endl; for (k = 0; k < smallSideCount; k++) { if (i == k) @@ -2499,8 +2435,6 @@ out: applyMapping(largeMapping, largeNull, &joinedBaseRow); joinedBaseRow.setRid(0); -// cout << "outer row is " << joinedBaseRow.toString() << endl; -// joinedBaseRow.setRid(largeSideRow.getRelRid()); joinedBaseRow.nextRow(); local_outputRG.incRowCount(); @@ -2785,8 +2719,6 @@ inline bool TupleBPS::scanit(uint64_t rid) fbo = rid >> rpbShift; extentIndex = fbo >> divShift; - //if (scanFlags[extentIndex] && !runtimeCPFlags[extentIndex]) - // cout << "HJ feedback eliminated an extent!\n"; return scanFlags[extentIndex] && runtimeCPFlags[extentIndex]; } @@ -2908,7 +2840,6 @@ void TupleBPS::generateJoinResultSet(const vector >& joiner { smallRow.setPointer(joinerOutput[depth][i]); applyMapping(mappings[depth], smallRow, &baseRow); -// cout << "depth " << depth << ", size " << joinerOutput[depth].size() << ", row " << i << ": " << smallRow.toString() << endl; generateJoinResultSet(joinerOutput, baseRow, mappings, depth + 1, outputRG, rgData, outputData, smallRows, joinedRow); } @@ -2926,7 +2857,6 @@ void TupleBPS::generateJoinResultSet(const vector >& joiner { uint32_t dbRoot = outputRG.getDBRoot(); uint64_t baseRid = outputRG.getBaseRid(); -// cout << "GJRS adding data\n"; outputData->push_back(rgData); rgData = RGData(outputRG); outputRG.setData(&rgData); @@ -2935,11 +2865,8 @@ void TupleBPS::generateJoinResultSet(const vector >& joiner outputRG.getRow(0, &joinedRow); } -// cout << "depth " << depth << ", size " << joinerOutput[depth].size() << ", row " << i << ": " << smallRow.toString() << endl; applyMapping(mappings[depth], smallRow, &baseRow); copyRow(baseRow, &joinedRow); - //memcpy(joinedRow.getData(), baseRow.getData(), joinedRow.getSize()); - //cout << "(step " << fStepId << ") fully joined row is: " << joinedRow.toString() << endl; } } } @@ -3104,7 +3031,6 @@ void TupleBPS::processFE2_oneRG(RowGroup& input, RowGroup& output, Row& inRow, if (ret) { applyMapping(fe2Mapping, inRow, &outRow); - //cout << "fe2 passed row: " << outRow.toString() << endl; outRow.setRid(inRow.getRelRid()); output.incRowCount(); outRow.nextRow(); @@ -3153,7 +3079,6 @@ void TupleBPS::processFE2(RowGroup& input, RowGroup& output, Row& inRow, Row& ou output.getBaseRid() != input.getBaseRid() ) { -// cout << "FE2 produced a full RG\n"; results.push_back(result); result = RGData(output); output.setData(&result); @@ -3167,12 +3092,9 @@ void TupleBPS::processFE2(RowGroup& input, RowGroup& output, Row& inRow, Row& ou if (output.getRowCount() > 0) { -// cout << "FE2 produced " << output.getRowCount() << " rows\n"; results.push_back(result); } -// else -// cout << "no rows from FE2\n"; rgData->swap(results); } diff --git a/dbcon/joblist/tupleaggregatestep.h b/dbcon/joblist/tupleaggregatestep.h index 13a12a8c8..ae356e156 100644 --- a/dbcon/joblist/tupleaggregatestep.h +++ b/dbcon/joblist/tupleaggregatestep.h @@ -1,4 +1,5 @@ /* Copyright (C) 2014 InfiniDB, Inc. + Copyright (C) 2019 MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -23,6 +24,7 @@ #include "jobstep.h" #include "rowaggregation.h" +#include "threadnaming.h" #include @@ -138,6 +140,7 @@ private: Aggregator(TupleAggregateStep* step) : fStep(step) { } void operator()() { + utils::setThreadName("TASAggr"); fStep->doAggregate(); } @@ -153,6 +156,7 @@ private: {} void operator()() { + utils::setThreadName("TASThrAggr"); fStep->threadedAggregateRowGroups(fThreadID); } @@ -171,6 +175,7 @@ private: } void operator()() { + utils::setThreadName("TASThr2ndPAggr"); for (uint32_t i = 0; i < bucketCount; i++) fStep->doThreadedSecondPhaseAggregate(fThreadID + i); } diff --git a/dbcon/joblist/tupleannexstep.cpp b/dbcon/joblist/tupleannexstep.cpp index 77a2670d9..c1ef06a4e 100644 --- a/dbcon/joblist/tupleannexstep.cpp +++ b/dbcon/joblist/tupleannexstep.cpp @@ -51,6 +51,7 @@ using namespace rowgroup; #include "hasher.h" #include "stlpoolallocator.h" +#include "threadnaming.h" using namespace utils; #include "querytele.h" @@ -314,6 +315,7 @@ void TupleAnnexStep::execute() void TupleAnnexStep::executeNoOrderBy() { + utils::setThreadName("TASwoOrd"); RGData rgDataIn; RGData rgDataOut; bool more = false; @@ -399,6 +401,7 @@ void TupleAnnexStep::executeNoOrderBy() void TupleAnnexStep::executeNoOrderByWithDistinct() { + utils::setThreadName("TASwoOrdDist"); scoped_ptr distinctMap(new DistinctMap_t(10, TAHasher(this), TAEq(this))); vector dataVec; RGData rgDataIn; @@ -500,6 +503,7 @@ void TupleAnnexStep::executeNoOrderByWithDistinct() void TupleAnnexStep::executeWithOrderBy() { + utils::setThreadName("TASwOrd"); RGData rgDataIn; RGData rgDataOut; bool more = false; diff --git a/dbcon/joblist/tupleconstantstep.h b/dbcon/joblist/tupleconstantstep.h index 6187f04a6..c98227805 100644 --- a/dbcon/joblist/tupleconstantstep.h +++ b/dbcon/joblist/tupleconstantstep.h @@ -22,6 +22,7 @@ #define JOBLIST_TUPLECONSTANTSTEP_H #include "jobstep.h" +#include "threadnaming.h" namespace joblist { @@ -98,6 +99,7 @@ protected: Runner(TupleConstantStep* step) : fStep(step) { } void operator()() { + utils::setThreadName("TCSRunner"); fStep->execute(); } diff --git a/dbcon/joblist/tuplehashjoin.h b/dbcon/joblist/tuplehashjoin.h index d89e76d2f..27154946f 100644 --- a/dbcon/joblist/tuplehashjoin.h +++ b/dbcon/joblist/tuplehashjoin.h @@ -1,4 +1,5 @@ /* Copyright (C) 2014 InfiniDB, Inc. + Copyright (C) 2019 MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -25,6 +26,7 @@ #include "calpontsystemcatalog.h" #include "hasher.h" #include "tuplejoiner.h" +#include "threadnaming.h" #include #include #include @@ -450,6 +452,7 @@ private: HJRunner(TupleHashJoinStep* hj) : HJ(hj) { } void operator()() { + utils::setThreadName("HJSBigSide"); HJ->hjRunner(); } TupleHashJoinStep* HJ; @@ -459,6 +462,7 @@ private: SmallRunner(TupleHashJoinStep* hj, uint32_t i) : HJ(hj), index(i) { } void operator()() { + utils::setThreadName("HJSSmallSide"); HJ->smallRunnerFcn(index); } TupleHashJoinStep* HJ; diff --git a/dbcon/joblist/tuplehavingstep.h b/dbcon/joblist/tuplehavingstep.h index 2624a7e4a..a7fc679ab 100644 --- a/dbcon/joblist/tuplehavingstep.h +++ b/dbcon/joblist/tuplehavingstep.h @@ -23,6 +23,7 @@ #include "jobstep.h" #include "expressionstep.h" +#include "threadnaming.h" // forward reference namespace fucexp @@ -97,6 +98,7 @@ protected: Runner(TupleHavingStep* step) : fStep(step) { } void operator()() { + utils::setThreadName("HVSRunner"); fStep->execute(); } diff --git a/dbcon/joblist/tupleunion.h b/dbcon/joblist/tupleunion.h index 2c54337d3..b6a445da9 100644 --- a/dbcon/joblist/tupleunion.h +++ b/dbcon/joblist/tupleunion.h @@ -35,6 +35,7 @@ #endif #include "stlpoolallocator.h" +#include "threadnaming.h" #ifndef TUPLEUNION2_H_ #define TUPLEUNION2_H_ @@ -155,6 +156,7 @@ private: Runner(TupleUnion* t, uint32_t in) : tu(t), index(in) { } void operator()() { + utils::setThreadName("TUSRunner"); tu->readInput(index); } }; diff --git a/dbcon/joblist/windowfunctionstep.h b/dbcon/joblist/windowfunctionstep.h index 302629c4c..f483c6027 100644 --- a/dbcon/joblist/windowfunctionstep.h +++ b/dbcon/joblist/windowfunctionstep.h @@ -25,6 +25,7 @@ #include "jobstep.h" #include "rowgroup.h" #include "windowfunctioncolumn.h" +#include "threadnaming.h" namespace execplan { @@ -153,6 +154,7 @@ private: Runner(WindowFunctionStep* step) : fStep(step) { } void operator()() { + utils::setThreadName("WFSRunner"); fStep->execute(); } diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index 019761d39..752c94d9e 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -52,6 +52,7 @@ using namespace boost; #include "fixedallocator.h" #include "blockcacheclient.h" #include "MonitorProcMem.h" +#include "threadnaming.h" #define MAX64 0x7fffffffffffffffLL #define MIN64 0x8000000000000000LL @@ -156,7 +157,6 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream& b, double prefetch, sendThread = bppst; pthread_mutex_init(&objLock, NULL); initBPP(b); -// cerr << "made a BPP\n"; } #if 0 @@ -1961,6 +1961,7 @@ void BatchPrimitiveProcessor::makeResponse() int BatchPrimitiveProcessor::operator()() { + utils::setThreadName("PPBatchPrimProc"); if (currentBlockOffset == 0) { #ifdef PRIMPROC_STOPWATCH // TODO: needs to be brought up-to-date diff --git a/primitives/primproc/bppseeder.cpp b/primitives/primproc/bppseeder.cpp index 1755135a3..4b94d0c5c 100644 --- a/primitives/primproc/bppseeder.cpp +++ b/primitives/primproc/bppseeder.cpp @@ -142,6 +142,7 @@ int BPPSeeder::operator()() pthread_t tid = 0; boost::mutex::scoped_lock scoped(bppLock, boost::defer_lock_t()); + try { if (firstRun) diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index 227b494de..76ff8ef3c 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -91,6 +91,8 @@ using namespace idbdatafile; using namespace threadpool; +#include "threadnaming.h" + #include "atomicops.h" #ifndef O_BINARY @@ -925,6 +927,7 @@ struct AsynchLoader void operator()() { + utils::setThreadName("PPAsyncLoader"); bool cached = false; uint32_t rCount = 0; char buf[BLOCK_SIZE]; @@ -1159,6 +1162,7 @@ void DictScanJob::write(const ByteStream& bs) int DictScanJob::operator()() { + utils::setThreadName("PPDictScanJob"); uint8_t data[DATA_BLOCK_SIZE]; uint32_t output_buf_size = MAX_BUFFER_SIZE; uint32_t session; @@ -1338,6 +1342,7 @@ struct BPPHandler LastJoiner(boost::shared_ptr r, SBS b) : BPPHandlerFunctor(r, b) { } int operator()() { + utils::setThreadName("PPHandLastJoiner"); return rt->lastJoinerMsg(*bs, dieTime); } }; @@ -1347,6 +1352,7 @@ struct BPPHandler Create(boost::shared_ptr r, SBS b) : BPPHandlerFunctor(r, b) { } int operator()() { + utils::setThreadName("PPHandCreate"); rt->createBPP(*bs); return 0; } @@ -1357,6 +1363,7 @@ struct BPPHandler Destroy(boost::shared_ptr r, SBS b) : BPPHandlerFunctor(r, b) { } int operator()() { + utils::setThreadName("PPHandDestroy"); return rt->destroyBPP(*bs, dieTime); } }; @@ -1366,6 +1373,7 @@ struct BPPHandler AddJoiner(boost::shared_ptr r, SBS b) : BPPHandlerFunctor(r, b) { } int operator()() { + utils::setThreadName("PPHandAddJoiner"); return rt->addJoinerToBPP(*bs, dieTime); } }; @@ -1375,6 +1383,7 @@ struct BPPHandler Abort(boost::shared_ptr r, SBS b) : BPPHandlerFunctor(r, b) { } int operator()() { + utils::setThreadName("PPHandAbort"); return rt->doAbort(*bs, dieTime); } }; @@ -1751,6 +1760,7 @@ public: virtual int execute() = 0; int operator()() { + utils::setThreadName("PPDictOp"); int ret; ret = execute(); @@ -1967,6 +1977,7 @@ struct ReadThread void operator()() { + utils::setThreadName("PPReadThread"); boost::shared_ptr procPoolPtr = fPrimitiveServerPtr->getProcessorThreadPool(); SBS bs; @@ -2376,6 +2387,7 @@ struct ServerThread void operator()() { + utils::setThreadName("PPServerThr"); IOSocket ios; try diff --git a/utils/common/CMakeLists.txt b/utils/common/CMakeLists.txt index 54900167e..eff8be12f 100644 --- a/utils/common/CMakeLists.txt +++ b/utils/common/CMakeLists.txt @@ -8,7 +8,8 @@ set(common_LIB_SRCS poolallocator.cpp cgroupconfigurator.cpp MonitorProcMem.cpp - nullvaluemanip.cpp) + nullvaluemanip.cpp + threadnaming.cpp) add_library(common SHARED ${common_LIB_SRCS}) diff --git a/utils/common/threadnaming.cpp b/utils/common/threadnaming.cpp new file mode 100644 index 000000000..2f4dda91f --- /dev/null +++ b/utils/common/threadnaming.cpp @@ -0,0 +1,26 @@ +/* Copyright (C) 2019 MariaDB Corporaton + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include + +namespace utils +{ + void setThreadName(const char *threadName) + { + prctl(PR_SET_NAME, threadName, 0, 0, 0); + } +} // end of namespace diff --git a/utils/common/threadnaming.h b/utils/common/threadnaming.h new file mode 100644 index 000000000..1682b7045 --- /dev/null +++ b/utils/common/threadnaming.h @@ -0,0 +1,24 @@ +/* Copyright (C) 2019 MariaDB Corporaton + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ +#ifndef H_SETTHREADNAME +#define H_SETTHREADNAME + +namespace utils +{ + void setThreadName(const char *threadName); +} // end of namespace +#endif