1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-05 16:15:50 +03:00

feat(stats): CES reports table name and HJS reports total matched rows

This commit is contained in:
drrtuy
2025-04-04 19:47:25 +00:00
parent 432d0cf7f8
commit e4ca7425e2
4 changed files with 78 additions and 39 deletions

View File

@@ -802,7 +802,7 @@ void CrossEngineStep::formatMiniStats()
ostringstream oss; ostringstream oss;
oss << "CES " oss << "CES "
<< "UM " << "UM "
<< "- " << tableAlias() << " "
<< "- " << "- "
<< "- " << "- "
<< "- " << "- "

View File

@@ -421,25 +421,14 @@ void JobList::querySummary(bool extendedStats)
try try
{ {
// bug3438, print subquery info prior to outer query
for (vector<SJLP>::iterator i = subqueryJoblists.begin(); i != subqueryJoblists.end(); i++)
{
i->get()->querySummary(extendedStats);
fStats += i->get()->queryStats();
fExtendedInfo += i->get()->extendedInfo();
fMiniInfo += i->get()->miniInfo();
}
JobStepVector::const_iterator qIter = fQuery.begin();
JobStepVector::const_iterator qEnd = fQuery.end();
JobStep* js;
// //
//...Add up the I/O and msg counts for the query job steps //...Add up the I/O and msg counts for the query job steps
// //
while (qIter != qEnd) for (auto iter = fQuery.rbegin(); iter != fQuery.rend(); ++iter)
{ {
js = qIter->get(); auto* js = iter->get();
fStats.fPhyIO += js->phyIOCount(); fStats.fPhyIO += js->phyIOCount();
fStats.fCacheIO += js->cacheIOCount(); fStats.fCacheIO += js->cacheIOCount();
@@ -488,19 +477,14 @@ void JobList::querySummary(bool extendedStats)
fExtendedInfo += ei; fExtendedInfo += ei;
fMiniInfo += js->miniInfo() + "\n"; fMiniInfo += js->miniInfo() + "\n";
} }
++qIter;
} }
JobStepVector::const_iterator pIter = fProject.begin();
JobStepVector::const_iterator pEnd = fProject.end();
// //
//...Add up the I/O and msg counts for the projection job steps //...Add up the I/O and msg counts for the projection job steps
// //
while (pIter != pEnd) for (auto iter = fProject.rbegin(); iter != fProject.rend(); ++iter)
{ {
js = pIter->get(); auto* js = iter->get();
fStats.fPhyIO += js->phyIOCount(); fStats.fPhyIO += js->phyIOCount();
fStats.fCacheIO += js->cacheIOCount(); fStats.fCacheIO += js->cacheIOCount();
@@ -527,7 +511,6 @@ void JobList::querySummary(bool extendedStats)
skipCnt = (dynamic_cast<BatchPrimitive*>(js))->blksSkipped(); skipCnt = (dynamic_cast<BatchPrimitive*>(js))->blksSkipped();
fStats.fCPBlocksSkipped += skipCnt; fStats.fCPBlocksSkipped += skipCnt;
++pIter;
} }
if ((!fProject.empty()) && extendedStats) if ((!fProject.empty()) && extendedStats)
@@ -536,7 +519,7 @@ void JobList::querySummary(bool extendedStats)
while (dsi != fDeliveredTables.end()) while (dsi != fDeliveredTables.end())
{ {
js = dynamic_cast<JobStep*>(dsi->second.get()); auto js = dynamic_cast<JobStep*>(dsi->second.get());
string ei; string ei;
int max = 0; int max = 0;
ei = js->extendedInfo(); ei = js->extendedInfo();
@@ -553,6 +536,14 @@ void JobList::querySummary(bool extendedStats)
++dsi; ++dsi;
} }
} }
// bug3438, print subquery info prior to outer query
for (auto i = subqueryJoblists.rbegin(); i != subqueryJoblists.rend(); i++)
{
i->get()->querySummary(extendedStats);
fStats += i->get()->queryStats();
fExtendedInfo += i->get()->extendedInfo();
fMiniInfo += i->get()->miniInfo();
}
} }
catch (exception& ex) catch (exception& ex)
{ {

View File

@@ -229,7 +229,8 @@ void TupleHashJoinStep::startSmallRunners(uint index)
else else
{ {
joiners[index].reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index][0], joiners[index].reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index][0],
largeSideKeys[index][0], jt, &jobstepThreadPool, resourceManager, numCores)); largeSideKeys[index][0], jt, &jobstepThreadPool, resourceManager,
numCores));
} }
joiners[index]->setUniqueLimit(uniqueLimit); joiners[index]->setUniqueLimit(uniqueLimit);
@@ -314,18 +315,18 @@ void TupleHashJoinStep::startSmallRunners(uint index)
{ {
{ {
oss << "PM join (" << index << ")" << endl; oss << "PM join (" << index << ")" << endl;
#ifdef JLF_DEBUG #ifdef JLF_DEBUG
cout << oss.str(); cout << oss.str();
#endif #endif
extendedInfo += oss.str(); extendedInfo += oss.str();
} }
} }
else if (joiners[index]->inUM()) else if (joiners[index]->inUM())
{ {
oss << "UM join (" << index << ")" << endl; oss << "UM join (" << index << ")" << endl;
#ifdef JLF_DEBUG #ifdef JLF_DEBUG
cout << oss.str(); cout << oss.str();
#endif #endif
extendedInfo += oss.str(); extendedInfo += oss.str();
} }
} }
@@ -337,7 +338,6 @@ void TupleHashJoinStep::startSmallRunners(uint index)
{ {
boost::mutex::scoped_lock lk(*fStatsMutexPtr); boost::mutex::scoped_lock lk(*fStatsMutexPtr);
fExtendedInfo += extendedInfo; fExtendedInfo += extendedInfo;
formatMiniStats(index);
} }
} }
@@ -950,9 +950,22 @@ void TupleHashJoinStep::hjRunner()
// this clause starts the THJS join threads. // this clause starts the THJS join threads.
startJoinThreads(); startJoinThreads();
if (traceOn())
{
boost::mutex::scoped_lock lk(*fStatsMutexPtr);
formatMiniStats();
for (uint32_t i = 0; i < joiners.size(); ++i)
{
formatMiniStatsPerJoiner(i);
}
}
if (fTableOID1 >= 3000) if (fTableOID1 >= 3000)
{ {
StepTeleStats sts(fQueryUuid, fStepUuid, StepTeleStats::ST_SUMMARY, 1, QueryTeleClient::timeNowms(), 1, 0); StepTeleStats sts(fQueryUuid, fStepUuid, StepTeleStats::ST_SUMMARY, 1, QueryTeleClient::timeNowms(), 1,
0);
postStepSummaryTele(sts); postStepSummaryTele(sts);
} }
} }
@@ -1226,7 +1239,7 @@ bool TupleHashJoinStep::deliverStringTableRowGroup() const
} }
// Must hold the stats lock when calling this! // Must hold the stats lock when calling this!
void TupleHashJoinStep::formatMiniStats(uint32_t index) void TupleHashJoinStep::formatMiniStatsPerJoiner(uint32_t index)
{ {
ostringstream oss; ostringstream oss;
oss << "HJS "; oss << "HJS ";
@@ -1251,6 +1264,28 @@ void TupleHashJoinStep::formatMiniStats(uint32_t index)
fMiniInfo += oss.str(); fMiniInfo += oss.str();
} }
void TupleHashJoinStep::formatMiniStats()
{
ostringstream oss;
oss << "HJS ";
oss << "UM ";
oss << alias() << " ";
if (fTableOID2 >= 3000)
oss << fTableOID2;
else
oss << "- ";
auto joinedRows =
std::accumulate(joinerRunnerInputMatchedStats.begin(), joinerRunnerInputMatchedStats.end(), 0ULL);
oss << " " << "- " << "- " << "- "
<< "- "
<< "-------- " << joinedRows << "\n";
fMiniInfo += oss.str();
}
void TupleHashJoinStep::addJoinFilter(boost::shared_ptr<execplan::ParseTree> pt, uint32_t index) void TupleHashJoinStep::addJoinFilter(boost::shared_ptr<execplan::ParseTree> pt, uint32_t index)
{ {
boost::shared_ptr<funcexp::FuncExpWrapper> newfe(new funcexp::FuncExpWrapper()); boost::shared_ptr<funcexp::FuncExpWrapper> newfe(new funcexp::FuncExpWrapper());
@@ -1352,6 +1387,9 @@ void TupleHashJoinStep::startJoinThreads()
/* Start join runners */ /* Start join runners */
joinRunners.reserve(joinThreadCount); joinRunners.reserve(joinThreadCount);
// Statistics collection
joinerRunnerInputRecordsStats.resize(joinThreadCount, 0);
joinerRunnerInputMatchedStats.resize(joinThreadCount, 0);
for (i = 0; i < joinThreadCount; i++) for (i = 0; i < joinThreadCount; i++)
joinRunners.push_back(jobstepThreadPool.invoke(JoinRunner(this, i))); joinRunners.push_back(jobstepThreadPool.invoke(JoinRunner(this, i)));
@@ -1362,6 +1400,7 @@ void TupleHashJoinStep::startJoinThreads()
if (lastSmallOuterJoiner != (uint32_t)-1) if (lastSmallOuterJoiner != (uint32_t)-1)
finishSmallOuterJoin(); finishSmallOuterJoin();
outputDL->endOfInput(); outputDL->endOfInput();
} }
@@ -1518,6 +1557,7 @@ void TupleHashJoinStep::joinRunnerFcn(uint32_t threadID)
smallRGs[i].initRow(&smallRowTemplates[i]); smallRGs[i].initRow(&smallRowTemplates[i]);
grabSomeWork(&inputData); grabSomeWork(&inputData);
std::cout << "joinRunnerFcn " << threadID << " has " << inputData.size() << " RGDatas" << std::endl;
while (!inputData.empty() && !cancelled()) while (!inputData.empty() && !cancelled())
{ {
@@ -1528,6 +1568,8 @@ void TupleHashJoinStep::joinRunnerFcn(uint32_t threadID)
if (local_inputRG.getRowCount() == 0) if (local_inputRG.getRowCount() == 0)
continue; continue;
joinerRunnerInputRecordsStats[threadID] += local_inputRG.getRowCount();
joinOneRG(threadID, joinedRowData, local_inputRG, local_outputRG, largeRow, joinFERow, joinedRow, joinOneRG(threadID, joinedRowData, local_inputRG, local_outputRG, largeRow, joinFERow, joinedRow,
baseRow, joinMatches, smallRowTemplates, outputDL); baseRow, joinMatches, smallRowTemplates, outputDL);
} }
@@ -1614,7 +1656,7 @@ void TupleHashJoinStep::processFE2(RowGroup& input, RowGroup& output, Row& inRow
output.incRowCount(); output.incRowCount();
outRow.nextRow(); outRow.nextRow();
if (output.getRowCount() == 8192) if (output.getRowCount() == rowgroup::rgCommonSize)
{ {
results.push_back(result); results.push_back(result);
result.reinit(output); result.reinit(output);
@@ -1788,7 +1830,7 @@ void TupleHashJoinStep::joinOneRG(
/* TODO!!! See TupleBPS for the fix for bug 3510! */ /* TODO!!! See TupleBPS for the fix for bug 3510! */
applyMapping((*rgMappings)[smallSideCount], largeSideRow, &baseRow); applyMapping((*rgMappings)[smallSideCount], largeSideRow, &baseRow);
baseRow.setRid(largeSideRow.getRelRid()); baseRow.setRid(largeSideRow.getRelRid());
generateJoinResultSet(joinMatches, baseRow, *rgMappings, 0, joinOutput, joinedData, out, generateJoinResultSet(threadID, joinMatches, baseRow, *rgMappings, 0, joinOutput, joinedData, out,
smallRowTemplates, joinedRow, outputDL); smallRowTemplates, joinedRow, outputDL);
} }
} }
@@ -1797,7 +1839,8 @@ void TupleHashJoinStep::joinOneRG(
out.push_back(joinedData); out.push_back(joinedData);
} }
void TupleHashJoinStep::generateJoinResultSet(const vector<vector<Row::Pointer> >& joinerOutput, Row& baseRow, void TupleHashJoinStep::generateJoinResultSet(const uint32_t threadID,
const vector<vector<Row::Pointer> >& joinerOutput, Row& baseRow,
const std::shared_ptr<std::shared_ptr<int[]>[]>& mappings, const std::shared_ptr<std::shared_ptr<int[]>[]>& mappings,
const uint32_t depth, RowGroup& l_outputRG, RGData& rgData, const uint32_t depth, RowGroup& l_outputRG, RGData& rgData,
vector<RGData>& outputData, vector<RGData>& outputData,
@@ -1814,8 +1857,8 @@ void TupleHashJoinStep::generateJoinResultSet(const vector<vector<Row::Pointer>
{ {
smallRow.setPointer(joinerOutput[depth][i]); smallRow.setPointer(joinerOutput[depth][i]);
applyMapping(mappings[depth], smallRow, &baseRow); applyMapping(mappings[depth], smallRow, &baseRow);
generateJoinResultSet(joinerOutput, baseRow, mappings, depth + 1, l_outputRG, rgData, outputData, generateJoinResultSet(threadID, joinerOutput, baseRow, mappings, depth + 1, l_outputRG, rgData,
smallRows, joinedRow, dlp); outputData, smallRows, joinedRow, dlp);
} }
} }
else else
@@ -1834,6 +1877,7 @@ void TupleHashJoinStep::generateJoinResultSet(const vector<vector<Row::Pointer>
uint32_t dbRoot = l_outputRG.getDBRoot(); uint32_t dbRoot = l_outputRG.getDBRoot();
uint64_t baseRid = l_outputRG.getBaseRid(); uint64_t baseRid = l_outputRG.getBaseRid();
outputData.push_back(rgData); outputData.push_back(rgData);
joinerRunnerInputMatchedStats[threadID] += rowgroup::rgCommonSize;
// Count the memory // Count the memory
if (UNLIKELY(outputData.size() > flushThreshold || !getMemory(l_outputRG.getSizeWithStrings()))) if (UNLIKELY(outputData.size() > flushThreshold || !getMemory(l_outputRG.getSizeWithStrings())))
{ {
@@ -1868,6 +1912,7 @@ void TupleHashJoinStep::generateJoinResultSet(const vector<vector<Row::Pointer>
applyMapping(mappings[depth], smallRow, &baseRow); applyMapping(mappings[depth], smallRow, &baseRow);
copyRow(baseRow, &joinedRow); copyRow(baseRow, &joinedRow);
} }
joinerRunnerInputMatchedStats[threadID] += l_outputRG.getRowCount();
} }
} }

