1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-01 06:46:55 +03:00

MCOL-2244 Columnstore execution threads now have names that describe

the thread's operation. This should simplify troubleshooting of CPU bottlenecks.
This commit is contained in:
Roman Nozdrin
2019-03-13 10:19:43 +03:00
parent 2509d833fc
commit a0b3424603
18 changed files with 109 additions and 87 deletions

View File

@ -70,6 +70,8 @@ using namespace BRM;
#include "rowgroup.h"
using namespace rowgroup;
#include "threadnaming.h"
#include "querytele.h"
using namespace querytele;
@ -106,6 +108,7 @@ struct TupleBPSPrimitive
{
try
{
utils::setThreadName("BPSPrimitive");
fBatchPrimitiveStep->sendPrimitiveMessages();
}
catch (std::exception& re)
@ -135,6 +138,7 @@ struct TupleBPSAggregators
{
try
{
utils::setThreadName("BPSAggregator");
fBatchPrimitiveStepCols->receiveMultiPrimitiveMessages(fThreadId);
}
catch (std::exception& re)
@ -276,7 +280,6 @@ TupleBPS::TupleBPS(const pColStep& rhs, const JobInfo& jobInfo) :
hasPCFilter = hasPMFilter = hasRIDFilter = hasSegmentFilter = hasDBRootFilter = hasSegmentDirFilter =
hasPartitionFilter = hasMaxFilter = hasMinFilter = hasLBIDFilter = hasExtentIDFilter = false;
// cout << "TBPSCount = " << ++TBPSCount << endl;
}
TupleBPS::TupleBPS(const pColScanStep& rhs, const JobInfo& jobInfo) :
@ -336,8 +339,6 @@ TupleBPS::TupleBPS(const pColScanStep& rhs, const JobInfo& jobInfo) :
fBPP->setTxnID(fTxnId);
fTraceFlags = rhs.fTraceFlags;
fBPP->setTraceFlags(fTraceFlags);
// if (fOid>=3000)
// cout << "BPS:initalized from pColScanStep. fSessionId=" << fSessionId << endl;
fBPP->setStepID(fStepId);
fBPP->setOutputType(ROW_GROUP);
fPhysicalIO = 0;
@ -352,9 +353,6 @@ TupleBPS::TupleBPS(const pColScanStep& rhs, const JobInfo& jobInfo) :
hasUMJoin = false;
fRunExecuted = false;
smallOuterJoiner = -1;
// @1098 initialize scanFlags to be true
//scanFlags.assign(numExtents, true);
//runtimeCPFlags.assign(numExtents, true);
bop = BOP_AND;
runRan = joinRan = false;
@ -405,9 +403,6 @@ TupleBPS::TupleBPS(const PassThruStep& rhs, const JobInfo& jobInfo) :
fTraceFlags = rhs.fTraceFlags;
fBPP->setTraceFlags(fTraceFlags);
fBPP->setOutputType(ROW_GROUP);
// if (fOid>=3000)
// cout << "BPS:initalized from PassThruStep. fSessionId=" << fSessionId << endl;
finishedSending = sendWaiting = false;
fSwallowRows = false;
fNumBlksSkipped = 0;
@ -437,7 +432,6 @@ TupleBPS::TupleBPS(const PassThruStep& rhs, const JobInfo& jobInfo) :
hasPCFilter = hasPMFilter = hasRIDFilter = hasSegmentFilter = hasDBRootFilter = hasSegmentDirFilter =
hasPartitionFilter = hasMaxFilter = hasMinFilter = hasLBIDFilter = hasExtentIDFilter = false;
// cout << "TBPSCount = " << ++TBPSCount << endl;
}
TupleBPS::TupleBPS(const pDictionaryStep& rhs, const JobInfo& jobInfo) :
@ -463,7 +457,6 @@ TupleBPS::TupleBPS(const pDictionaryStep& rhs, const JobInfo& jobInfo) :
fStepCount = 1;
fCPEvaluated = false;
fEstimatedRows = 0;
//fColType = rhs.colType();
alias(rhs.alias());
view(rhs.view());
name(rhs.name());
@ -472,8 +465,6 @@ TupleBPS::TupleBPS(const pDictionaryStep& rhs, const JobInfo& jobInfo) :
fBPP.reset(new BatchPrimitiveProcessorJL(fRm));
initializeConfigParms();
fBPP->setSessionID(fSessionId);
// if (fOid>=3000)
// cout << "BPS:initalized from DictionaryStep. fSessionId=" << fSessionId << endl;
fBPP->setStepID(fStepId);
fBPP->setQueryContext(fVerId);
fBPP->setTxnID(fTxnId);
@ -506,7 +497,6 @@ TupleBPS::TupleBPS(const pDictionaryStep& rhs, const JobInfo& jobInfo) :
hasPCFilter = hasPMFilter = hasRIDFilter = hasSegmentFilter = hasDBRootFilter = hasSegmentDirFilter =
hasPartitionFilter = hasMaxFilter = hasMinFilter = hasLBIDFilter = hasExtentIDFilter = false;
// cout << "TBPSCount = " << ++TBPSCount << endl;
}
TupleBPS::~TupleBPS()
@ -541,7 +531,6 @@ TupleBPS::~TupleBPS()
fDec->removeQueue(uniqueID);
}
// cout << "~TBPSCount = " << --TBPSCount << endl;
}
void TupleBPS::setBPP(JobStep* jobStep)
@ -558,7 +547,6 @@ void TupleBPS::setBPP(JobStep* jobStep)
if (pseudo)
{
//cout << "adding a pseudo col filter" << endl;
fBPP->addFilterStep(*pseudo);
if (pseudo->filterCount() > 0)
@ -690,8 +678,6 @@ void TupleBPS::setProjectBPP(JobStep* jobStep1, JobStep* jobStep2)
colWidth = (pcsp->colType()).colWidth;
projectOids.push_back(jobStep1->oid());
// if (fOid>=3000)
// cout << "Adding project step pColStep and pDictionaryStep to BPS" << endl;
}
else
{
@ -708,8 +694,6 @@ void TupleBPS::setProjectBPP(JobStep* jobStep1, JobStep* jobStep2)
projectOids.push_back(jobStep1->oid());
colWidth = (psth->colType()).colWidth;
// if (fOid>=3000)
// cout << "Adding project step PassThruStep and pDictionaryStep to BPS" << endl;
}
}
}
@ -723,7 +707,6 @@ void TupleBPS::setProjectBPP(JobStep* jobStep1, JobStep* jobStep2)
if (pseudo)
{
//cout << "adding a pseudo col projection" << endl;
fBPP->addProjectStep(*pseudo);
}
else
@ -835,7 +818,6 @@ void TupleBPS::storeCasualPartitionInfo(const bool estimateRowCounts)
}
}
//cout << "cp column number=" << cpColVec.size() << " 1st col extents size= " << scanFlags.size() << endl;
if (cpColVec.size() == 0)
return;
@ -900,12 +882,8 @@ void TupleBPS::startAggregationThread()
fProducerThreads.push_back(jobstepThreadPool.invoke(TupleBPSAggregators(this, fNumThreads - 1)));
}
//#include "boost/date_time/posix_time/posix_time.hpp"
void TupleBPS::serializeJoiner()
{
// boost::posix_time::ptime start, stop;
// start = boost::posix_time::microsec_clock::local_time();
ByteStream bs;
bool more = true;
@ -925,8 +903,6 @@ void TupleBPS::serializeJoiner()
bs.restart();
}
// stop = boost::posix_time::microsec_clock::local_time();
// cout << "serializing took " << stop-start << endl;
}
void TupleBPS::serializeJoiner(uint32_t conn)
@ -957,8 +933,6 @@ void TupleBPS::prepCasualPartitioning()
{
if (fOid >= 3000)
{
//if (scanFlags[i] && !runtimeCPFlags[i])
// cout << "runtime flags eliminated an extent!\n";
scanFlags[i] = scanFlags[i] && runtimeCPFlags[i];
if (scanFlags[i] && lbidList->CasualPartitionDataType(fColType.colDataType,
@ -1209,22 +1183,10 @@ void TupleBPS::run()
if (fe2)
{
//if (fDelivery) {
// fe2Data.reinit(fe2Output);
// fe2Output.setData(&fe2Data);
//}
primRowGroup.initRow(&fe2InRow);
fe2Output.initRow(&fe2OutRow);
}
/*
if (doJoin) {
for (uint32_t z = 0; z < smallSideCount; z++)
cout << "join #" << z << " " << "0x" << hex << tjoiners[z]->getJoinType()
<< std::dec << " typeless: " << (uint32_t) tjoiners[z]->isTypelessJoin() << endl;
}
*/
try
{
fDec->addDECEventListener(this);
@ -1330,7 +1292,6 @@ void TupleBPS::sendError(uint16_t status)
}
fBPP->reset();
// msgsSent++; // not expecting a response from this msg
finishedSending = true;
condvar.notify_all();
condvarWakeupProducer.notify_all();
@ -1441,7 +1402,6 @@ void TupleBPS::sendJobs(const vector<Job>& jobs)
for (i = 0; i < jobs.size() && !cancelled(); i++)
{
//cout << "sending a job for dbroot " << jobs[i].dbroot << ", PM " << jobs[i].connectionNum << endl;
fDec->write(uniqueID, *(jobs[i].msg));
tplLock.lock();
msgsSent += jobs[i].expectedResponses;
@ -1785,20 +1745,17 @@ void TupleBPS::makeJobs(vector<Job>* jobs)
if (!inBounds)
{
//cout << "out of bounds" << endl;
continue;
}
if (!scanFlags[i])
{
//cout << "CP elimination" << endl;
fNumBlksSkipped += lbidsToScan;
continue;
}
if (!processPseudoColFilters(i, dbRootPMMap))
{
//cout << "Skipping an extent due to pseudo-column filter elimination" << endl;
fNumBlksSkipped += lbidsToScan;
continue;
}
@ -1840,8 +1797,6 @@ void TupleBPS::makeJobs(vector<Job>* jobs)
}
}
// cout << " session " << fSessionId << " idx = " << i << " HWM = " << scannedExtents[i].HWM
// << " ... will scan " << lbidsToScan << " lbids\n";
// the # of logical blocks in this extent
if (lbidsToScan % fColType.colWidth)
@ -1857,22 +1812,17 @@ void TupleBPS::makeJobs(vector<Job>* jobs)
#else
blocksPerJob = max(blocksToScan / fProcessorThreadsPerScan, 16U);
#endif
//cout << "blocks to scan = " << blocksToScan << " blocks per job = " << blocksPerJob <<
// " HWM == " << scannedExtents[i].HWM << endl;
startingLBID = scannedExtents[i].range.start;
while (blocksToScan > 0)
{
uint32_t blocksThisJob = min(blocksToScan, blocksPerJob);
//cout << "starting LBID = " << startingLBID << " count = " << blocksThisJob <<
// " dbroot = " << scannedExtents[i].dbRoot << endl;
fBPP->setLBID(startingLBID, scannedExtents[i]);
fBPP->setCount(blocksThisJob);
bs.reset(new ByteStream());
fBPP->runBPP(*bs, (*dbRootConnectionMap)[scannedExtents[i].dbRoot]);
//cout << "making job for connection # " << (*dbRootConnectionMap)[scannedExtents[i].dbRoot] << endl;
jobs->push_back(Job(scannedExtents[i].dbRoot, (*dbRootConnectionMap)[scannedExtents[i].dbRoot],
blocksThisJob, bs));
blocksToScan -= blocksThisJob;
@ -1881,7 +1831,6 @@ void TupleBPS::makeJobs(vector<Job>* jobs)
}
}
// cout << "session " << fSessionId << " sees " << extentCounter << " extents" << endl;
}
void TupleBPS::sendPrimitiveMessages()
@ -1901,19 +1850,16 @@ void TupleBPS::sendPrimitiveMessages()
}
catch (const IDBExcept& e)
{
//cout << "Caught IDBExcept" << e.what() << e.errorCode() << endl;
sendError(e.errorCode());
processError(e.what(), e.errorCode(), "TupleBPS::sendPrimitiveMessages()");
}
catch (const std::exception& ex)
{
//cout << "Caught exception" << endl;
sendError(ERR_TUPLE_BPS);
processError(ex.what(), ERR_TUPLE_BPS, "TupleBPS::sendPrimitiveMessages()");
}
catch (...)
{
//cout << "Caught ..." << endl;
sendError(ERR_TUPLE_BPS);
processError("unknown", ERR_TUPLE_BPS, "TupleBPS::sendPrimitiveMessages()");
}
@ -2189,7 +2135,6 @@ void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID)
tplLock.unlock();
// cout << "thread " << threadID << " has " << size << " Bytestreams\n";
for (i = 0; i < size && !cancelled(); i++)
{
ByteStream* bs = bsv[i].get();
@ -2244,18 +2189,16 @@ void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID)
local_outputRG.resetRowGroup(local_primRG.getBaseRid());
local_outputRG.setDBRoot(local_primRG.getDBRoot());
local_primRG.getRow(0, &largeSideRow);
//cout << "large-side raw data: " << local_primRG.toString() << endl;
//cout << "jointype = " << tjoiners[0]->getJoinType() << endl;
for (k = 0; k < local_primRG.getRowCount() && !cancelled(); k++, largeSideRow.nextRow())
{
//cout << "TBPS: Large side row: " << largeSideRow.toString() << endl;
matchCount = 0;
for (j = 0; j < smallSideCount; j++)
{
tjoiners[j]->match(largeSideRow, k, threadID, &joinerOutput[j]);
/* Debugging code to print the matches
#ifdef JLF_DEBUG
// Debugging code to print the matches
Row r;
joinerMatchesRGs[j].initRow(&r);
cout << joinerOutput[j].size() << " matches: \n";
@ -2263,7 +2206,7 @@ void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID)
r.setPointer(joinerOutput[j][z]);
cout << " " << r.toString() << endl;
}
*/
#endif
matchCount = joinerOutput[j].size();
if (tjoiners[j]->inUM())
@ -2271,7 +2214,6 @@ void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID)
/* Count the # of rows that pass the join filter */
if (tjoiners[j]->hasFEFilter() && matchCount > 0)
{
//cout << "doing FE filter" << endl;
vector<Row::Pointer> newJoinerOutput;
applyMapping(fergMappings[smallSideCount], largeSideRow, &joinFERow);
@ -2311,10 +2253,6 @@ void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID)
if (tjoiners[j]->antiJoin())
{
matchCount = (matchCount ? 0 : 1);
// if (matchCount)
// cout << "in the result\n";
// else
// cout << "not in the result\n";
}
if (matchCount == 0)
@ -2380,7 +2318,6 @@ void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID)
}
else
{
// cout << "TBPS: sending unjoined data\n";
rgDatav.push_back(rgData);
}
@ -2488,7 +2425,6 @@ out:
{
smallSideRows[i].setPointer(unmatched[j]);
// cout << "small side Row: " << smallSideRows[i].toString() << endl;
for (k = 0; k < smallSideCount; k++)
{
if (i == k)
@ -2499,8 +2435,6 @@ out:
applyMapping(largeMapping, largeNull, &joinedBaseRow);
joinedBaseRow.setRid(0);
// cout << "outer row is " << joinedBaseRow.toString() << endl;
// joinedBaseRow.setRid(largeSideRow.getRelRid());
joinedBaseRow.nextRow();
local_outputRG.incRowCount();
@ -2785,8 +2719,6 @@ inline bool TupleBPS::scanit(uint64_t rid)
fbo = rid >> rpbShift;
extentIndex = fbo >> divShift;
//if (scanFlags[extentIndex] && !runtimeCPFlags[extentIndex])
// cout << "HJ feedback eliminated an extent!\n";
return scanFlags[extentIndex] && runtimeCPFlags[extentIndex];
}
@ -2908,7 +2840,6 @@ void TupleBPS::generateJoinResultSet(const vector<vector<Row::Pointer> >& joiner
{
smallRow.setPointer(joinerOutput[depth][i]);
applyMapping(mappings[depth], smallRow, &baseRow);
// cout << "depth " << depth << ", size " << joinerOutput[depth].size() << ", row " << i << ": " << smallRow.toString() << endl;
generateJoinResultSet(joinerOutput, baseRow, mappings, depth + 1,
outputRG, rgData, outputData, smallRows, joinedRow);
}
@ -2926,7 +2857,6 @@ void TupleBPS::generateJoinResultSet(const vector<vector<Row::Pointer> >& joiner
{
uint32_t dbRoot = outputRG.getDBRoot();
uint64_t baseRid = outputRG.getBaseRid();
// cout << "GJRS adding data\n";
outputData->push_back(rgData);
rgData = RGData(outputRG);
outputRG.setData(&rgData);
@ -2935,11 +2865,8 @@ void TupleBPS::generateJoinResultSet(const vector<vector<Row::Pointer> >& joiner
outputRG.getRow(0, &joinedRow);
}
// cout << "depth " << depth << ", size " << joinerOutput[depth].size() << ", row " << i << ": " << smallRow.toString() << endl;
applyMapping(mappings[depth], smallRow, &baseRow);
copyRow(baseRow, &joinedRow);
//memcpy(joinedRow.getData(), baseRow.getData(), joinedRow.getSize());
//cout << "(step " << fStepId << ") fully joined row is: " << joinedRow.toString() << endl;
}
}
}
@ -3104,7 +3031,6 @@ void TupleBPS::processFE2_oneRG(RowGroup& input, RowGroup& output, Row& inRow,
if (ret)
{
applyMapping(fe2Mapping, inRow, &outRow);
//cout << "fe2 passed row: " << outRow.toString() << endl;
outRow.setRid(inRow.getRelRid());
output.incRowCount();
outRow.nextRow();
@ -3153,7 +3079,6 @@ void TupleBPS::processFE2(RowGroup& input, RowGroup& output, Row& inRow, Row& ou
output.getBaseRid() != input.getBaseRid()
)
{
// cout << "FE2 produced a full RG\n";
results.push_back(result);
result = RGData(output);
output.setData(&result);
@ -3167,12 +3092,9 @@ void TupleBPS::processFE2(RowGroup& input, RowGroup& output, Row& inRow, Row& ou
if (output.getRowCount() > 0)
{
// cout << "FE2 produced " << output.getRowCount() << " rows\n";
results.push_back(result);
}
// else
// cout << "no rows from FE2\n";
rgData->swap(results);
}