diff --git a/dbcon/joblist/crossenginestep.cpp b/dbcon/joblist/crossenginestep.cpp index 9f63e012e..830f1c36a 100644 --- a/dbcon/joblist/crossenginestep.cpp +++ b/dbcon/joblist/crossenginestep.cpp @@ -802,7 +802,7 @@ void CrossEngineStep::formatMiniStats() ostringstream oss; oss << "CES " << "UM " - << "- " + << tableAlias() << " " << "- " << "- " << "- " diff --git a/dbcon/joblist/joblist.cpp b/dbcon/joblist/joblist.cpp index a96c8d34d..61938c9d2 100644 --- a/dbcon/joblist/joblist.cpp +++ b/dbcon/joblist/joblist.cpp @@ -421,25 +421,14 @@ void JobList::querySummary(bool extendedStats) try { - // bug3438, print subquery info prior to outer query - for (vector::iterator i = subqueryJoblists.begin(); i != subqueryJoblists.end(); i++) - { - i->get()->querySummary(extendedStats); - fStats += i->get()->queryStats(); - fExtendedInfo += i->get()->extendedInfo(); - fMiniInfo += i->get()->miniInfo(); - } - JobStepVector::const_iterator qIter = fQuery.begin(); - JobStepVector::const_iterator qEnd = fQuery.end(); - JobStep* js; // //...Add up the I/O and msg counts for the query job steps // - while (qIter != qEnd) + for (auto iter = fQuery.rbegin(); iter != fQuery.rend(); ++iter) { - js = qIter->get(); + auto* js = iter->get(); fStats.fPhyIO += js->phyIOCount(); fStats.fCacheIO += js->cacheIOCount(); @@ -488,19 +477,14 @@ void JobList::querySummary(bool extendedStats) fExtendedInfo += ei; fMiniInfo += js->miniInfo() + "\n"; } - - ++qIter; } - JobStepVector::const_iterator pIter = fProject.begin(); - JobStepVector::const_iterator pEnd = fProject.end(); - // //...Add up the I/O and msg counts for the projection job steps // - while (pIter != pEnd) + for (auto iter = fProject.rbegin(); iter != fProject.rend(); ++iter) { - js = pIter->get(); + auto* js = iter->get(); fStats.fPhyIO += js->phyIOCount(); fStats.fCacheIO += js->cacheIOCount(); @@ -527,7 +511,6 @@ void JobList::querySummary(bool extendedStats) skipCnt = (dynamic_cast(js))->blksSkipped(); fStats.fCPBlocksSkipped += skipCnt; - ++pIter; } if ((!fProject.empty()) && extendedStats) @@ -536,7 +519,7 @@ void JobList::querySummary(bool extendedStats) while (dsi != fDeliveredTables.end()) { - js = dynamic_cast(dsi->second.get()); + auto js = dynamic_cast(dsi->second.get()); string ei; int max = 0; ei = js->extendedInfo(); @@ -553,6 +536,14 @@ void JobList::querySummary(bool extendedStats) ++dsi; } } + // bug3438, print subquery info prior to outer query + for (auto i = subqueryJoblists.rbegin(); i != subqueryJoblists.rend(); i++) + { + i->get()->querySummary(extendedStats); + fStats += i->get()->queryStats(); + fExtendedInfo += i->get()->extendedInfo(); + fMiniInfo += i->get()->miniInfo(); + } } catch (exception& ex) { diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp index aee879d64..b67696d07 100644 --- a/dbcon/joblist/tuplehashjoin.cpp +++ b/dbcon/joblist/tuplehashjoin.cpp @@ -229,7 +229,8 @@ void TupleHashJoinStep::startSmallRunners(uint index) else { joiners[index].reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index][0], - largeSideKeys[index][0], jt, &jobstepThreadPool, resourceManager, numCores)); + largeSideKeys[index][0], jt, &jobstepThreadPool, resourceManager, + numCores)); } joiners[index]->setUniqueLimit(uniqueLimit); @@ -314,18 +315,18 @@ void TupleHashJoinStep::startSmallRunners(uint index) { { oss << "PM join (" << index << ")" << endl; - #ifdef JLF_DEBUG +#ifdef JLF_DEBUG cout << oss.str(); - #endif +#endif extendedInfo += oss.str(); } } else if (joiners[index]->inUM()) { oss << "UM join (" << index << ")" << endl; - #ifdef JLF_DEBUG +#ifdef JLF_DEBUG cout << oss.str(); - #endif +#endif extendedInfo += oss.str(); } } @@ -337,7 +338,6 @@ void TupleHashJoinStep::startSmallRunners(uint index) { boost::mutex::scoped_lock lk(*fStatsMutexPtr); fExtendedInfo += extendedInfo; - formatMiniStats(index); } } @@ -950,9 +950,22 @@ void TupleHashJoinStep::hjRunner() // this clause starts the THJS join threads. startJoinThreads(); + + if (traceOn()) + { + boost::mutex::scoped_lock lk(*fStatsMutexPtr); + formatMiniStats(); + + for (uint32_t i = 0; i < joiners.size(); ++i) + { + formatMiniStatsPerJoiner(i); + } + } + if (fTableOID1 >= 3000) { - StepTeleStats sts(fQueryUuid, fStepUuid, StepTeleStats::ST_SUMMARY, 1, QueryTeleClient::timeNowms(), 1, 0); + StepTeleStats sts(fQueryUuid, fStepUuid, StepTeleStats::ST_SUMMARY, 1, QueryTeleClient::timeNowms(), 1, + 0); postStepSummaryTele(sts); } } @@ -1226,7 +1239,7 @@ bool TupleHashJoinStep::deliverStringTableRowGroup() const } // Must hold the stats lock when calling this! -void TupleHashJoinStep::formatMiniStats(uint32_t index) +void TupleHashJoinStep::formatMiniStatsPerJoiner(uint32_t index) { ostringstream oss; oss << "HJS "; @@ -1251,6 +1264,28 @@ void TupleHashJoinStep::formatMiniStats(uint32_t index) fMiniInfo += oss.str(); } +void TupleHashJoinStep::formatMiniStats() +{ + ostringstream oss; + oss << "HJS "; + + oss << "UM "; + + oss << alias() << " "; + + if (fTableOID2 >= 3000) + oss << fTableOID2; + else + oss << "- "; + + auto joinedRows = + std::accumulate(joinerRunnerInputMatchedStats.begin(), joinerRunnerInputMatchedStats.end(), 0ULL); + oss << " " << "- " << "- " << "- " + << "- " + << "-------- " << joinedRows << "\n"; + fMiniInfo += oss.str(); +} + void TupleHashJoinStep::addJoinFilter(boost::shared_ptr pt, uint32_t index) { boost::shared_ptr newfe(new funcexp::FuncExpWrapper()); @@ -1352,6 +1387,9 @@ void TupleHashJoinStep::startJoinThreads() /* Start join runners */ joinRunners.reserve(joinThreadCount); + // Statistics collection + joinerRunnerInputRecordsStats.resize(joinThreadCount, 0); + joinerRunnerInputMatchedStats.resize(joinThreadCount, 0); for (i = 0; i < joinThreadCount; i++) joinRunners.push_back(jobstepThreadPool.invoke(JoinRunner(this, i))); @@ -1362,6 +1400,7 @@ void TupleHashJoinStep::startJoinThreads() if (lastSmallOuterJoiner != (uint32_t)-1) finishSmallOuterJoin(); + outputDL->endOfInput(); } @@ -1518,6 +1557,7 @@ void TupleHashJoinStep::joinRunnerFcn(uint32_t threadID) smallRGs[i].initRow(&smallRowTemplates[i]); grabSomeWork(&inputData); + std::cout << "joinRunnerFcn " << threadID << " has " << inputData.size() << " RGDatas" << std::endl; while (!inputData.empty() && !cancelled()) { @@ -1528,6 +1568,8 @@ void TupleHashJoinStep::joinRunnerFcn(uint32_t threadID) if (local_inputRG.getRowCount() == 0) continue; + joinerRunnerInputRecordsStats[threadID] += local_inputRG.getRowCount(); + joinOneRG(threadID, joinedRowData, local_inputRG, local_outputRG, largeRow, joinFERow, joinedRow, baseRow, joinMatches, smallRowTemplates, outputDL); } @@ -1614,7 +1656,7 @@ void TupleHashJoinStep::processFE2(RowGroup& input, RowGroup& output, Row& inRow output.incRowCount(); outRow.nextRow(); - if (output.getRowCount() == 8192) + if (output.getRowCount() == rowgroup::rgCommonSize) { results.push_back(result); result.reinit(output); @@ -1788,7 +1830,7 @@ void TupleHashJoinStep::joinOneRG( /* TODO!!! See TupleBPS for the fix for bug 3510! */ applyMapping((*rgMappings)[smallSideCount], largeSideRow, &baseRow); baseRow.setRid(largeSideRow.getRelRid()); - generateJoinResultSet(joinMatches, baseRow, *rgMappings, 0, joinOutput, joinedData, out, + generateJoinResultSet(threadID, joinMatches, baseRow, *rgMappings, 0, joinOutput, joinedData, out, smallRowTemplates, joinedRow, outputDL); } } @@ -1797,7 +1839,8 @@ void TupleHashJoinStep::joinOneRG( out.push_back(joinedData); } -void TupleHashJoinStep::generateJoinResultSet(const vector >& joinerOutput, Row& baseRow, +void TupleHashJoinStep::generateJoinResultSet(const uint32_t threadID, + const vector >& joinerOutput, Row& baseRow, const std::shared_ptr[]>& mappings, const uint32_t depth, RowGroup& l_outputRG, RGData& rgData, vector& outputData, @@ -1814,14 +1857,14 @@ void TupleHashJoinStep::generateJoinResultSet(const vector { smallRow.setPointer(joinerOutput[depth][i]); applyMapping(mappings[depth], smallRow, &baseRow); - generateJoinResultSet(joinerOutput, baseRow, mappings, depth + 1, l_outputRG, rgData, outputData, - smallRows, joinedRow, dlp); + generateJoinResultSet(threadID, joinerOutput, baseRow, mappings, depth + 1, l_outputRG, rgData, + outputData, smallRows, joinedRow, dlp); } } else { // NB In case of OUTER JOIN this loop can produce a lot of RGDatas, - // so it is a must to periodically flush from this loop. + // so it is a must to periodically flush from this loop. l_outputRG.getRow(l_outputRG.getRowCount(), &joinedRow); auto flushThreshold = outputDL->maxElements(); @@ -1834,6 +1877,7 @@ void TupleHashJoinStep::generateJoinResultSet(const vector uint32_t dbRoot = l_outputRG.getDBRoot(); uint64_t baseRid = l_outputRG.getBaseRid(); outputData.push_back(rgData); + joinerRunnerInputMatchedStats[threadID] += rowgroup::rgCommonSize; // Count the memory if (UNLIKELY(outputData.size() > flushThreshold || !getMemory(l_outputRG.getSizeWithStrings()))) { @@ -1868,6 +1912,7 @@ void TupleHashJoinStep::generateJoinResultSet(const vector applyMapping(mappings[depth], smallRow, &baseRow); copyRow(baseRow, &joinedRow); } + joinerRunnerInputMatchedStats[threadID] += l_outputRG.getRowCount(); } } diff --git a/dbcon/joblist/tuplehashjoin.h b/dbcon/joblist/tuplehashjoin.h index e588b8a39..4f590f2de 100644 --- a/dbcon/joblist/tuplehashjoin.h +++ b/dbcon/joblist/tuplehashjoin.h @@ -392,7 +392,8 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep void errorLogging(const std::string& msg, int err) const; void startAdjoiningSteps(); - void formatMiniStats(uint32_t index); + void formatMiniStatsPerJoiner(uint32_t index); + void formatMiniStats(); RowGroupDL *largeDL, *outputDL; std::vector smallDLs; @@ -538,7 +539,7 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep }; void joinRunnerFcn(uint32_t index); void startJoinThreads(); - void generateJoinResultSet(const std::vector>& joinerOutput, + void generateJoinResultSet(const uint32_t threadID, const std::vector>& joinerOutput, rowgroup::Row& baseRow, const std::shared_ptr[]>& mappings, const uint32_t depth, rowgroup::RowGroup& outputRG, rowgroup::RGData& rgData, @@ -639,6 +640,8 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep void segregateJoiners(); std::vector> tbpsJoiners; std::vector> djsJoiners; + std::vector joinerRunnerInputRecordsStats; + std::vector joinerRunnerInputMatchedStats; std::vector djsJoinerMap; boost::scoped_array memUsedByEachJoin; boost::mutex djsLock;