View File

@@ -392,7 +392,8 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep
void errorLogging(const std::string& msg, int err) const; void errorLogging(const std::string& msg, int err) const;
void startAdjoiningSteps(); void startAdjoiningSteps();
void formatMiniStats(uint32_t index); void formatMiniStatsPerJoiner(uint32_t index);
void formatMiniStats();
RowGroupDL *largeDL, *outputDL; RowGroupDL *largeDL, *outputDL;
std::vector<RowGroupDL*> smallDLs; std::vector<RowGroupDL*> smallDLs;
@@ -538,7 +539,7 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep
}; };
void joinRunnerFcn(uint32_t index); void joinRunnerFcn(uint32_t index);
void startJoinThreads(); void startJoinThreads();
void generateJoinResultSet(const std::vector<std::vector<rowgroup::Row::Pointer>>& joinerOutput, void generateJoinResultSet(const uint32_t threadID, const std::vector<std::vector<rowgroup::Row::Pointer>>& joinerOutput,
rowgroup::Row& baseRow, rowgroup::Row& baseRow,
const std::shared_ptr<std::shared_ptr<int[]>[]>& mappings, const uint32_t depth, const std::shared_ptr<std::shared_ptr<int[]>[]>& mappings, const uint32_t depth,
rowgroup::RowGroup& outputRG, rowgroup::RGData& rgData, rowgroup::RowGroup& outputRG, rowgroup::RGData& rgData,
@@ -639,6 +640,8 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep
void segregateJoiners(); void segregateJoiners();
std::vector<std::shared_ptr<joiner::TupleJoiner>> tbpsJoiners; std::vector<std::shared_ptr<joiner::TupleJoiner>> tbpsJoiners;
std::vector<std::shared_ptr<joiner::TupleJoiner>> djsJoiners; std::vector<std::shared_ptr<joiner::TupleJoiner>> djsJoiners;
std::vector<size_t> joinerRunnerInputRecordsStats;
std::vector<size_t> joinerRunnerInputMatchedStats;
std::vector<int> djsJoinerMap; std::vector<int> djsJoinerMap;
boost::scoped_array<ssize_t> memUsedByEachJoin; boost::scoped_array<ssize_t> memUsedByEachJoin;
boost::mutex djsLock; boost::mutex djsLock;